概要
引き続き、小ネタチャネル関数の続き。( #関連記事 参照)
以前にファンインする関数について記事書きました。
ついでなので、ファンアウトさせる関数も書いてみました。
サンプル
package chans // FanOut -- 指定されたチャネルの処理を指定されたワーカーの数でファンアウトします。 // // チャネルからデータを取得するたびに引数 callback が呼ばれます。 func FanOut(done <-chan struct{}, in <-chan interface{}, workerCount int, callback func(interface{})) []<-chan struct{} { outChList := make([]<-chan struct{}, 0, 0) for i := 0; i < workerCount; i++ { out := make(chan struct{}) go func() { defer close(out) for v := range OrDone(done, in) { select { case <-done: return default: } callback(v) } }() outChList = append(outChList, out) } return outChList }
gomy/fanout.go at master · devlights/gomy · GitHub
ファンアウトは一つの入力を複数のワーカーが処理するパターンなので関数の内部で複数のゴルーチンを起動します。
なので、戻り値は <-chan struct{}
のスライスにしてあります。
一気に待ち合わせしたい場合は
<-WhenAll(FanOut(...))
ってやれば出来ますね。
以下テストコードです。
package chans import ( "testing" "time" ) func TestFanOut(t *testing.T) { type ( testin struct { workerCount int input []interface{} interval time.Duration } testout struct { estimation time.Duration } testcase struct { in testin out testout } ) cases := []testcase{ { in: testin{ workerCount: 1, input: []interface{}{ 1, 2, 3, 4, 5, 6, }, interval: 100 * time.Millisecond, }, out: testout{ estimation: ((6/1 + 1) * 100) * time.Millisecond, }, }, { in: testin{ workerCount: 2, input: []interface{}{ 1, 2, 3, 4, 5, 6, }, interval: 100 * time.Millisecond, }, out: testout{ estimation: ((6/2 + 1) * 100) * time.Millisecond, }, }, { in: testin{ workerCount: 3, input: []interface{}{ 1, 2, 3, 4, 5, 6, }, interval: 100 * time.Millisecond, }, out: testout{ estimation: ((6/3 + 1) * 100) * time.Millisecond, }, }, } for caseIndex, c := range cases { func(index int) { done := make(chan struct{}) defer close(done) start := time.Now() <-WhenAll(FanOut( done, ForEach(done, c.in.input...), c.in.workerCount, func(v interface{}) { <-time.After(c.in.interval) })...) elapsed := time.Since(start) t.Logf("[workerCount=%d][estimation] %v\t[elapsed] %v", c.in.workerCount, c.out.estimation, elapsed) if c.out.estimation < elapsed { t.Errorf("want: <= %v\tgot: %v", c.out.estimation, elapsed) } }(caseIndex + 1) } }
実行すると以下のようになります。
$ go test -v github.com/devlights/gomy/chans -run ^TestFanOut.*$ === RUN TestFanOut TestFanOut: fanout_test.go:78: [workerCount=1][estimation] 700ms [elapsed] 601.3349ms TestFanOut: fanout_test.go:78: [workerCount=2][estimation] 400ms [elapsed] 302.4562ms TestFanOut: fanout_test.go:78: [workerCount=3][estimation] 300ms [elapsed] 200.512ms --- PASS: TestFanOut (1.10s) PASS ok github.com/devlights/gomy/chans 1.183s
同じデータに対してワーカー数を増やしていくと、処理時間が変化していっていますね。
ただ、GoにおいてゴルーチンはOSのスレッドそのものではなくて、GoのランタイムがOSのスレッドに多重化してうまくスケジューリングしてくれるものなので、必ずしもワーカー数をどんどん増やしたからといって良くなるわけではありません。ユーザはゴルーチンのスケジューリングについては手出しができないので、最適な同時実行数はGoのランタイム任せになります。
参考

- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場