いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

Goメモ-74 (複数のOrDoneチャネルを使って処理, OrDone, FanOut, Workers)

概要

devlights.hatenablog.com

の補足サンプル。

上の記事では、一つのチャネルをOrDone() でラップして処理していましたが

複数の場合のサンプルもついでに追加。

サンプル

package ordone

import (
    "context"
    "fmt"
    "runtime"
    "time"

    "github.com/devlights/gomy/chans"
    "github.com/devlights/gomy/enumerable"
    "github.com/devlights/gomy/output"
)

// MultiInput -- chans.OrDone() を利用して処理するサンプルです。(入力チャネルが複数の場合)
func MultiInput() error {
    // 型のエイリアス
    type (
        DoneCh <-chan struct{}
        InCh   chan<- interface{}
        OutCh  <-chan interface{}
    )

    // 並行起動するゴルーチンの数とタイムリミット
    var (
        timeLimit    = 1 * time.Second
        numGoroutine = runtime.NumCPU()
    )

    // コンテキストとチャネル関連
    var (
        rootCtx    = context.Background()
        mainCtx, _ = context.WithTimeout(rootCtx, timeLimit)
        srcCh      = make(chan interface{})
        doneChList = make([]<-chan struct{}, 0, numGoroutine)
    )

    defer close(srcCh)

    // -------------------------------------------------------
    // [Send] 100ms 毎に srcCh にデータを送り続けるゴルーチン起動
    // -------------------------------------------------------
    doneChList = append(doneChList, func(done DoneCh, in InCh) DoneCh {
        var (
            out = make(chan struct{})
        )

        go func(done DoneCh, in InCh) {
            defer close(out)

            // 値を返すクロージャ生成
            fn := func() func() interface{} {
                r := enumerable.NewRange(0, 100)
                return func() interface{} {
                    r.Next()
                    return r.Current()
                }
            }()

            for v := range chans.RepeatFn(done, fn) {
                in <- v
                <-time.After(100 * time.Millisecond)
            }
        }(done, in)

        return out
    }(mainCtx.Done(), srcCh))

    // -------------------------------------------------------
    // [Recv] srcCh からデータを取得するゴルーチンを複数起動
    // -------------------------------------------------------
    for i := 0; i < numGoroutine; i++ {
        doneChList = append(doneChList, func(done DoneCh, ch OutCh, index int) DoneCh {
            var (
                out  = make(chan struct{})
                name = fmt.Sprintf("[goroutine-%02d]", index)
            )

            go func(done DoneCh, dataCh OutCh, name string) {
                defer close(out)

                for v := range chans.OrDone(done, dataCh) {
                    output.Stdoutl(name, v)
                }
            }(done, ch, name)

            return out
        }(mainCtx.Done(), srcCh, i))
    }

    // -------------------------------------------------------
    // [Join] メインコンテキストが完了するまで待機
    // -------------------------------------------------------
    <-chans.WhenAll(doneChList...)

    return nil
}

try-golang/ordone_multiinput.go at master · devlights/try-golang · GitHub

実行すると以下のようになります。

$ make run
ENTER EXAMPLE NAME: async_ordone_multi_input
[Name] "async_ordone_multi_input"
[goroutine-00]       1
[goroutine-00]       2
[goroutine-01]       3
[goroutine-03]       4
[goroutine-02]       5
[goroutine-00]       6
[goroutine-01]       7
[goroutine-03]       8
[goroutine-02]       9
[goroutine-00]       10


[Elapsed] 1.0045703s

みんなバラバラで動いているけど、メインのコンテキストが終了状態になったらピタッと完了してくれていますね。

一つのデータソースに対して、並行して複数の処理が動くので、この状態を ファン・アウト(Fan-Out) パターンといったり Workers パターンといったりしますね。

参考

Go言語による並行処理

Go言語による並行処理

関連記事

devlights.hatenablog.com


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com