いろいろ備忘録日記

主に .NET と Python絡みのメモを公開しています。

.NET クラスライブラリ探訪-047 (System.Threading.Barrier(1))(バリア, SignalAndWait, フェーズ, 協調動作, .NET 4.0)


今回は、Barrierクラスについて。
Barrierクラスは、.NET 4.0からSystem.Threading名前空間に追加されたクラスです。


Barrierクラスは、並行処理を複数のフェーズ毎に協調動作させる場合に利用します。
つまり、N個のスレッドを特定のフェーズ毎に足踏みを揃えて、次のフェーズに進むようにします。


Barrierでは、複数のスレッドが前進するためにバリアのところに全員が揃わないといけません。
一見、CountdownEventと同じように見えますが、以下の違いがあります。

CountdownEventは、特定のイベントが終わったという事を待つために利用するが、Barrierは仲間のスレッドを待つために利用する。


CountdownEventは、Signalを呼ぶことでカウンタをデクリメントします。
CountdownEvent.Waitを呼んでいるスレッドは、それぞれのワーカースレッドが自身の処理を終了して
カウンタを減らすまで待ちます。カウンタが0になった時点で待っていたスレッドが動きます。


Barrierは、SignalAndWaitを呼ぶことで、仲間のスレッドが揃うまで待ちます。
仲間のスレッドが、全員SignalAndWaitを呼んだ時点で、バリアが解除され、各スレッドは
次のフェーズに向かって進み出します。
Javaでいうと、CyclicBarrierクラスと同じ概念になります。


CountdownEventについては、前に記述していますのでよろしければご参考下さい。


Barrierクラスには、コンストラクタで各処理が揃った時点で呼ばれるコールバックを渡す事が出来ます。
これを利用すると、それぞれのフェーズ単位で結果を収集することが出来ます。
このコールバックが呼ばれている間は、ワーカースレッドはブロックされた状態となります。
つまり、コールバックが走っている間にワーカースレッドが先に進むことはありません。


以下、サンプルです。
以下のサンプルでは、5つの処理を別スレッドで走らせています。
各スレッドは、特定のフェーズで一旦集結し、途中結果を表示してから次のフェーズに進みます。
メインスレッドは、別処理が終了するのを待ち、最後に最終値を表示します。

    #region BarrierSamples-01
    /// <summary>
    /// Barrierクラスについてのサンプルです。
    /// </summary>
    /// <remarks>
    /// Barrierクラスは、.NET 4.0から追加されたクラスです。
    /// </remarks>
    public class BarrierSamples01 : IExecutable
    {
        // 計算値を保持する変数
        long _count;
        
        public void Execute()
        {
            //
            // Barrierクラスは、並行処理を複数のフェーズ毎に協調動作させる場合に利用する.
            // つまり、同時実行操作を同期する際に利用出来る。
            //
            // 例えば、論理的に3フェーズ存在する処理があったとして、並行して動作する処理が2つあるとする。
            // 各並行処理に対して、フェーズ毎に一旦結果を収集し、また平行して処理を行う事とする。
            // そのような場合に、Barrierクラスが役に立つ。
            //
            // Barrierクラスをインスタンス化する際に、対象となる並行処理の数をコンストラクタに指定する。
            // コンストラクタには、フェーズ毎に実行されるコールバックを設定することも出来る。
            //
            // 後は、Barrier.SignalAndWaitを、各並行処理が呼び出せば良い。
            // コンストラクタに指定した数分、SignalAndWaitが呼び出された時点で1フェーズ終了となり
            // 設定したコールバックが実行される。
            //
            // 各並行処理は、SignalAndWaitを呼び出した後、Barrierにて指定した処理数分のSignalAndWaitが
            // 呼び出されるまで、ブロックされる。
            //
            // 対象とする並行処理数は、以下のメソッドを利用することにより増減させることが出来る。
            //     ・AddParticipants
            //     ・RemoveParticipants
            //
            // CountdownEvent, ManualResetEventSlimと同じく、このクラスのSignalAndWaitメソッドも
            // CancellationTokenを受け付けるオーバーロードが存在する。
            //
            // CountdownEventと同じく、このクラスもIDisposableを実装しているのでusing可能。
            //
            
            //
            // 5つの処理を、特定のフェーズ毎に同期させながら実行.
            // さらに、フェーズ単位で途中結果を出力するようにする.
            //
            using (Barrier barrier = new Barrier(5, PostPhaseProc))
            {
                Parallel.Invoke(
                    () => ParallelProc(barrier, 10, 123456, 2), 
                    () => ParallelProc(barrier, 20, 678910, 3),
                    () => ParallelProc(barrier, 30, 749827, 5),
                    () => ParallelProc(barrier, 40, 847202, 7),
                    () => ParallelProc(barrier, 50, 503295, 777)
                );
            }
            
            Console.WriteLine("最終値:{0}", _count);
        }
        
        //
        // 各並列処理用のアクション.
        //
        void ParallelProc(Barrier barrier, int randomMaxValue, int randomSeed, int modValue)
        {
            //
            // 第一フェーズ.
            //
            Calculate(barrier, randomMaxValue, randomSeed, modValue, 100);
            
            //
            // 第二フェーズ.
            //
            Calculate(barrier, randomMaxValue, randomSeed, modValue, 5000);
            
            //
            // 第三フェーズ.
            //
            Calculate(barrier, randomMaxValue, randomSeed, modValue, 10000);
        }
        
        //
        // 計算処理.
        //
        void Calculate(Barrier barrier, int randomMaxValue, int randomSeed, int modValue, int loopCountMaxValue)
        {
            Random    rnd   = new Random(randomSeed);
            Stopwatch watch = Stopwatch.StartNew();
            
            int loopCount = rnd.Next(loopCountMaxValue);
            Console.WriteLine("[Phase{0}] ループカウント:{1}, TASK:{2}", barrier.CurrentPhaseNumber, loopCount, Task.CurrentId);
            
            for (int i = 0; i < loopCount; i++)
            {
                // 適度に時間がかかるように調整.
                if (rnd.Next(10000) % modValue == 0)
                {
                    Thread.Sleep(TimeSpan.FromMilliseconds(10));
                }
                
                Interlocked.Add(ref _count, (i + rnd.Next(randomMaxValue)));
            }
            
            watch.Stop();
            Console.WriteLine("[Phase{0}] SignalAndWait -- TASK:{1}, ELAPSED:{2}", barrier.CurrentPhaseNumber, Task.CurrentId, watch.Elapsed);
            
            try
            {
                //
                // シグナルを発行し、仲間のスレッドが揃うのを待つ.
                //
                barrier.SignalAndWait();
            }
            catch (BarrierPostPhaseException postPhaseEx)
            {
                //
                // Post Phaseアクションにてエラーが発生した場合はここに来る.
                // (本来であれば、キャンセルするなどのエラー処理が必要)
                //
                Console.WriteLine("*** {0} ***", postPhaseEx.Message);
                throw;
            }
        }
        
        //
        // Barrierにて、各フェーズ毎が完了した際に呼ばれるコールバック.
        // (Barrierクラスのコンストラクタにて設定する)
        //
        void PostPhaseProc(Barrier barrier)
        {
            //
            // Post Phaseアクションは、同時実行している処理が全てSignalAndWaitを
            // 呼ばなければ発生しない。
            //
            // つまり、この処理が走っている間、他の同時実行処理は全てブロックされている状態となる。
            //
            long current = Interlocked.Read(ref _count);
            
            Console.WriteLine("現在のフェーズ:{0}, 参加要素数:{1}", barrier.CurrentPhaseNumber, barrier.ParticipantCount);
            Console.WriteLine("\t現在値:{0}", current);
            
            //
            // 以下のコメントを外すと、次のPost Phaseアクションにて
            // 全てのSignalAndWaitを呼び出している、処理にてBarrierPostPhaseExceptionが
            // 発生する。
            //
            //throw new InvalidOperationException("dummy");
        }
    }
    #endregion


実行結果は以下のようになります。

  [Phase0] ループカウント:26, TASK:1
  [Phase0] ループカウント:49, TASK:2
  [Phase0] SignalAndWait -- TASK:1, ELAPSED:00:00:00.1441315
  [Phase0] SignalAndWait -- TASK:2, ELAPSED:00:00:00.2235196
  [Phase0] ループカウント:33, TASK:3
  [Phase0] SignalAndWait -- TASK:3, ELAPSED:00:00:00.0463386
  [Phase0] ループカウント:49, TASK:4
  [Phase0] SignalAndWait -- TASK:4, ELAPSED:00:00:00.1085861
  [Phase0] ループカウント:77, TASK:5
  [Phase0] SignalAndWait -- TASK:5, ELAPSED:00:00:00.0000469
  現在のフェーズ:0, 参加要素数:5
  	現在値:10076
  [Phase1] ループカウント:3873, TASK:5
  [Phase1] ループカウント:2481, TASK:2
  [Phase1] ループカウント:1661, TASK:3
  [Phase1] ループカウント:1329, TASK:1
  [Phase1] ループカウント:2486, TASK:4
  [Phase1] SignalAndWait -- TASK:5, ELAPSED:00:00:00.0874141
  [Phase1] SignalAndWait -- TASK:3, ELAPSED:00:00:04.4083702
  [Phase1] SignalAndWait -- TASK:4, ELAPSED:00:00:05.8278827
  [Phase1] SignalAndWait -- TASK:1, ELAPSED:00:00:10.3519437
  [Phase1] SignalAndWait -- TASK:2, ELAPSED:00:00:13.4408375
  現在のフェーズ:1, 参加要素数:5
  	現在値:16131285
  [Phase2] ループカウント:4962, TASK:2
  [Phase2] ループカウント:7747, TASK:5
  [Phase2] ループカウント:3323, TASK:3
  [Phase2] ループカウント:4972, TASK:4
  [Phase2] ループカウント:2658, TASK:1
  [Phase2] SignalAndWait -- TASK:5, ELAPSED:00:00:00.2007359
  [Phase2] SignalAndWait -- TASK:3, ELAPSED:00:00:09.7322769
  [Phase2] SignalAndWait -- TASK:4, ELAPSED:00:00:11.7606351
  [Phase2] SignalAndWait -- TASK:1, ELAPSED:00:00:20.5117366
  [Phase2] SignalAndWait -- TASK:2, ELAPSED:00:00:25.9414153
  現在のフェーズ:2, 参加要素数:5
  	現在値:80244208
  最終値:80244208


各フェーズで、一旦全処理が集結して、途中結果を表示し次のフェーズに移っているのが分かります。


================================
過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。