いろいろ備忘録日記

主に .NET とか Java とか Python絡みのメモを公開しています。最近Go言語勉強中。

System.Threading.Channelsのメモ-01 (基本的な使い方)

概要

最近、Goでよく遊んでいるのでGoのチャネルみたいなものが

C#で無いかなーって探してみたら、以下がありました。

www.nuget.org

そのものスバリの名前のライブラリがあるじゃないですか。知らなかったです。

で、.NET Blog の方にも紹介記事がありました。

devblogs.microsoft.com

上の記事とかまだちゃんと読めていないのですが、後で読む。

また、以下のようなめっちゃ丁寧に書いてくださっているシリーズ記事もありました。

deniskyashif.com

deniskyashif.com

deniskyashif.com

これも後できっちり読む。

とりあえず、今回は基本的な使い方の勉強です。

インストール

インストールは NuGet で インストールすれば終わりでした。

プレリリースで、5.0.0 preview が出ているみたいですが、最新安定版は v4.7.0

依存しているライブラリは

  • System.Threading.Tasks.Extensions >= 4.5.2

.Net Core 3.1 だとこうなってました。.Net Framework の方だと依存ライブラリが異なるかもしれません。

サンプル

使い方忘れないようにコメント盛ったので、ちょっと長いですが

基本Goのチャネルと同じような感じで使えて便利ですね。

個人的には 標準で搭載されている BlockingCollection<T> よりこっちの方が好きです。

#define LOGGING_PRODUCER
#undef LOGGING_PRODUCER

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using TryCSharp.Common;

namespace TryCSharp.Samples.Async.Channels
{
    /// <summary>
    /// System.Threading.Channels の基本的な使い方についてのサンプルです。
    /// </summary>
    [Sample]
    public class ChannelBasicReadWrite : IAsyncExecutable
    {
        public async Task Execute()
        {
            // ---------------------------------------------------
            // System.Threading.Channels
            // --------------------------
            // System.Threading.Channels は、標準ライブラリとしては
            // 搭載されていないライブラリ。インストールにはNuGetを利用する。
            //
            // 2020-04-19 時点での最新安定版は v4.7.0
            // prerelease として、v5.0.0 が出ている。
            //
            // System.Threading.Channels を利用すると
            // Goの チャネル のような使い勝手で非同期データ処理が行える。
            // 標準ライブラリ側にも BlockingCollection<T> が存在するが
            // それよりも FIFO に特化しているなど処理がやりやすい。
            //
            // Goのチャネルと同様に、一つのチャネルから読み取り用と書き込み用の
            // チャネルを取得できる。
            //
            // [参考]
            // - https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
            // - https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.1
            // - https://deniskyashif.com/2019/12/08/csharp-channels-part-1/
            // - https://deniskyashif.com/2019/12/11/csharp-channels-part-2/
            // - https://deniskyashif.com/2020/01/07/csharp-channels-part-3/
            // ---------------------------------------------------

            //
            // (1) チャネルを生成
            //     チャネルには Bounded(容量制限あり)とUnbounded(容量制限なし)の2つがある
            //
            // 今回サンプルということで、以下の構成で使ってみる
            //   - Producer/Consumerパターンでデータをやり取りするチャネル
            //   - 出力用のチャネル
            // ProducerとConsumerは1:3という構成とする。
            //
            var dataCh = Channel.CreateBounded<int>(10);
            var logCh = Channel.CreateUnbounded<string>();

            //
            // (2) Consumer を生成
            //
            const int numConsumers = 3;
            var consumers = this.RunConsumers(numConsumers, dataCh.Reader, logCh.Writer);

            //
            // (3) Producer を生成
            //
            var interval = TimeSpan.FromMilliseconds(10);
            var producer = this.RunProducer(interval, dataCh.Writer, logCh.Writer);

            //
            // (4) 出力用タスク生成
            //
            var printer = this.RunPrinter(logCh.Reader);

            //
            // (5) チャネルをクローズさせる役割のタスク生成 (Goでいうdoneチャネル)
            //
            var done = Task.Run(async () =>
            {
                await Task.Yield();
                await Task.Delay(TimeSpan.FromMilliseconds(200));

                // チャネルのクローズ
                //   - Writer.Complete() を呼ぶことでチャネルにもう書き込まないと伝える
                //   - チャネルを利用している側では WaitToXXXAsync() が false を返すのでループを抜けることになる
                dataCh.Writer.Complete();
                logCh.Writer.TryWrite("dataCh closed");

                logCh.Writer.Complete();
                Console.WriteLine("logCh closed");
            });

            //
            // (6) 待ち合わせ
            //
            await done;
            await Task.WhenAll(consumers.Concat(new[] {producer, printer}));

            Console.WriteLine("...DONE...");
        }

        private IEnumerable<Task> RunConsumers(int numConsumers, ChannelReader<int> inCh, ChannelWriter<string> logCh)
        {
            var tasks = new List<Task>();
            for (var i = 0; i < numConsumers; i++)
            {
                var index = i;
                tasks.Add(Task.Run(async () =>
                {
                    // チャネル からデータを読み取る際の基本形
                    //   - WaitToReadAsync() で読み取りできるか確認
                    //   - TryRead() で値を読み取り
                    while (await inCh.WaitToReadAsync())
                    {
                        while (inCh.TryRead(out var v))
                        {
                            if (await logCh.WaitToWriteAsync())
                            {
                                logCh.TryWrite($"[consumer{index + 1}] {v}");
                            }
                        }
                    }
                }));
            }

            return tasks.ToArray();
        }

        private Task RunProducer(TimeSpan interval, ChannelWriter<int> outCh, ChannelWriter<string> logCh)
        {
            // チャネル にデータを書き込む際の基本形
            //   - WaitToWriteAsync() で書き込みできるか確認
            //   - TryWrite() で値を書き込み
            return Task.Run(async () =>
            {
                var count = 0;
                while (await outCh.WaitToWriteAsync())
                {
                    if (!outCh.TryWrite(count))
                    {
                        continue;
                    }
                    
                    if (await logCh.WaitToWriteAsync())
                    {
#if LOGGING_PRODUCER
                        logCh.TryWrite($"[producer] {count}");
#endif
                    }
                    
                    count++;
                    await Task.Delay(interval);
                }

                Console.WriteLine($"[producer] total {count - 1} items");
            });
        }

        private Task RunPrinter(ChannelReader<string> inCh)
        {
            return Task.Run(async () =>
            {
                while (await inCh.WaitToReadAsync())
                {
                    while (inCh.TryRead(out var v))
                    {
                        Console.WriteLine(v);
                    }
                }
            });
        }
    }
}

try-csharp/ChannelBasicReadWrite.cs at master · devlights/try-csharp · GitHub

実行すると以下のようになります。

$ cd try-csharp
$ make run
dotnet clean --nologo -v q
dotnet run --project TryCSharp.Tools.Cui/TryCSharp.Tools.Cui.csproj --onetime

ENTER CLASS NAME: ChannelBasicReadWrite
================== START ==================
[Async] **** BEGIN ****
[consumer2] 0
[consumer1] 1
[consumer2] 2
[consumer1] 3
[consumer2] 4
[consumer1] 5
[consumer2] 6
[consumer1] 7
[consumer3] 8
[consumer3] 9
[consumer2] 10
[consumer3] 11
[consumer2] 12
[consumer1] 13
[consumer1] 14
[consumer3] 15
[consumer1] 16
[consumer2] 17
dataCh closed
logCh closed
[producer] total 17 items
...DONE...
[Async] ****  END  ****
==================  END  ==================
Elapsed Time: 00:00:00.2291859

過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com