概要
引き続き、小ネタチャネル関数の続き。( #関連記事 参照)
pythonに enumerate という関数があります。
この関数は、指定されたシーケンスを取り出すと同時にインデックスを付与して返してくれます。
こんな感じ。
$ python Python 3.8.0 (tags/v3.8.0:fa919fd, Oct 14 2019, 19:21:23) [MSC v.1916 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> l = ['hello', 'world'] >>> for i,v in enumerate(l): ... print(i,v) ... 0 hello 1 world
Goのチャネルを for でループすると、値が取得できるのですが
そのときにインデックスもついでにあったらいいなーってときがあります。
てことで、サンプル作ってみました。
サンプル
package chans type ( // IterValue -- chans.Enumerate() にて利用されるデータ型です。 IterValue struct { Index int // インデックス Value interface{} // 値 } ) func newIterValue(i int, v interface{}) *IterValue { return &IterValue{ Index: i, Value: v, } } // Enumerate -- 指定された入力チャネルの要素に対してインデックスを付与したデータを返すチャネルを生成します。 // // 戻り値のチャネルから取得できるデータ型は、*chans.IterValue となっています。 // // for e := range chans.Enumerate(done, inCh) { // if v, ok := e.(*IterValue); ok { // // v.Index でインデックス、 v.Value で値が取得できる // } // } // func Enumerate(done <-chan struct{}, in <-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) index := 0 ChLoop: for { select { case <-done: break ChLoop case v, ok := <-in: if !ok { break ChLoop } select { case out <- newIterValue(index, v): index++ case <-done: } } } }() return out }
gomy/enumerate.go at master · devlights/gomy · GitHub
以下テストコードです。
package chans import ( "testing" ) func TestEnumerate(t *testing.T) { type ( resultValue struct { index int value interface{} } testin struct { input []interface{} } testout struct { output []resultValue } testcase struct { in testin out testout } ) cases := []testcase{ { in: testin{input: []interface{}{"hello", "world"}}, out: testout{output: []resultValue{ { index: 0, value: "hello", }, { index: 1, value: "world", }, }}, }, } for caseIndex, c := range cases { func() { done := make(chan struct{}) defer close(done) inCh := make(chan interface{}) go func() { defer close(inCh) for _, v := range c.in.input { inCh <- v } }() for e := range Enumerate(done, inCh) { if v, ok := e.(*IterValue); ok { t.Logf("[test-%02d] [%v][%v]", caseIndex, v.Index, v.Value) r := c.out.output[v.Index] if r.index != v.Index { t.Errorf("want: index %v\tgot: index %v", r.index, v.Index) } if r.value != v.Value { t.Errorf("want value %v\tgot: value %v", r.value, v.Value) } } } }() } }
実行すると以下のようになります。
$ go test -v github.com/devlights/gomy/chans -run ^TestEnumerate.*$ === RUN TestEnumerate TestEnumerate: enumerate_test.go:56: [test-00] [0][hello] TestEnumerate: enumerate_test.go:56: [test-00] [1][world] --- PASS: TestEnumerate (0.00s) PASS ok github.com/devlights/gomy/chans 0.085s
こういうインデックス付与したい場合って、パイプラインでデータソースに対して複数のワーカーが非同期でデータを取得していって
ある程度の粒度で処理が完了したら、下流に流す前にデータを元の並びに戻してしまいたい場合とかによく使ったりしてます。
ファンアウトした後の結果を並び替えるって感じですね。
以下、そんな感じのサンプルもつくってみました。
package async import ( "sort" "time" "github.com/devlights/gomy/chans" "github.com/devlights/gomy/output" ) // OrderedAfterAsyncProc -- chans.Enumerate() を使った非同期処理をした後に正しい順序に並び替えるサンプルです. func OrderedAfterAsyncProc() error { type ( result struct { index int value interface{} } ) var ( givenTime = 1 * time.Second numGoroutine = 2 items = []interface{}{"hello", "world", "こんにちわ", "世界"} results = make([]*result, 0, 0) ) var ( done = make(chan struct{}) outCh = make(chan interface{}) ) defer close(done) // 処理するのに t に指定された時間がかかる関数 fn := func(item interface{}, t time.Duration) { <-time.After(t) output.Stdoutl("[処理]", item) } // パイプライン生成 forEachCh := chans.ForEach(done, items...) enumerateCh := chans.Enumerate(done, forEachCh) doneChList := chans.FanOut(done, enumerateCh, numGoroutine, func(e interface{}) { if v, ok := e.(*chans.IterValue); ok { fn(v.Value, givenTime) outCh <- &result{ index: v.Index, value: v.Value, } } }) // 処理完了とともに出力用チャネルを閉じる go func() { defer close(outCh) <-chans.WhenAll(doneChList...) }() // 結果を吸い出し for v := range outCh { results = append(results, v.(*result)) } // 正しい順序に並び替え sort.Slice(results, func(i, j int) bool { return results[i].index < results[j].index }) // 最終結果を出力 output.StdoutHr() for _, v := range results { output.Stdoutl("[最終結果]", v.value) } return nil }
実行すると以下のような感じになります。
$ make run ENTER EXAMPLE NAME: async_ordered_after_async_proc [Name] "async_ordered_after_async_proc" [処理] こんにちわ [処理] hello [処理] 世界 [処理] world -------------------------------------------------- [最終結果] hello [最終結果] world [最終結果] こんにちわ [最終結果] 世界 [Elapsed] 2.0002804s
処理中、つまり、ファンアウトさせてるときは、バラバラに処理してるので順序が異なってますが
最終結果のフェーズで chans.Enumerate に付与されたインデックスで再オーダーしているので結果が元の順になっています。
参考

- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場