.NET 4.0から追加された「タスク並列ライブラリ(以下TPLと記述)」についての
メモを書いていこうと思ってます。
忘れやすいので、自分のために殴り書きメモを以下に記述。(超見づらいです・・すみません)
この内容をまとめていろいろ記述していこうと思っているところ。
リソースとしては
- タスク並列ライブラリ(TPL)についてのリソース (Task Parallel Library, Parallel.For, Thread, ThreadPool, 非同期, 並列)
でピックアップした、各記事を元に勉強していこうと思ってます。
元の記事はどれも素晴らしいです。
以下、MSDNのメモ書き。
- タスク並列ライブラリとは、1つまたは複数のタスクを同時に実行することを可能とするライブラリ。
- データの並列化:Parallel.For, Parallel.ForEach
- データの並列化:Parallel.Invoke, Task
- タスクとは、非同期処理を表すもの。
- タスクを利用すると、以下の利点がある。
- システムリソースをより効率的に、且つ、スケーラブルに利用できる。
- スレッドよりも、より詳細な制御が行える。
- .NET 4.0では、以下の処理を記述する際にTPLを利用することが推奨されている。
- マルチスレッド処理
- 非同期及び並列コード
- タスクには、主に以下の機能がある。
- 待機
- キャンセル
- 継続
- 信頼性の高い例外処理
- 詳細なステータス
- カスタムのスケジュール設定
- 暗黙的なタスクの作成には、Parallel.Invokeを利用する。
- Parallel.Invoke*1;
- 上記のコードでは、2つの処理が同時に実行される。
- 内部でタスクが生成されて、実行されている。
- 以下の場合は、直接タスクを作成して処理を行う。
- タスクの実行をさらに制御する場合
- タスクから値を返す必要がある場合
- タスクの状態は、「Status」プロパティから取得出来る。
- 取得出来るのは、TaskStatus列挙型となる。
- タスクからの戻り値は、「Result」プロパティで取得出来る。
- Resultプロパティの型は、タスク生成時に決定する。
- このプロパティは、処理が完了していない場合にブロックする。
- 戻り値を使用しないタスクの場合は、「Task」クラスを利用する。
- 戻り値を使用するタスクの場合は、「Task
」クラスを利用する。 - タスクは、以下の2種類の方法で生成する。
- Task.FactoryからStartNewメソッドで、生成とともに開始。
- 直接タスクをNewして生成し、Startメソッドで開始。
- 「Wait」メソッド、「Result」プロパティは、特別なメンバー。
- 上記2つのメンバーは、「トリガーメンバー」と呼ばれる。
- 上記2つのメンバーを呼び出した際荷、タスク内部で例外が発生しているとスローされる。
- AggregateExceptionがスローされる。
- AggregateExceptionは、内部でタスク内部で発生した複数の例外を保持している場合がある。
- InnerExceptionsで取得出来る。
- なので、上記メソッドを利用する場合で、例外が発生する可能性がある場合はTry~Catchが必要。
- タスクには、一意のIDが割り振られる。
- 「Id」プロパティより取得出来る。
- タスクは、生成する際に生成オプションを指定することが出来る。
- TaskCreationOptions列挙型となる。
- None
- 無し。(デフォルト)
- PreferFairness
- 先にスケジューリングされたタスクから順に実行される。
- LongRunning
- 実行に時間がかかるタスクであることを示す。
- AttachedToParent
- 親のタスクに対して、親子関係を持つ。
- None
- 生成オプションは、ビット毎のORで指定することも出来る。
- TaskCreationOptions列挙型となる。
- タスクの継続を行う場合、以下の方法が存在する。
- ContinueWithメソッド
- 一つのタスクに対して、継続タスクを指定する。
- ContinueWhenAllメソッド
- 継続元タスクが複数存在する状態で、その全てが完了した場合に継続する。
- ContinueWhenAnyメソッド
- 継続元タスクが複数存在する状態で、そのどれかが完了した場合に継続する。
- ContinueWithメソッド
- タスクの待機を行う場合、移管方法が存在する。
- Waitメソッド
- レシーバーのタスクを待機させる。
- Task.WaitAllメソッド
- 指定された複数のタスクを待機させる。
- Task.WaitAnyメソッド
- 指定された複数のタスクの内、どれかが完了するまで待機する。
- Waitメソッド
- 複数の継続元タスクから継続タスクを作成する場合は、以下の方法が存在する。
- Task.Factory.ContinueWhenAllメソッド
- Task.Factory.ContinueWhenAnyメソッド
- System.Collections.Concurrent名前空間には、TPLで利用できる可惜なコレクションクラスが含まれる。
- この名前空間は、.NET 4.0から追加された。
- 同期プログラミングでは、非同期操作で完了時に2番目の操作を呼び出してデータを渡すのが一般的。
- 非同期処理では、2番目にコールバックを渡すパターンが多い。
- タスク並列ライブラリでは、継続タスクに同じ機能が用意されている。
- 継続タスクとは、前のタスクが完了したときに、続けて呼び出されるタスクのことを言う。
- 前のタスクは、「先行タスク」とも言う。
- 前のタスクは、「継続元タスク」とも言う。
- 継続タスクは、以下の機能を持つ。
- 継続元のデータを継続先に渡す。
- 継続を呼び出す場合、呼び出さない場合について正確な条件を指定できる。
- 継続が開始される前、または継続の実行中に継続を取り消すことが出来る。
- 同じ継続元から複数の継続タスクを呼び出すことが出来る。
- 複数の継続先の全てが完了したときに1つの継続タスクを呼び出すことが出来る。
- 複数の継続先のどれか一つでも完了したときに1つの継続タスクを呼び出すことが出来る。
- 任意の長さで連続して継続を実行することが出来る。
- 継続元によってスローされた例外を処理するために継続を使用することが出来る。
- 継続タスクの例 (ContinueWith)
// 継続元タスク Task<DayOfWeek> rootTask = new Task<DayOfWeek>(() => DateTime.Now.DayOfWeek); // 継続タスク Task<string> continuationTask = rootTask.ContinueWith((antecedent) => { return antecedent.Result.ToString(); }); // 継続元タスクを開始 rootTask.Start(); // 継続タスクの結果を表示. Console.WriteLine(continuationTask.Result);
- 複数の継続元タスクに対して、それが全て完了した際に継続するタスクを作成することも出来る。
- Task.Factory.ContinueWhenAllを利用する。
- デリゲートの引数に渡されるのは、以下のようになる。
- ContinueWith:先行タスク
- ContinueWhenAll:先行タスクの配列
- ContinueWhenAny:最も最初に完了したタスク
Task<int> t1 = new Task<int>(() => { return 1; }); Task<int> t2 = new Task<int>(() => { return 2; }); Task<int> t3 = new Task<int>(() => { return 3; }); Task<int>[] tasks = new Task<int>[]{ t1, t2, t3 }; // 複数の継続元タスクが全て完了した後に継続するタスクを作成. Task<int> continuationTask = Task<int>.Factory.ContinueWhenAll( tasks, // 引数には、継続元タスクの配列が設定される. (antecedents) => { return antecedents.Sum(i => i.Result); } ); // 継続元タスクを開始. tasks.ToList().ForEach(t => t.Start()); // 継続タスクの結果を表示. Console.WriteLine(continuationTask.Result);
- 継続タスクは、継続元タスクによってのみ開始することが出来る。
- 継続元タスク以外から開始しようとすると、InvalidOperationExceptionが発生する。
- 継続タスクを作成する際、ContinueWithメソッドにTaskContinuationOptionsを指定することで動作を制御出来る。
- 継続タスクを開始する条件を指定できる。
- 継続元が完了した際に実行される。
- 継続元が失敗した際に実行される。
- 継続タスクを開始する条件を指定できる。
- TaskContinuationOptionsには、以下の種別が存在する。
- None:無し(デフォルト)
- PreferFairness:先にスケジュールされたタスクから、順に実行されていく。
- LongRunning:実行に時間がかかるタスクであることを示す。
- AttachedToParent:子タスクであることを示す。
- NotOnRanToCompletion:継続元タスクが完了した場合は、実行されないようにする。
- NotOnFaulted:継続元タスクがしっ端した場合は、実行されないようにする。
- NotOnCaneled:継続元タスクがキャンセルされた場合は、実行されないようにする。
- OnlyOnRanToCompletion:継続元タスクが完了した場合のみ、実行される。
- OnlyOnFaulted:継続元タスクが失敗した場合のみ、実行される。
- OnlyOnCanceled:継続元タスクがキャンセルされた場合のみ、実行される。
- ExecuteSynchronously:非常に短時間で完了する継続タスクであることを示す。
- 継続タスクがキャンセル状態となるのは、以下の場合である。
- キャンセル要求の応答として、「OperationCanceledException」をスローした場合
- CancellationTokenが引数として継続タスクに渡された場合で、IsCancellationRequestedプロパティがTrueの場合
- 継続タスクのTaskContinuationOptionsの条件が満たされないために、継続が実行されない場合
- 継続元タスクがキャンセルされた場合に継続タスクを実行しないようにするには、「NotOnCanceled」を指定する。
- 複数のタスクを同時にキャンセルするには、同じトークンを渡すようにすれば良い。
- QUESTION:CancellationToken.Tokenが返すオブジェクトは同じモノが返ってくるのか?
- 継続は、継続元タスクとアタッチされた全ての子タスクが完了するまで開始されない。
- 継続は、デタッチされた子タスクの完了は待機しない。
- 継続元タスクと継続タスクの関係は、親子の関係とは異なる。
- 継続タスクからスローされた例外は継続元へは反映されない。
- 継続タスクによってスローされた例外は、他のタスクでの処理と同様に処理する。
Task<int> t = Task<int>.Factory.StartNew(() => { return 1; }); Task<int> c = t.ContinueWith((antecedent) => { throw new InvalidOperationException(""); }); try { t.Wait(); c.Wait(); } catch (AggregatedException aggEx) { foreach (Exception ex in aggEx.InnerExceptions) { Console.WriteLine(e.Message); } }
- 継続が子タスクで、AttachedToParentオプションを指定している場合は以下のようになる。
- アタッチされているその他の子タスクと同様、その例外は親によって呼び出し元に反映される。
- 入れ子のタスクは、本質的に親タスクから独立している。
- 子タスクは、親と密接に関係している。
- 親タスクは、子タスクの完了を待機する。
- 親タスクは、子タスクによってスローされた例外を反映する。
- 親タスクは、子タスクの状態に依存する。
- デタッチされた子タスクが例外をスローする場合、以下の対処が必要となる。
- 入れ子でないタスクの場合と同様の監視。
- 外側のタスク(親タスク)内で直接処理する必要がある。
- アタッチされた子タスクが例外をスローする場合、以下のようになる。
- 例外は自動的に親タスクに反映される。
- 子のタスクが開始される前に親のタスクがキャンセルされた場合、子のタスクは開始されない。
- 子または入れ子のタスクが開始された後、キャンセルされた場合は、以下の処置が必要となる。
- キャンセルロジックが適用されていない限り、完了まで実行される。
- デタッチされた子タスクが、親と同じトークンを用いてキャンセルされた場合は以下のようになる。
- 親は子を待機せず、例外も反映されない。
- アタッチされた子タスクが、親と同じトークンを用いてキャンセルされた場合は以下のようになる。
- TaskCanceledExceptionがAggregateException内の連結されたスレッドに反映される。
- つまり、親のAggregateExceptionに含まれる。
- TaskCanceledExceptionがAggregateException内の連結されたスレッドに反映される。
- タスクにて、キャンセルを行うには、ユーザコード側からCancellationTokenSource.Tokenを呼び出す。
- タスク内部では、渡されたTokenのIsCancellationRequestedの値を見ることでキャンセルされたか否かが判明する。
- キャンセルが要求されている場合に、例外を発生させるには、token.ThrowIfCancellationRequestedを利用する。
- 発生する冷害は、「OperationCancellationException」となる。
- タスクにて、OperationCancellationExceptionが発生した場合、以下のようになる。
- タスクは、例外のトークンと自身に関連づけられているトークンを比較する。
- 上記のトークンが一致する場合は、自身の状態をキャンセル状態に変更する。
- タスク内で実行中のユーザコードによってスローされた、ハンドルされていない例外は、上のスレッドに反映される。
- タスクがアタッチされた子タスクの親である場合
- 複数のタスクを待機している場合
- AggregateExceptionのInnerExceptionsプロパティの中には、さらにAggregateExceptionが含まれている事がある。
- Flattenメソッドを利用すると、例外の階層を平坦化することが出来る。
- タスクの状態が「Faulted」の場合、「Exception」プロパティには元となった。
- 中身は、AggregateExceptionとなる。
- TaskScheduler.UnobservedTaskExceptionイベントをハンドルすると、以下のように記述することが出来る。
- どの部分でも、ハンドルされていないタスクの例外をキャッチすることが出来る。
以下、CodeProjectの「Task Parallel Library」の記事のメモ書き。
- タスク並列ライブラリとは?
- タスクライブラリと並列ライブラリの集まりのこと。
- タスク:System.Threading.Tasks.Taskを中心としたライブラリ。
- 並列 :System.Threading.Tasks.Parallelを中心としたライブラリ。
- タスクライブラリと並列ライブラリの集まりのこと。
- タスクの作成方法
- TaskFactoryを利用した作成
- Task.Factory.StartNewメソッドを利用して作成
- この場合、オブジェクトを取得した時点でタスク処理は開始している。
- 引数にはActionまたはAction
を指定すると戻り値無し。Func を指定すると戻り値有りのタスクとなる。
- Task.Factory.StartNewメソッドを利用して作成
- TaskFactoryを利用した作成
Task t = Task.Factory.StartNew(() => ...); // Action Task t = Task.Factory.StartNew((obj) => ..., T); // Action<T> Task<T> t = Task<T>.Factory.StartNew(() => { return T }); // Func<T> Task<T> t = Task<T>.Factory.StartNew(obj) => { return T }, A); // Func<A, T>
-
-
- 直接Newして作成
- インスタンスを取得した後に、明示的にStartメソッドの呼び出しが必要。
- 実際のタスクの例
- 引数でInt32のデータを受け取り、戻り値としてList
を返すタスクなど。
- 引数でInt32のデータを受け取り、戻り値としてList
- 直接Newして作成
-
Task<List<string>> t = Task<List<string>>.Factory.StartNew(i => { return new List<string>(); }, 2000);
- タスクの待ち合わせ
- タスクオブジェクトのWaitメソッド
- スレッドの場合と同じく、Waitを呼んだタスクが終了するまで待つ。
- タスクオブジェクトのWaitメソッド
Task<List<string>> t = Task<List<string>>.Factory.StartNew(i => { return new List<string>(); }, 2000); t.Wait();
-
- WaitAllメソッドを利用して、指定したタスク全てが終了するまで待つ。
Task t1 = Task.Factory.StartNew(() => Console.WriteLine("Task 1")); Task t2 = Task.Factory.StartNew(() => Console.WriteLine("Task 2")); Task t3 = Task.Factory.StartNew(() => Console.WriteLine("Task 3")); Task.WaitAll(new Task[]{ t1, t2, t3 });
- 例外処理とトリガーメソッド
- タスク内部で発生した冷害は、内部でAggregateExceptionとして集約された管理される。
- タスクでは、以下のメソッド/プロパティを呼び出したタイミングで例外はスローされる。(トリガーメソッド/プロパティ)
- Wait
- Result
- 例外を補足するのかどうかは、ユーザにゆだねられている。
- タスクを開始させてWaitもResultも呼ばない場合は、最終的に例外はスローされない。
- タスク内部で発生してAggregateExceptionの状態で管理されたままとなる。
- 基本的に、戻り値を持つタスクの場合はResultを呼ばないと戻り値を取れないので問題ない。
- WaitもResultも呼ばない場合というのは、呼びっぱなしの処理をするタスクの場合。
- そのようなタスクの場合は、タスク内部でTry~Catchしておくべき。
- AggregateExceptionのInnerExceptionsプロパティからタスク内部で発生した例外が取得出来る。
- タスクのキャンセル処理
- タスクにキャンセル機能を付与するには、CancellationTokenを利用する。
- CancellationTokenはCancellationTokenSourceから取得する。
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
-
- タスクの作成時にて、トークンを渡すオーバーロードを使用する。
Task t = Task.Factory.StartNew(DoSomeWork, token);
-
- キャンセルを行う場合は、CancellationTokenSource.Cancelを呼び出す。
- 呼び出されると、トークンがキャンセルを認識する。
- CancellationTokenSource.Cancelを呼び出しただけでは、キャンセル処理は完了していない。
- 後は、token.IsCancellationRequestedかThrowIfCancellationRequestedを用いてタスク内部で処理をキャンセルさせる。
- タスク内部でキャンセル処理が発生すると、OperationCanceledExceptionが発生する。
- OperationCanceledExceptionを取得するには、トリガーメソッド(Wait Or Result)を呼び出す。
- タスク内部でキャンセル処理が発生すると、タスクのIsCanceledがTrueとなる。
- IsCanceledがTrueになるタイミングは、トリガーメソッドが呼び出されてからとなる。
- キャンセルを行う場合は、CancellationTokenSource.Cancelを呼び出す。
- UIスレッドとの同期
- タスクにはUIスレッドとの同期機能が予め備わっている。
- UIスレッドとの同期には、SynchronizationContextを利用する。
- ある非同期処理が完了した後に、UIスレッドからコントロールを操作するには以下のようにする。
CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; Task rootTask = Task.Factory.StartNew(() => { 非同期処理 }); Task continuationTask = rootTask.ContinueWith( (antecedent) => { UI更新処理 }, token, // 現在のUIに紐づくSynchronizationContextを取得. TaskScheduler.FromCurrentSynchronizationContext() );
-
- Windows Formsの場合は、「WinformsSynchronizationContext」となる。
- WPFの場合は、「WPFDispatcherSynchronizationContext」となる。
- タスクの継続
- タスクには、次々とタスクをチェインしていくための継続機能が備わっている。
- 単純な継続
- 継続を行いたいタスクに対して、ContinueWithで繋げる。
- それ以外の継続方法
- ContinueWhenAny
- N個のタスクの内、どれか一つでも完了したら継続する方法
- ContinueWhenAll
- N個のタスクの内、全てが完了したら継続する方法
- 先行するタスクが失敗した場合のみ継続する方法
- ContinueWithメソッドにて、TaskContinuationOptions.OnlyOnFaultedを指定する。
- 先行するタスクが完了した場合のみ継続する方法
- ContinueWithメソッドにて、TaskContinuationOptions.OnlyOnRanToCompletionを指定する。
- タスクは何発でも継続できる。
- 先行するタスクで例外が発生した場合、OnlyOnFaultedが指定されていない継続タスクは実行されない。
- ContinueWhenAny
- Parallelライブラリ
- データの並列処理を行う為のライブラリ。
- タスクの並列処理は、Taskライブラリが担当している。
- Parallel.ForとParallel.ForEachとParallel.Invokeが存在している。
- Parallel.Forは既存のforループの並列処理版。
- データの並列処理を行う為のライブラリ。
for (int i = 0; i < 100; i++) { Console.WriteLine(i); }
を並列処理化すると
Paralle.For(0, 100, i => Console.WriteLine(i));
となる。
-
- Paralle.ForEachは既存のforeachループの並列処理版。
foreach (var data in xxxList) { Console.WriteLine(data); }
を並列処理化すると
Paralle.ForEach(xxxList data => Console.WriteLine(data));
となる。
-
- BreakとStopの方法
- ParallelLoopStateを受け取るようにする。
- Stopのやり方
- BreakとStopの方法
ParallelLoopResult result = Parallel.For( 0, 10, // 2番目の引数でParallelLoopStateを受け取る。 (i, state) => { if (i > 8) { // 並列ループを停止. state.Stop(); } } );
-
-
- Breakのやり方
-
ParallelLoopResult result = Parallel.For( 0, 10, // 2番目の引数でParallelLoopStateを受け取る。 (i, state) => { if (i > 8) { // 並列ループを抜ける. state.Break(); } } );
-
-
- StopとBreakの違い
- Stopは強制終了みたいな意味合い。Breakは文字通りBreakとなる。
- Breakの場合のみ、ParallelLoopResult.LowestBreakIterationに値が設定される。
- StopとBreakの違い
- Parallel処理のキャンセル
- キャンセルを行うには、CancellationTokenを利用する。
- トークンをParallelOptionsに設定して、ループを開始する。
- 並列処理のパーテショニング
- Partitionerを利用することで、並列処理をパーテショニングすることが出来る。
- 特定のデータ単位でチャンクを構成することが出来る。
- Thread Local Storage
- 並列ループ内にてローカルな変数を持ちたい場合に利用する。
- 並列処理では、各ループ毎に別々のスレッドが処理を実行している可能性があるので、変数の利用には特に注意する必要がある。
- 以下、トータルを求めるサンプル
- 3番目の引数にて、ローカル変数の初期化処理を記述する。
- 4番目の引数にて、ループ処理の本体を記述する。
- 5番目の引数にて、設定したローカル変数の後処理を記述する。
-
// ロックオブジェクト object lockObj = new object(); // 合計 int total = 0; // 並列処理 Parallel.For( 0, 100, // ローカル変数の初期化処理 () => 0, // ループ処理本体 (index, loopState, localValue) => { localValue += index; return localValue; }, // ローカル処理の最終処理 // 各ループ毎にfinallyのように呼ばれる. localValue => { lock (lockObj) { total += localValue; } } );
-
- CancellationTokenを利用しない場合は、CancellationToken.Noneを指定する。
- PLINQとは、LINQを並列処理させるための機能。
- 通常のLINQクエリをPLINQ対応にするための方法はすごく簡単。
- from句にてソースシーケンスを指定する際に、AsParallel()を指定する。
- これだけで、そのLINQクエリが並列処理されるようになる。
- from句にてソースシーケンスを指定する際に、AsParallel()を指定する。
例:元のLINQ
var query = from x in numbers select Math.Pow(x, 2);
だとする。上記のクエリを並列処理化するためには
var query = from x in numbers.AsParallel() select Math.Pow(x, 2);
とすれば良い。
- AsParallelメソッドは、ParallelQuery
を返す。 - AsParallelメソッドは、IEnumerable
の拡張メソッド。
- AsParallelメソッドは、IEnumerable
- PLINQでは、内部でTPL(タスク並列ライブラリ)が並列処理を行っている。
- 実際にクエリを並列処理するか否かは、TPLが決定する。
- なので、AsParallel()を指定して並列処理をするように指定しても並列処理化されない場合がある。
- TPLが並列処理するより同期処理の方が速いと判断した場合など。
- なので、AsParallel()を指定して並列処理をするように指定しても並列処理化されない場合がある。
- PLINQには、強制的に並列処理モードを設定するためのモード指定が出来る。
- WithExecutionModeメソッド
- ParallelExecutionMode.ForceParallelismを指定することで強制的に並列処理モードに出来る。
- WithExecutionModeメソッド
例:
var query = from x in numbers.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism) select Math.Pow(x, 2);
- PLINQでは、並列処理を行い処理を高速化するという名目上、デフォルトでは結果の順序は保持されない。
- LINQ To Objectでは、デフォルトで順序は保持される。
- PLINQにて、結果の順序を保持するには、「AsOrdered」メソッドを利用する。
例:
var query = from x in numbers.AsParallel().AsOrdered() select Math.Pow(x, 2);
- PLINQでは、並列処理を行う場合に内部で利用されるタスクの最大数を指定することが出来る。
- WithDegreeOfParallelismメソッド
- 引数にはタスクの最大数を指定する。マシンのCPU数に合わせておくと最適化できる。
- WithDegreeOfParallelismメソッド
例:
var query = from x in numbers.AsParallel().WithDegreeOfParallelism(2) select Math.Pow(x, 2);
- PLINQでは、結果をバッファリングするか否かを指定することが出来る。
- WithMergeOptionsメソッド
- デフォルトは、「AutoBuffered」となっている。ほとんどの場合はこの設定で良い。
- バッファリングをOFFにするには、「ParallelMergeOptions.NotBuffered」を指定する。
- 結果を逐一表示する場合などに指定する。
- 並び替えが必要な操作(orderbyなど)を指定している場合、このオプションは無視される。
- 並び替えが指定されている場合は、バッファリングが必須となるため。
- WithMergeOptionsメソッド
例:
var query = from x in numbers.AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered) select Math.Pow(x, 2); foreach (var item in query) { Console.WriteLine(item); }
例:
CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; var query = from x in numbers.AsParallel().WithCancellation(token) select Math.Pow(x, 2); tokenSource.Cancel(); try { foreach (var item in query) { Console.WriteLine(item); } } catch (OperationCanceledException cancelEx) { // キャンセルされた場合にここに入る。 } catch (AggregateException aggEx) { // 並列クエリ処理内にて例外が発生した場合にここに入る。 }
-
- PLINQのキャンセル処理には、以下の特殊な点があるので注意すること。
- 並列処理中に発生した例外がキャンセル処理のみの場合は、AggregateExceptionではなくOperationCanceledExceptionが発生する。
- 並列処理中にキャンセル処理も含めて複数の例外が発生した場合はAggregateExceptionが発生する。
- PLINQのキャンセル処理には、以下の特殊な点があるので注意すること。
- PLINQでのエラーハンドリングは、TPLの場合と同じやり方となる。
例:
var query = from x in numbers.AsParallel() select { if (x > 100) { throw new InvalidOperationException("error"); } else { return x; } }; try { foreach (var item in query) { Console.WriteLine(item); } } catch (AggregateException aggEx) { foreach (Exception innerEx in aggEx.Flatten().InnerExceptions) { Console.WriteLine(innerEx.Message); } }
- PLINQにて処理を行う場合、元のソースシーケンスを指定する際以下の注意が必要。
- IEnumerableもしくはEnumerableを指定した場合、PLINQ側では元データの要素数が判別できないためデータを特定のチャンクに分けて並列処理することが出来なくなる。つまり並列処理を行う事が出来ない。なので、この場合、データは同期的に上から順に処理されていく。
- ちゃんと並列処理を行う場合は、ToListやToArrayを用いて、具体的なコンテナを作成しておく必要がある。
例:以下の例では並列処理が行われない。(Enumerable.Rangeの戻り値がIEnumerable
var query = Enumerable.Range(1, 10).AsParallel().WithExecutionMode(ParalellExecutionMode.ForceParallelism) select Math.Pow(x, 2);
例:以下の例では並列処理が行われる。
var query = Enumerable.Range(1, 10).ToArray().AsParallel().WithExecutionMode(ParalellExecutionMode.ForceParallelism) select Math.Pow(x, 2);
- BlockingCollection
は、.NET 4.0より追加されたスレッドセーフなコレクションクラスである。 - 以下の機能を持つ。
- Producer-Consumerパターンを実装している。
- 生産者-消費者パターン。
- IProducerConsumerPatternインターフェースを実装している。
- Producer-Consumerパターンを実装している。
- 複数のスレッドからスレッドセーフに要素を追加及び削除できる。
- 要素が存在しない場合に、別スレッドが要素を削除しようとしたらブロックさせることが出来る。
- 要素が満杯の場合に、別スレッドが要素を挿入しようとしたらブロックさせることが出来る。
- 要素の挿入(Insert)及び削除(Delete)操作にたいして、TryXXX系のメソッドが用意されている。
- タイムアウトを設定出来る。
- キャンセルトークンを設定出来る。
- foreachする際に2種類のEnumerationが利用できる。
- 読み取り専用状態でループ。
- ループ中に要素を変更すると例外が発生するモード。
- 通常のコレクションのループと同じ。
- ループ中に要素を変更すると例外が発生するモード。
- ループ中にコレクションの変更を許容するモード。
- ループ中に要素を変更しても例外が発生しないモード。
- 読み取り専用状態でループ。
- .NET 4.0で追加されたスレッドセーフコレクションクラス達
- BlockingCollection
- ConcurrentBag
- 順序を持たない、重複を許容するコレクション
- Setは順序を持たず、重複を許容しないコレクション。
- 順序を持たない、重複を許容するコレクション
- ConcurrentDictionary
- Dictionaryのスレッドセーフ版。
- ConcurrentQueue
- Queueのスレッドセーフ版
- ConcurrentStack
- Stackのスレッドセーフ版
- BlockingCollection
- BlockingCollection
のサンプル - Producer-Consumerパターン。
- 以下の機能を持つ。
[生産者]側
void ProduceData(BlockingCollection<int> output) { try { for (int i = 0; i < 100; i++) { output.Add(i); } } finally { // データの生産が終わったことを通知. output.CompleteAdding(); } }
[消費者]側
void ConsumeData(BlockingCollection<int> input, BlockingCollection<int> output) { try { // GetConsumingEnumerableメソッドから消費者用のEnumerationが取得出来る。 // 生産者側が、生産が終わったことを通知してくるまでブロックされる。 foreach (int value in input.GetConsumingEnumerable()) { output.Add(value * value); } } finally { // データの生産が終わったことを通知. output.CompleteAdding(); } }
[全体]
void Main() { BlockingCollection<int> buffer1 = new BlockingCollection<int>(100); BlockingCollection<int> buffer2 = new BlockingCollection<int>(100); // 生産者と消費者を並列で処理. Parallel.Invoke( () => ProduceData(buffer1), () => ConsumeData(buffer1, buffer2) ); // 消費者が消費した結果をメインにて最後に出力. foreach (int value in buffer2.GetConsumingEnumerable()) { Console.WriteLine(value); } }
================================
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
*1:) => DoSomeWork(), () => DoSomeWork2(