いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

.NET クラスライブラリ探訪-048 (System.Threading.Barrier(2))(バリア, キャンセル処理, CancellationToken, .NET 4.0)


今回も、Barrierクラスについて。
Barrierクラスは、.NET 4.0からSystem.Threading名前空間に追加されたクラスです。


前回の記事は以下から参照できます。


BarrierクラスのSignalAndWaitメソッドには、CountdownEventクラスと同様にキャンセルトークンを
受け付けるオーバーロードが存在します。キャンセル処理のやり方は、CountdownEventクラスの場合と
同じです。


以下、サンプルです。
以下のサンプルでは、5つの処理を別スレッドで走らせています。
各スレッドは、特定のフェーズで一旦集結し、途中結果を表示してから次のフェーズに進みますが
5秒経過した時点で、処理をキャンセルするようにしています。

    #region BarrierSamples-02
    /// <summary>
    /// Barrierクラスについてのサンプルです。
    /// </summary>
    /// <remarks>
    /// Barrierクラスは、.NET 4.0から追加されたクラスです。
    /// </remarks>
    public class BarrierSamples02 : IExecutable
    {
        // 計算値を保持する変数
        long _count;
        // キャンセルトークンソース.
        CancellationTokenSource _tokenSource;
        // キャンセルトークン.
        CancellationToken _token;
        
        public void Execute()
        {
            _tokenSource = new CancellationTokenSource();
            _token       = _tokenSource.Token;

            //
            // 5つの処理を、特定のフェーズ毎に同期させながら実行.
            // さらに、フェーズ単位で途中結果を出力するようにするが
            // 5秒経過した時点でキャンセルを行う。
            //
            using (Barrier barrier = new Barrier(5, PostPhaseProc))
            {
                
                try
                {
                    Parallel.Invoke(
                        () => ParallelProc(barrier, 10, 123456, 2), 
                        () => ParallelProc(barrier, 20, 678910, 3),
                        () => ParallelProc(barrier, 30, 749827, 5),
                        () => ParallelProc(barrier, 40, 847202, 7),
                        () => ParallelProc(barrier, 50, 503295, 777),
                        () => 
                        {
                            //
                            // 5秒間待機した後にキャンセルを行う.
                            //
                            Thread.Sleep(TimeSpan.FromSeconds(5));
                            Console.WriteLine("■■■■ キャンセル ■■■■");
                            _tokenSource.Cancel();
                        }
                    );
                }
                catch (AggregateException aggEx)
                {
                    aggEx.Handle(HandleAggregateException);
                }
            }
            
            _tokenSource.Dispose();
            
            Console.WriteLine("最終値:{0}", _count);
        }
        
        //
        // 各並列処理用のアクション.
        //
        void ParallelProc(Barrier barrier, int randomMaxValue, int randomSeed, int modValue)
        {
            //
            // 第一フェーズ.
            //
            Calculate(barrier, randomMaxValue, randomSeed, modValue, 100);
            
            //
            // 第二フェーズ.
            //
            Calculate(barrier, randomMaxValue, randomSeed, modValue, 5000);
            
            //
            // 第三フェーズ.
            //
            Calculate(barrier, randomMaxValue, randomSeed, modValue, 10000);
        }
        
        //
        // 計算処理.
        //
        void Calculate(Barrier barrier, int randomMaxValue, int randomSeed, int modValue, int loopCountMaxValue)
        {
            Random    rnd   = new Random(randomSeed);
            Stopwatch watch = Stopwatch.StartNew();
            
            int loopCount = rnd.Next(loopCountMaxValue);
            Console.WriteLine("[Phase{0}] ループカウント:{1}, TASK:{2}", barrier.CurrentPhaseNumber, loopCount, Task.CurrentId);
            
            for (int i = 0; i < loopCount; i++)
            {
                //
                // キャンセル状態をチェック.
                // 別の場所にてキャンセルが行われている場合は
                // OperationCanceledExceptionが発生する.
                //
                _token.ThrowIfCancellationRequested();
                
                // 適度に時間がかかるように調整.
                if (rnd.Next(10000) % modValue == 0)
                {
                    Thread.Sleep(TimeSpan.FromMilliseconds(10));
                }
                
                Interlocked.Add(ref _count, (i + rnd.Next(randomMaxValue)));
            }
            
            watch.Stop();
            Console.WriteLine("[Phase{0}] SignalAndWait -- TASK:{1}, ELAPSED:{2}", barrier.CurrentPhaseNumber, Task.CurrentId, watch.Elapsed);
            
            try
            {
                //
                // シグナルを発行し、仲間のスレッドが揃うのを待つ.
                //
                barrier.SignalAndWait(_token);
            }
            catch (BarrierPostPhaseException postPhaseEx)
            {
                //
                // Post Phaseアクションにてエラーが発生した場合はここに来る.
                // (本来であれば、キャンセルするなどのエラー処理が必要)
                //
                Console.WriteLine("*** {0} ***", postPhaseEx.Message);
                throw;
            }
            catch (OperationCanceledException cancelEx)
            {
                //
                // 別の場所にてキャンセルが行われた.
                //
                // 既に処理が完了してSignalAndWaitを呼び、仲間のスレッドを
                // 待っている状態でキャンセルが発生した場合は
                //      「操作が取り消されました。」となる。
                //
                // SignalAndWaitを呼ぶ前に、既にキャンセル状態となっている状態で
                // SignalAndWaitを呼ぶと
                //      「操作がキャンセルされました。」となる。
                //
                Console.WriteLine("キャンセルされました -- MESSAGE:{0}, TASK:{1}", cancelEx.Message, Task.CurrentId);
                throw;
            }
        }
        
        //
        // Barrierにて、各フェーズ毎が完了した際に呼ばれるコールバック.
        // (Barrierクラスのコンストラクタにて設定する)
        //
        void PostPhaseProc(Barrier barrier)
        {
            //
            // Post Phaseアクションは、同時実行している処理が全てSignalAndWaitを
            // 呼ばなければ発生しない。
            //
            // つまり、この処理が走っている間、他の同時実行処理は全てブロックされている状態となる。
            //
            long current = Interlocked.Read(ref _count);
            
            Console.WriteLine("現在のフェーズ:{0}, 参加要素数:{1}", barrier.CurrentPhaseNumber, barrier.ParticipantCount);
            Console.WriteLine("\t現在値:{0}", current);
            
            //
            // 以下のコメントを外すと、次のPost Phaseアクションにて
            // 全てのSignalAndWaitを呼び出している、処理にてBarrierPostPhaseExceptionが
            // 発生する。
            //
            //throw new InvalidOperationException("dummy");
        }
        
        //
        // AggregateException.Handleメソッドに設定されるコールバック.
        //
        bool HandleAggregateException(Exception ex)
        {
            if (ex is OperationCanceledException)
            {
                OperationCanceledException cancelEx = ex as OperationCanceledException;
                if (_token == cancelEx.CancellationToken)
                {
                    Console.WriteLine("***Barrier内の処理がキャンセルされた MESSAGE={0} ***", cancelEx.Message);
                    return true;
                }
            }
            
            return false;
        }
    }
    #endregion


実行結果は以下のようになります。

  [Phase0] ループカウント:26, TASK:1
  [Phase0] ループカウント:49, TASK:2
  [Phase0] SignalAndWait -- TASK:1, ELAPSED:00:00:00.1410492
  [Phase0] SignalAndWait -- TASK:2, ELAPSED:00:00:00.2189958
  [Phase0] ループカウント:33, TASK:3
  [Phase0] SignalAndWait -- TASK:3, ELAPSED:00:00:00.0452631
  [Phase0] ループカウント:49, TASK:4
  [Phase0] SignalAndWait -- TASK:4, ELAPSED:00:00:00.1085940
  [Phase0] ループカウント:77, TASK:5
  [Phase0] SignalAndWait -- TASK:5, ELAPSED:00:00:00.0000483
  現在のフェーズ:0, 参加要素数:5
  	現在値:10076
  [Phase1] ループカウント:2481, TASK:2
  [Phase1] ループカウント:1661, TASK:3
  [Phase1] ループカウント:1329, TASK:1
  [Phase1] ループカウント:2486, TASK:4
  [Phase1] ループカウント:3873, TASK:5
  [Phase1] SignalAndWait -- TASK:5, ELAPSED:00:00:00.0878508
  [Phase1] SignalAndWait -- TASK:3, ELAPSED:00:00:04.4091327
  ■■■■ キャンセル ■■■■
  キャンセルされました -- MESSAGE:操作は取り消されました。, TASK:3
  キャンセルされました -- MESSAGE:操作は取り消されました。, TASK:5
  ***Barrier内の処理がキャンセルされた MESSAGE=操作は取り消されました。 ***
  ***Barrier内の処理がキャンセルされた MESSAGE=操作はキャンセルされました。 ***
  ***Barrier内の処理がキャンセルされた MESSAGE=操作は取り消されました。 ***
  ***Barrier内の処理がキャンセルされた MESSAGE=操作はキャンセルされました。 ***
  ***Barrier内の処理がキャンセルされた MESSAGE=操作はキャンセルされました。 ***
  最終値:12697206


2フェーズ目(Phase1)の途中にて処理がキャンセルされ、全ての処理がキャンセルされているのが分かります。
キャンセル前にSignalAndWaitで待機状態に入っていたスレッドの場合は

操作は取り消されました。

となり、
処理中にキャンセルされたスレッドの場合は

操作はキャンセルされました。

と表示されます。


================================
過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。