いろいろ備忘録日記

主に .NET とか Go とか Python絡みのメモを公開しています。

Goメモ-83 (チャネルの要素に対してインデックスを付与して返す関数, Enumerate)

概要

引き続き、小ネタチャネル関数の続き。( #関連記事 参照)

pythonに enumerate という関数があります。

この関数は、指定されたシーケンスを取り出すと同時にインデックスを付与して返してくれます。

こんな感じ。

$ python
Python 3.8.0 (tags/v3.8.0:fa919fd, Oct 14 2019, 19:21:23) [MSC v.1916 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> l = ['hello', 'world']
>>> for i,v in enumerate(l):
...     print(i,v)
...
0 hello
1 world

Goのチャネルを for でループすると、値が取得できるのですが

そのときにインデックスもついでにあったらいいなーってときがあります。

てことで、サンプル作ってみました。

サンプル

package chans

type (
    // IterValue -- chans.Enumerate() にて利用されるデータ型です。
    IterValue struct {
        Index int         // インデックス
        Value interface{} // 値
    }
)

func newIterValue(i int, v interface{}) *IterValue {
    return &IterValue{
        Index: i,
        Value: v,
    }
}

// Enumerate -- 指定された入力チャネルの要素に対してインデックスを付与したデータを返すチャネルを生成します。
//
// 戻り値のチャネルから取得できるデータ型は、*chans.IterValue となっています。
//
//         for e := range chans.Enumerate(done, inCh) {
//             if v, ok := e.(*IterValue); ok {
//                 // v.Index でインデックス、 v.Value で値が取得できる
//             }
//         }
//
func Enumerate(done <-chan struct{}, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})

    go func() {
        defer close(out)

        index := 0

    ChLoop:
        for {
            select {
            case <-done:
                break ChLoop
            case v, ok := <-in:
                if !ok {
                    break ChLoop
                }

                select {
                case out <- newIterValue(index, v):
                    index++
                case <-done:
                }
            }
        }
    }()

    return out
}

gomy/enumerate.go at master · devlights/gomy · GitHub

以下テストコードです。

package chans

import (
    "testing"
)

func TestEnumerate(t *testing.T) {
    type (
        resultValue struct {
            index int
            value interface{}
        }
        testin struct {
            input []interface{}
        }
        testout struct {
            output []resultValue
        }
        testcase struct {
            in  testin
            out testout
        }
    )

    cases := []testcase{
        {
            in: testin{input: []interface{}{"hello", "world"}},
            out: testout{output: []resultValue{
                {
                    index: 0,
                    value: "hello",
                },
                {
                    index: 1,
                    value: "world",
                },
            }},
        },
    }

    for caseIndex, c := range cases {
        func() {
            done := make(chan struct{})
            defer close(done)

            inCh := make(chan interface{})
            go func() {
                defer close(inCh)
                for _, v := range c.in.input {
                    inCh <- v
                }
            }()

            for e := range Enumerate(done, inCh) {
                if v, ok := e.(*IterValue); ok {
                    t.Logf("[test-%02d] [%v][%v]", caseIndex, v.Index, v.Value)

                    r := c.out.output[v.Index]
                    if r.index != v.Index {
                        t.Errorf("want: index %v\tgot: index %v", r.index, v.Index)
                    }

                    if r.value != v.Value {
                        t.Errorf("want value %v\tgot: value %v", r.value, v.Value)
                    }
                }
            }
        }()
    }
}

実行すると以下のようになります。

$ go test -v github.com/devlights/gomy/chans -run ^TestEnumerate.*$
=== RUN   TestEnumerate
    TestEnumerate: enumerate_test.go:56: [test-00] [0][hello]
    TestEnumerate: enumerate_test.go:56: [test-00] [1][world]
--- PASS: TestEnumerate (0.00s)
PASS
ok      github.com/devlights/gomy/chans 0.085s

こういうインデックス付与したい場合って、パイプラインでデータソースに対して複数のワーカーが非同期でデータを取得していって

ある程度の粒度で処理が完了したら、下流に流す前にデータを元の並びに戻してしまいたい場合とかによく使ったりしてます。

ファンアウトした後の結果を並び替えるって感じですね。

以下、そんな感じのサンプルもつくってみました。

package async

import (
    "sort"
    "time"

    "github.com/devlights/gomy/chans"
    "github.com/devlights/gomy/output"
)

// OrderedAfterAsyncProc -- chans.Enumerate() を使った非同期処理をした後に正しい順序に並び替えるサンプルです.
func OrderedAfterAsyncProc() error {
    type (
        result struct {
            index int
            value interface{}
        }
    )

    var (
        givenTime    = 1 * time.Second
        numGoroutine = 2
        items        = []interface{}{"hello", "world", "こんにちわ", "世界"}
        results      = make([]*result, 0, 0)
    )

    var (
        done  = make(chan struct{})
        outCh = make(chan interface{})
    )

    defer close(done)

    // 処理するのに t に指定された時間がかかる関数
    fn := func(item interface{}, t time.Duration) {
        <-time.After(t)
        output.Stdoutl("[処理]", item)
    }

    // パイプライン生成
    forEachCh := chans.ForEach(done, items...)
    enumerateCh := chans.Enumerate(done, forEachCh)
    doneChList := chans.FanOut(done, enumerateCh, numGoroutine, func(e interface{}) {
        if v, ok := e.(*chans.IterValue); ok {
            fn(v.Value, givenTime)
            outCh <- &result{
                index: v.Index,
                value: v.Value,
            }
        }
    })

    // 処理完了とともに出力用チャネルを閉じる
    go func() {
        defer close(outCh)
        <-chans.WhenAll(doneChList...)
    }()

    // 結果を吸い出し
    for v := range outCh {
        results = append(results, v.(*result))
    }

    // 正しい順序に並び替え
    sort.Slice(results, func(i, j int) bool {
        return results[i].index < results[j].index
    })

    // 最終結果を出力
    output.StdoutHr()
    for _, v := range results {
        output.Stdoutl("[最終結果]", v.value)
    }

    return nil
}

実行すると以下のような感じになります。

$ make run
ENTER EXAMPLE NAME: async_ordered_after_async_proc
[Name] "async_ordered_after_async_proc"
[処理]                 こんにちわ
[処理]                 hello
[処理]                 世界
[処理]                 world
-------------------------------------------------- 
[最終結果]               hello
[最終結果]               world
[最終結果]               こんにちわ
[最終結果]               世界


[Elapsed] 2.0002804s

処理中、つまり、ファンアウトさせてるときは、バラバラに処理してるので順序が異なってますが

最終結果のフェーズで chans.Enumerate に付与されたインデックスで再オーダーしているので結果が元の順になっています。

参考

Go言語による並行処理

Go言語による並行処理

関連記事

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com