今回も、Barrierクラスについて。
Barrierクラスは、.NET 4.0からSystem.Threading名前空間に追加されたクラスです。
前回の記事は以下から参照できます。
- .NET クラスライブラリ探訪-047 (System.Threading.Barrier(1))(バリア, SignalAndWait, フェーズ, 協調動作, .NET 4.0)
BarrierクラスのSignalAndWaitメソッドには、CountdownEventクラスと同様にキャンセルトークンを
受け付けるオーバーロードが存在します。キャンセル処理のやり方は、CountdownEventクラスの場合と
同じです。
- .NET クラスライブラリ探訪-045 (System.Threading.CountdownEvent(3))(キャンセルトークンの利用, CancellationToken, Wait, .NET 4.0)
以下、サンプルです。
以下のサンプルでは、5つの処理を別スレッドで走らせています。
各スレッドは、特定のフェーズで一旦集結し、途中結果を表示してから次のフェーズに進みますが
5秒経過した時点で、処理をキャンセルするようにしています。
#region BarrierSamples-02 /// <summary> /// Barrierクラスについてのサンプルです。 /// </summary> /// <remarks> /// Barrierクラスは、.NET 4.0から追加されたクラスです。 /// </remarks> public class BarrierSamples02 : IExecutable { // 計算値を保持する変数 long _count; // キャンセルトークンソース. CancellationTokenSource _tokenSource; // キャンセルトークン. CancellationToken _token; public void Execute() { _tokenSource = new CancellationTokenSource(); _token = _tokenSource.Token; // // 5つの処理を、特定のフェーズ毎に同期させながら実行. // さらに、フェーズ単位で途中結果を出力するようにするが // 5秒経過した時点でキャンセルを行う。 // using (Barrier barrier = new Barrier(5, PostPhaseProc)) { try { Parallel.Invoke( () => ParallelProc(barrier, 10, 123456, 2), () => ParallelProc(barrier, 20, 678910, 3), () => ParallelProc(barrier, 30, 749827, 5), () => ParallelProc(barrier, 40, 847202, 7), () => ParallelProc(barrier, 50, 503295, 777), () => { // // 5秒間待機した後にキャンセルを行う. // Thread.Sleep(TimeSpan.FromSeconds(5)); Console.WriteLine("■■■■ キャンセル ■■■■"); _tokenSource.Cancel(); } ); } catch (AggregateException aggEx) { aggEx.Handle(HandleAggregateException); } } _tokenSource.Dispose(); Console.WriteLine("最終値:{0}", _count); } // // 各並列処理用のアクション. // void ParallelProc(Barrier barrier, int randomMaxValue, int randomSeed, int modValue) { // // 第一フェーズ. // Calculate(barrier, randomMaxValue, randomSeed, modValue, 100); // // 第二フェーズ. // Calculate(barrier, randomMaxValue, randomSeed, modValue, 5000); // // 第三フェーズ. // Calculate(barrier, randomMaxValue, randomSeed, modValue, 10000); } // // 計算処理. // void Calculate(Barrier barrier, int randomMaxValue, int randomSeed, int modValue, int loopCountMaxValue) { Random rnd = new Random(randomSeed); Stopwatch watch = Stopwatch.StartNew(); int loopCount = rnd.Next(loopCountMaxValue); Console.WriteLine("[Phase{0}] ループカウント:{1}, TASK:{2}", barrier.CurrentPhaseNumber, loopCount, Task.CurrentId); for (int i = 0; i < loopCount; i++) { // // キャンセル状態をチェック. // 別の場所にてキャンセルが行われている場合は // OperationCanceledExceptionが発生する. // _token.ThrowIfCancellationRequested(); // 適度に時間がかかるように調整. if (rnd.Next(10000) % modValue == 0) { Thread.Sleep(TimeSpan.FromMilliseconds(10)); } Interlocked.Add(ref _count, (i + rnd.Next(randomMaxValue))); } watch.Stop(); Console.WriteLine("[Phase{0}] SignalAndWait -- TASK:{1}, ELAPSED:{2}", barrier.CurrentPhaseNumber, Task.CurrentId, watch.Elapsed); try { // // シグナルを発行し、仲間のスレッドが揃うのを待つ. // barrier.SignalAndWait(_token); } catch (BarrierPostPhaseException postPhaseEx) { // // Post Phaseアクションにてエラーが発生した場合はここに来る. // (本来であれば、キャンセルするなどのエラー処理が必要) // Console.WriteLine("*** {0} ***", postPhaseEx.Message); throw; } catch (OperationCanceledException cancelEx) { // // 別の場所にてキャンセルが行われた. // // 既に処理が完了してSignalAndWaitを呼び、仲間のスレッドを // 待っている状態でキャンセルが発生した場合は // 「操作が取り消されました。」となる。 // // SignalAndWaitを呼ぶ前に、既にキャンセル状態となっている状態で // SignalAndWaitを呼ぶと // 「操作がキャンセルされました。」となる。 // Console.WriteLine("キャンセルされました -- MESSAGE:{0}, TASK:{1}", cancelEx.Message, Task.CurrentId); throw; } } // // Barrierにて、各フェーズ毎が完了した際に呼ばれるコールバック. // (Barrierクラスのコンストラクタにて設定する) // void PostPhaseProc(Barrier barrier) { // // Post Phaseアクションは、同時実行している処理が全てSignalAndWaitを // 呼ばなければ発生しない。 // // つまり、この処理が走っている間、他の同時実行処理は全てブロックされている状態となる。 // long current = Interlocked.Read(ref _count); Console.WriteLine("現在のフェーズ:{0}, 参加要素数:{1}", barrier.CurrentPhaseNumber, barrier.ParticipantCount); Console.WriteLine("\t現在値:{0}", current); // // 以下のコメントを外すと、次のPost Phaseアクションにて // 全てのSignalAndWaitを呼び出している、処理にてBarrierPostPhaseExceptionが // 発生する。 // //throw new InvalidOperationException("dummy"); } // // AggregateException.Handleメソッドに設定されるコールバック. // bool HandleAggregateException(Exception ex) { if (ex is OperationCanceledException) { OperationCanceledException cancelEx = ex as OperationCanceledException; if (_token == cancelEx.CancellationToken) { Console.WriteLine("***Barrier内の処理がキャンセルされた MESSAGE={0} ***", cancelEx.Message); return true; } } return false; } } #endregion
実行結果は以下のようになります。
[Phase0] ループカウント:26, TASK:1 [Phase0] ループカウント:49, TASK:2 [Phase0] SignalAndWait -- TASK:1, ELAPSED:00:00:00.1410492 [Phase0] SignalAndWait -- TASK:2, ELAPSED:00:00:00.2189958 [Phase0] ループカウント:33, TASK:3 [Phase0] SignalAndWait -- TASK:3, ELAPSED:00:00:00.0452631 [Phase0] ループカウント:49, TASK:4 [Phase0] SignalAndWait -- TASK:4, ELAPSED:00:00:00.1085940 [Phase0] ループカウント:77, TASK:5 [Phase0] SignalAndWait -- TASK:5, ELAPSED:00:00:00.0000483 現在のフェーズ:0, 参加要素数:5 現在値:10076 [Phase1] ループカウント:2481, TASK:2 [Phase1] ループカウント:1661, TASK:3 [Phase1] ループカウント:1329, TASK:1 [Phase1] ループカウント:2486, TASK:4 [Phase1] ループカウント:3873, TASK:5 [Phase1] SignalAndWait -- TASK:5, ELAPSED:00:00:00.0878508 [Phase1] SignalAndWait -- TASK:3, ELAPSED:00:00:04.4091327 ■■■■ キャンセル ■■■■ キャンセルされました -- MESSAGE:操作は取り消されました。, TASK:3 キャンセルされました -- MESSAGE:操作は取り消されました。, TASK:5 ***Barrier内の処理がキャンセルされた MESSAGE=操作は取り消されました。 *** ***Barrier内の処理がキャンセルされた MESSAGE=操作はキャンセルされました。 *** ***Barrier内の処理がキャンセルされた MESSAGE=操作は取り消されました。 *** ***Barrier内の処理がキャンセルされた MESSAGE=操作はキャンセルされました。 *** ***Barrier内の処理がキャンセルされた MESSAGE=操作はキャンセルされました。 *** 最終値:12697206
2フェーズ目(Phase1)の途中にて処理がキャンセルされ、全ての処理がキャンセルされているのが分かります。
キャンセル前にSignalAndWaitで待機状態に入っていたスレッドの場合は
操作は取り消されました。
となり、
処理中にキャンセルされたスレッドの場合は
操作はキャンセルされました。
と表示されます。
================================
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場