いろいろ備忘録日記

主に .NET とか Go とか Python絡みのメモを公開しています。

Goメモ-72 (指定されたチャネルのシーケンスを順に消費していくチャネル, Concat)

概要

チャネル関数の続き。

今回は、複数のチャネルのシーケンスを一つにまとめて順に消費していくチャネルを作る関数について。

例えば、3つのファイルを読み込む処理書いていて、それぞれのファイルを読む処理をゴルーチンにしてたとします。

んで、結果はそれぞれチャネルで受け取るとすると、3つのチャネルを持っていることになります。

結果は一つのリストなりファイルなりに纏めるとして

for v := range ch1 {
}

for v := range ch2 {
}

for v := range ch3 {
}

ってやるか、そもそもスライスで持っておいて

for _, ch := range chList {
    for v := range ch {
    }
}

でもいいんですが、ちょっと冗長かなと。以下のようにしたい。

for v := range chans.Concat(done, chList...) {
}

名前はConcatにしときました。書籍では Bridge になってましたが、入力が <-chan <-chan interface{} になっていたので、...<-chan interface{} を扱う関数として別に用意。内部で Bridge 呼んでいるだけですが。

サンプル

package chans

// Concat -- 指定されたチャネルのシーケンスを順に消費していく単一のチャネルを返します。
func Concat(done <-chan struct{}, chList ...<-chan interface{}) <-chan interface{} {
    if len(chList) == 0 {
        c := make(chan interface{})
        close(c)

        return c
    }

    chSeq := make(chan (<-chan interface{}), len(chList))
    func() {
        defer close(chSeq)

        for _, c := range chList {
            chSeq <- c
        }
    }()

    return Bridge(done, chSeq)
}

// Bridge -- 指定されたチャネルのシーケンスを順に消費していく単一のチャネルを返します。
func Bridge(done <-chan struct{}, chanCh <-chan <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})

    go func() {
        defer close(out)

        for {
            var ch <-chan interface{}
            select {
            case c, ok := <-chanCh:
                if !ok {
                    return
                }

                ch = c
            case <-done:
                return
            }

            for v := range OrDone(done, ch) {
                select {
                case out <- v:
                case <-done:
                }
            }
        }
    }()

    return out
}

gomy/concat.go at master · devlights/gomy · GitHub

以下、テストコードです。

package chans

import (
    "reflect"
    "testing"
)

func TestConcat(t *testing.T) {
    type (
        testin struct {
            data [][]interface{}
        }
        testout struct {
            result []interface{}
        }
        testcase struct {
            in  testin
            out testout
        }
    )

    cases := []testcase{
        {
            in: testin{data: [][]interface{}{
                {1, 2, 3},
                {4, 5, 6},
            }},
            out: testout{result: []interface{}{
                1, 2, 3, 4, 5, 6,
            }},
        },
    }

    for _, c := range cases {
        func() {
            done := make(chan struct{})
            defer close(done)

            chList := make([]<-chan interface{}, 0, len(c.in.data))
            for _, list := range c.in.data {
                func() {
                    ch := make(chan interface{}, len(list))
                    defer close(ch)

                    for _, v := range list {
                        ch <- v
                    }

                    chList = append(chList, ch)
                }()
            }

            concatCh := Concat(done, chList...)

            results := make([]interface{}, 0, 0)
            for v := range concatCh {
                t.Log(v)
                results = append(results, v)
            }

            // concat の場合は、fanIn と異なり取得順序は確定なので中身も一致していることをテストする
            t.Logf("[c.out.result] %v", c.out.result)
            t.Logf("[results     ] %v", results)

            if !reflect.DeepEqual(c.out.result, results) {
                t.Errorf("want: %v\tgot: %v", c.out.result, results)
            }
        }()
    }
}

実行すると以下のようになります。

$ go test -v github.com/devlights/gomy/chans -run ^TestConcat.*$
=== RUN   TestConcat
    TestConcat: concat_test.go:57: 1
    TestConcat: concat_test.go:57: 2
    TestConcat: concat_test.go:57: 3
    TestConcat: concat_test.go:57: 4
    TestConcat: concat_test.go:57: 5
    TestConcat: concat_test.go:57: 6
    TestConcat: concat_test.go:62: [c.out.result] [1 2 3 4 5 6]
    TestConcat: concat_test.go:63: [results     ] [1 2 3 4 5 6]
--- PASS: TestConcat (0.00s)
PASS

ついでに、context を利用している場合のサンプル。

package concat

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/devlights/gomy/chans"
    "github.com/devlights/gomy/output"
)

// MultiChannelConcat -- chans.Concat() を利用して処理するサンプルです。
func MultiChannelConcat() error {
    var (
        numGoroutine   = 5                      // 並行起動するデータ取得ゴルーチンの数
        takeCount      = 2                      // データ取得チャネル毎にいくつデータを取得するかの数
        dataInInterval = 100 * time.Millisecond // データ投入時のインターバル
    )

    // コンテキストを生成
    var (
        rootCtx         = context.Background()
        mainCtx, cancel = context.WithCancel(rootCtx)
    )

    // 入力元となるチャネルと各ゴルーチンの終了判定チャネル
    var (
        srcCh      = make(chan interface{})
        doneChList = make([]<-chan struct{}, 0, 0)
        takeChList = make([]<-chan interface{}, 0, numGoroutine)
    )

    defer close(srcCh)

    // srcCh に データを投入していくゴルーチン起動
    doneChList = append(doneChList, gen(mainCtx.Done(), srcCh, dataInInterval))

    // srcCh から takeCount個 取り出すチャネルを複数生成
    for i := 0; i < numGoroutine; i++ {
        takeDoneCh, takeCh := take(mainCtx.Done(), srcCh, i+1, takeCount)

        doneChList = append(doneChList, takeDoneCh)
        takeChList = append(takeChList, takeCh)
    }

    // 複数の取得チャネルを纏めてしまって、出力 (出力順序は守る)
    for v := range chans.Concat(mainCtx.Done(), takeChList...) {
        output.Stdoutl("[main]", v)
    }

    cancel()
    <-chans.WhenAll(doneChList...)

    return nil
}

func take(done <-chan struct{}, in <-chan interface{}, index int, takeCount int) (<-chan struct{}, <-chan interface{}) {
    terminated := make(chan struct{})
    out := make(chan interface{})
    go func() {
        // defer output.Stdoutf("take-goroutine", "%02d END\n", index)
        defer close(terminated)
        defer close(out)
        for v := range chans.Take(done, in, takeCount) {
            out <- fmt.Sprintf("[take-%02d] %v", index, v)
        }
    }()
    return terminated, out
}

func gen(done <-chan struct{}, in chan<- interface{}, interval time.Duration) <-chan struct{} {
    out := make(chan struct{})
    go func() {
        // defer output.Stdoutl("gen-goroutine", "END")
        defer close(out)

        randInt := func() interface{} {
            return rand.Int()
        }

        for v := range chans.RepeatFn(done, randInt) {
            in <- v
            <-time.After(interval)
        }
    }()
    return out
}

try-golang/multi_channel_concat.go at master · devlights/try-golang · GitHub

実行すると以下のようになります。

$ make run
ENTER EXAMPLE NAME: async_multi_channel_concat
[Name] "async_multi_channel_concat"
[main]               [take-01] 4692119960690034160
[main]               [take-01] 8604966331458351554
[main]               [take-02] 7875191509257954031
[main]               [take-02] 4998976565280053056
[main]               [take-03] 4053247634135861360
[main]               [take-03] 5272630624697359627
[main]               [take-04] 2769253797494110094
[main]               [take-04] 3775252429711134731
[main]               [take-05] 3856557697572725505
[main]               [take-05] 1322177934211094863


[Elapsed] 1.0050107s

先に作ったチャネルから順にデータを取得していってますね。

参考

Go言語による並行処理

Go言語による並行処理

関連記事

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com