概要
の補足サンプル。
上の記事では、一つのチャネルをOrDone()
でラップして処理していましたが
複数の場合のサンプルもついでに追加。
サンプル
package ordone import ( "context" "fmt" "runtime" "time" "github.com/devlights/gomy/chans" "github.com/devlights/gomy/enumerable" "github.com/devlights/gomy/output" ) // MultiInput -- chans.OrDone() を利用して処理するサンプルです。(入力チャネルが複数の場合) func MultiInput() error { // 型のエイリアス type ( DoneCh <-chan struct{} InCh chan<- interface{} OutCh <-chan interface{} ) // 並行起動するゴルーチンの数とタイムリミット var ( timeLimit = 1 * time.Second numGoroutine = runtime.NumCPU() ) // コンテキストとチャネル関連 var ( rootCtx = context.Background() mainCtx, _ = context.WithTimeout(rootCtx, timeLimit) srcCh = make(chan interface{}) doneChList = make([]<-chan struct{}, 0, numGoroutine) ) defer close(srcCh) // ------------------------------------------------------- // [Send] 100ms 毎に srcCh にデータを送り続けるゴルーチン起動 // ------------------------------------------------------- doneChList = append(doneChList, func(done DoneCh, in InCh) DoneCh { var ( out = make(chan struct{}) ) go func(done DoneCh, in InCh) { defer close(out) // 値を返すクロージャ生成 fn := func() func() interface{} { r := enumerable.NewRange(0, 100) return func() interface{} { r.Next() return r.Current() } }() for v := range chans.RepeatFn(done, fn) { in <- v <-time.After(100 * time.Millisecond) } }(done, in) return out }(mainCtx.Done(), srcCh)) // ------------------------------------------------------- // [Recv] srcCh からデータを取得するゴルーチンを複数起動 // ------------------------------------------------------- for i := 0; i < numGoroutine; i++ { doneChList = append(doneChList, func(done DoneCh, ch OutCh, index int) DoneCh { var ( out = make(chan struct{}) name = fmt.Sprintf("[goroutine-%02d]", index) ) go func(done DoneCh, dataCh OutCh, name string) { defer close(out) for v := range chans.OrDone(done, dataCh) { output.Stdoutl(name, v) } }(done, ch, name) return out }(mainCtx.Done(), srcCh, i)) } // ------------------------------------------------------- // [Join] メインコンテキストが完了するまで待機 // ------------------------------------------------------- <-chans.WhenAll(doneChList...) return nil }
try-golang/ordone_multiinput.go at master · devlights/try-golang · GitHub
実行すると以下のようになります。
$ make run ENTER EXAMPLE NAME: async_ordone_multi_input [Name] "async_ordone_multi_input" [goroutine-00] 1 [goroutine-00] 2 [goroutine-01] 3 [goroutine-03] 4 [goroutine-02] 5 [goroutine-00] 6 [goroutine-01] 7 [goroutine-03] 8 [goroutine-02] 9 [goroutine-00] 10 [Elapsed] 1.0045703s
みんなバラバラで動いているけど、メインのコンテキストが終了状態になったらピタッと完了してくれていますね。
一つのデータソースに対して、並行して複数の処理が動くので、この状態を ファン・アウト(Fan-Out) パターンといったり Workers パターンといったりしますね。
参考

- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場