概要
チャネル関数の続き。
今回は、Fan-In (ファンイン) パターンで複数のチャネルのシーケンスを一つにまとめて消費していくチャネルを作る関数について。
非同期系の本とか呼んでいると、よくファン・イン(fan-in)
とか ファン・アウト(fan-out)
とかよく聞きます。
ファンアウトは、どっちかというとWorkersパターンって呼ばれる方が多いですかね。
本来は、回路設計とかの話で出てくる用語なのですが、ソフトウェアの開発で非同期周りでよく話題に上がります。
身構える必要は全然なくて
- ファンアウト (fan-out) は、一つのキューに対してNのワーカーがデータを処理する
- ファンイン (fan-in)は、その逆でNのキューのデータを一つに纏める
って感じで覚えておけばいいです。どちらも用語は知らなくてもいつの間にかそんな処理書いてたってことが多いですね。
Goにおいて、キューってのはチャネルを表すことが出来るので、今回のファンインは
Nのチャネルのデータを一つのチャネルに集約する
って感じになります。
前回のConcatと何が違うの?ってなりますが、Concatの方は、指定されたNのチャネルを前から順に使っていってデータを吸い出していきます。今回のファンインさんは、Nのチャネルを指定されるところは同じですが、前から順に行くのではなく、データの吸い出し部分も非同期させます。順序を問わないってことですね。
Goの公式ブログでも記載されています。
Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that's closed when all the inputs are closed. This is called fan-in.
サンプル
package chans // FanIn -- 指定されたチャネルリストをファンインするチャネルを返します。 func FanIn(done <-chan struct{}, channels ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) chList := make([]<-chan struct{}, 0, 0) for _, in := range channels { chList = append(chList, func() <-chan struct{} { terminated := make(chan struct{}) go func(in <-chan interface{}) { defer close(terminated) for v := range in { select { case <-done: return case out <- v: } } }(in) return terminated }()) } go func(chList []<-chan struct{}) { defer close(out) <-WhenAll(chList...) }(chList) return out }
gomy/fanin.go at master · devlights/gomy · GitHub
以下、テストコードです。
package chans import ( "testing" ) func TestFanIn(t *testing.T) { type ( testin struct { data [][]interface{} } testout struct { result []interface{} } testcase struct { in testin out testout } ) cases := []testcase{ { in: testin{data: [][]interface{}{ {1, 2, 3}, {4, 5, 6}, }}, out: testout{result: []interface{}{ 1, 2, 3, 4, 5, 6, }}, }, } for _, c := range cases { func() { done := make(chan struct{}) defer close(done) chList := make([]<-chan interface{}, 0, len(c.in.data)) for _, list := range c.in.data { func() { ch := make(chan interface{}, len(list)) defer close(ch) for _, v := range list { ch <- v } chList = append(chList, ch) }() } fanInCh := FanIn(done, chList...) results := make([]interface{}, 0, 0) for v := range fanInCh { t.Log(v) results = append(results, v) } // fan-in の場合は、flatten と異なり取得順序は不定となるので個数でテストする t.Logf("[c.out.result] %v", c.out.result) t.Logf("[results ] %v", results) if len(c.out.result) != len(results) { t.Errorf("want: %v\tgot: %v", c.out.result, results) } }() } }
実行すると以下のようになります。
$ go test -v github.com/devlights/gomy/chans -run ^TestFanIn.*$ === RUN TestFanIn TestFanIn: fanin_test.go:56: 1 TestFanIn: fanin_test.go:56: 4 TestFanIn: fanin_test.go:56: 5 TestFanIn: fanin_test.go:56: 6 TestFanIn: fanin_test.go:56: 2 TestFanIn: fanin_test.go:56: 3 TestFanIn: fanin_test.go:61: [c.out.result] [1 2 3 4 5 6] TestFanIn: fanin_test.go:62: [results ] [1 4 5 6 2 3] --- PASS: TestFanIn (0.00s) PASS
ついでに、context を利用している場合のサンプル。前回のConcatのサンプルとほぼ同じで、Concatって呼び出しがFanInに変わっているだけです。
package fanin import ( "context" "fmt" "math/rand" "time" "github.com/devlights/gomy/chans" "github.com/devlights/gomy/output" ) // MultiChannelFanIn -- chans.FanIn() を利用して処理するサンプルです。(入力チャネルが複数の場合) func MultiChannelFanIn() error { var ( numGoroutine = 5 // 並行起動するデータ取得ゴルーチンの数 takeCount = 2 // データ取得チャネル毎にいくつデータを取得するかの数 dataInInterval = 100 * time.Millisecond // データ投入時のインターバル ) // コンテキストを生成 var ( rootCtx = context.Background() mainCtx, cancel = context.WithCancel(rootCtx) ) // 入力元となるチャネルと各ゴルーチンの終了判定チャネル var ( srcCh = make(chan interface{}) doneChList = make([]<-chan struct{}, 0, 0) takeChList = make([]<-chan interface{}, 0, numGoroutine) ) defer close(srcCh) // srcCh に データを投入していくゴルーチン起動 doneChList = append(doneChList, gen(mainCtx.Done(), srcCh, dataInInterval)) // srcCh から takeCount個 取り出すチャネルを複数生成 for i := 0; i < numGoroutine; i++ { takeDoneCh, takeCh := take(mainCtx.Done(), srcCh, i+1, takeCount) doneChList = append(doneChList, takeDoneCh) takeChList = append(takeChList, takeCh) } // 複数の取得チャネルを纏めてしまって、出力 (出力順序は問わない) for v := range chans.FanIn(mainCtx.Done(), takeChList...) { output.Stdoutl("[main]", v) } cancel() <-chans.WhenAll(doneChList...) return nil } func take(done <-chan struct{}, in <-chan interface{}, index int, takeCount int) (<-chan struct{}, <-chan interface{}) { terminated := make(chan struct{}) out := make(chan interface{}) go func() { // defer output.Stdoutf("take-goroutine", "%02d END\n", index) defer close(terminated) defer close(out) for v := range chans.Take(done, in, takeCount) { out <- fmt.Sprintf("[take-%02d] %v", index, v) } }() return terminated, out } func gen(done <-chan struct{}, in chan<- interface{}, interval time.Duration) <-chan struct{} { out := make(chan struct{}) go func() { // defer output.Stdoutl("gen-goroutine", "END") defer close(out) randInt := func() interface{} { return rand.Int() } for v := range chans.RepeatFn(done, randInt) { in <- v <-time.After(interval) } }() return out }
try-golang/multi_channel_fanin.go at master · devlights/try-golang · GitHub
実行すると以下のようになります。
$ make run ENTER EXAMPLE NAME: async_multi_channel_fanin [Name] "async_multi_channel_fanin" [main] [take-02] 9147366453389239735 [main] [take-02] 7707720205445421596 [main] [take-01] 3353539379143321040 [main] [take-05] 9007367778894470328 [main] [take-03] 2305305389264664480 [main] [take-04] 2885577774071629796 [main] [take-01] 4156069089090199425 [main] [take-05] 6668285635745240934 [main] [take-03] 8444500350534269182 [main] [take-04] 2050223648781894161 [Elapsed] 1.0049724s
前回のConcatの場合は、出力結果が1から順に出力されていましたが、今回はバラバラになっていますね。
このサンプルでいうと、srcChという一つのデータソースから複数のゴルーチンがtakeしている部分が fan-out、出力するために chans.FanIn を使って一つにまとめている部分が fan-in ですね。
参考
- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場