概要
チャネル関数の続き。
今回は、複数のチャネルのシーケンスを一つにまとめて順に消費していくチャネルを作る関数について。
例えば、3つのファイルを読み込む処理書いていて、それぞれのファイルを読む処理をゴルーチンにしてたとします。
んで、結果はそれぞれチャネルで受け取るとすると、3つのチャネルを持っていることになります。
結果は一つのリストなりファイルなりに纏めるとして
for v := range ch1 { } for v := range ch2 { } for v := range ch3 { }
ってやるか、そもそもスライスで持っておいて
for _, ch := range chList { for v := range ch { } }
でもいいんですが、ちょっと冗長かなと。以下のようにしたい。
for v := range chans.Concat(done, chList...) { }
名前はConcatにしときました。書籍では Bridge になってましたが、入力が <-chan <-chan interface{}
になっていたので、...<-chan interface{}
を扱う関数として別に用意。内部で Bridge 呼んでいるだけですが。
サンプル
package chans // Concat -- 指定されたチャネルのシーケンスを順に消費していく単一のチャネルを返します。 func Concat(done <-chan struct{}, chList ...<-chan interface{}) <-chan interface{} { if len(chList) == 0 { c := make(chan interface{}) close(c) return c } chSeq := make(chan (<-chan interface{}), len(chList)) func() { defer close(chSeq) for _, c := range chList { chSeq <- c } }() return Bridge(done, chSeq) } // Bridge -- 指定されたチャネルのシーケンスを順に消費していく単一のチャネルを返します。 func Bridge(done <-chan struct{}, chanCh <-chan <-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) for { var ch <-chan interface{} select { case c, ok := <-chanCh: if !ok { return } ch = c case <-done: return } for v := range OrDone(done, ch) { select { case out <- v: case <-done: } } } }() return out }
gomy/concat.go at master · devlights/gomy · GitHub
以下、テストコードです。
package chans import ( "reflect" "testing" ) func TestConcat(t *testing.T) { type ( testin struct { data [][]interface{} } testout struct { result []interface{} } testcase struct { in testin out testout } ) cases := []testcase{ { in: testin{data: [][]interface{}{ {1, 2, 3}, {4, 5, 6}, }}, out: testout{result: []interface{}{ 1, 2, 3, 4, 5, 6, }}, }, } for _, c := range cases { func() { done := make(chan struct{}) defer close(done) chList := make([]<-chan interface{}, 0, len(c.in.data)) for _, list := range c.in.data { func() { ch := make(chan interface{}, len(list)) defer close(ch) for _, v := range list { ch <- v } chList = append(chList, ch) }() } concatCh := Concat(done, chList...) results := make([]interface{}, 0, 0) for v := range concatCh { t.Log(v) results = append(results, v) } // concat の場合は、fanIn と異なり取得順序は確定なので中身も一致していることをテストする t.Logf("[c.out.result] %v", c.out.result) t.Logf("[results ] %v", results) if !reflect.DeepEqual(c.out.result, results) { t.Errorf("want: %v\tgot: %v", c.out.result, results) } }() } }
実行すると以下のようになります。
$ go test -v github.com/devlights/gomy/chans -run ^TestConcat.*$ === RUN TestConcat TestConcat: concat_test.go:57: 1 TestConcat: concat_test.go:57: 2 TestConcat: concat_test.go:57: 3 TestConcat: concat_test.go:57: 4 TestConcat: concat_test.go:57: 5 TestConcat: concat_test.go:57: 6 TestConcat: concat_test.go:62: [c.out.result] [1 2 3 4 5 6] TestConcat: concat_test.go:63: [results ] [1 2 3 4 5 6] --- PASS: TestConcat (0.00s) PASS
ついでに、context を利用している場合のサンプル。
package concat import ( "context" "fmt" "math/rand" "time" "github.com/devlights/gomy/chans" "github.com/devlights/gomy/output" ) // MultiChannelConcat -- chans.Concat() を利用して処理するサンプルです。 func MultiChannelConcat() error { var ( numGoroutine = 5 // 並行起動するデータ取得ゴルーチンの数 takeCount = 2 // データ取得チャネル毎にいくつデータを取得するかの数 dataInInterval = 100 * time.Millisecond // データ投入時のインターバル ) // コンテキストを生成 var ( rootCtx = context.Background() mainCtx, cancel = context.WithCancel(rootCtx) ) // 入力元となるチャネルと各ゴルーチンの終了判定チャネル var ( srcCh = make(chan interface{}) doneChList = make([]<-chan struct{}, 0, 0) takeChList = make([]<-chan interface{}, 0, numGoroutine) ) defer close(srcCh) // srcCh に データを投入していくゴルーチン起動 doneChList = append(doneChList, gen(mainCtx.Done(), srcCh, dataInInterval)) // srcCh から takeCount個 取り出すチャネルを複数生成 for i := 0; i < numGoroutine; i++ { takeDoneCh, takeCh := take(mainCtx.Done(), srcCh, i+1, takeCount) doneChList = append(doneChList, takeDoneCh) takeChList = append(takeChList, takeCh) } // 複数の取得チャネルを纏めてしまって、出力 (出力順序は守る) for v := range chans.Concat(mainCtx.Done(), takeChList...) { output.Stdoutl("[main]", v) } cancel() <-chans.WhenAll(doneChList...) return nil } func take(done <-chan struct{}, in <-chan interface{}, index int, takeCount int) (<-chan struct{}, <-chan interface{}) { terminated := make(chan struct{}) out := make(chan interface{}) go func() { // defer output.Stdoutf("take-goroutine", "%02d END\n", index) defer close(terminated) defer close(out) for v := range chans.Take(done, in, takeCount) { out <- fmt.Sprintf("[take-%02d] %v", index, v) } }() return terminated, out } func gen(done <-chan struct{}, in chan<- interface{}, interval time.Duration) <-chan struct{} { out := make(chan struct{}) go func() { // defer output.Stdoutl("gen-goroutine", "END") defer close(out) randInt := func() interface{} { return rand.Int() } for v := range chans.RepeatFn(done, randInt) { in <- v <-time.After(interval) } }() return out }
try-golang/multi_channel_concat.go at master · devlights/try-golang · GitHub
実行すると以下のようになります。
$ make run ENTER EXAMPLE NAME: async_multi_channel_concat [Name] "async_multi_channel_concat" [main] [take-01] 4692119960690034160 [main] [take-01] 8604966331458351554 [main] [take-02] 7875191509257954031 [main] [take-02] 4998976565280053056 [main] [take-03] 4053247634135861360 [main] [take-03] 5272630624697359627 [main] [take-04] 2769253797494110094 [main] [take-04] 3775252429711134731 [main] [take-05] 3856557697572725505 [main] [take-05] 1322177934211094863 [Elapsed] 1.0050107s
先に作ったチャネルから順にデータを取得していってますね。
参考
- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場