いろいろ備忘録日記

主に .NET とか Go とか Python絡みのメモを公開しています。

Goメモ-73 (fan-in パターンでデータを集約するチャネル, FanIn)

概要

チャネル関数の続き。

今回は、Fan-In (ファンイン) パターンで複数のチャネルのシーケンスを一つにまとめて消費していくチャネルを作る関数について。

非同期系の本とか呼んでいると、よくファン・イン(fan-in) とか ファン・アウト(fan-out) とかよく聞きます。

ファンアウトは、どっちかというとWorkersパターンって呼ばれる方が多いですかね。

本来は、回路設計とかの話で出てくる用語なのですが、ソフトウェアの開発で非同期周りでよく話題に上がります。

身構える必要は全然なくて

  • ファンアウト (fan-out) は、一つのキューに対してNのワーカーがデータを処理する
  • ファンイン (fan-in)は、その逆でNのキューのデータを一つに纏める

って感じで覚えておけばいいです。どちらも用語は知らなくてもいつの間にかそんな処理書いてたってことが多いですね。

Goにおいて、キューってのはチャネルを表すことが出来るので、今回のファンインは

Nのチャネルのデータを一つのチャネルに集約する って感じになります。

前回のConcatと何が違うの?ってなりますが、Concatの方は、指定されたNのチャネルを前から順に使っていってデータを吸い出していきます。今回のファンインさんは、Nのチャネルを指定されるところは同じですが、前から順に行くのではなく、データの吸い出し部分も非同期させます。順序を問わないってことですね。

Goの公式ブログでも記載されています。

blog.golang.org

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that's closed when all the inputs are closed. This is called fan-in.

サンプル

package chans

// FanIn -- 指定されたチャネルリストをファンインするチャネルを返します。
func FanIn(done <-chan struct{}, channels ...<-chan interface{}) <-chan interface{} {
    out := make(chan interface{})

    chList := make([]<-chan struct{}, 0, 0)
    for _, in := range channels {
        chList = append(chList, func() <-chan struct{} {
            terminated := make(chan struct{})

            go func(in <-chan interface{}) {
                defer close(terminated)

                for v := range in {
                    select {
                    case <-done:
                        return
                    case out <- v:
                    }
                }
            }(in)

            return terminated
        }())
    }

    go func(chList []<-chan struct{}) {
        defer close(out)
        <-WhenAll(chList...)
    }(chList)

    return out
}

gomy/fanin.go at master · devlights/gomy · GitHub

以下、テストコードです。

package chans

import (
    "testing"
)

func TestFanIn(t *testing.T) {
    type (
        testin struct {
            data [][]interface{}
        }
        testout struct {
            result []interface{}
        }
        testcase struct {
            in  testin
            out testout
        }
    )

    cases := []testcase{
        {
            in: testin{data: [][]interface{}{
                {1, 2, 3},
                {4, 5, 6},
            }},
            out: testout{result: []interface{}{
                1, 2, 3, 4, 5, 6,
            }},
        },
    }

    for _, c := range cases {
        func() {
            done := make(chan struct{})
            defer close(done)

            chList := make([]<-chan interface{}, 0, len(c.in.data))
            for _, list := range c.in.data {
                func() {
                    ch := make(chan interface{}, len(list))
                    defer close(ch)

                    for _, v := range list {
                        ch <- v
                    }

                    chList = append(chList, ch)
                }()
            }

            fanInCh := FanIn(done, chList...)

            results := make([]interface{}, 0, 0)
            for v := range fanInCh {
                t.Log(v)
                results = append(results, v)
            }

            // fan-in の場合は、flatten と異なり取得順序は不定となるので個数でテストする
            t.Logf("[c.out.result] %v", c.out.result)
            t.Logf("[results     ] %v", results)

            if len(c.out.result) != len(results) {
                t.Errorf("want: %v\tgot: %v", c.out.result, results)
            }
        }()
    }
}

実行すると以下のようになります。

$ go test -v github.com/devlights/gomy/chans -run ^TestFanIn.*$
=== RUN   TestFanIn
    TestFanIn: fanin_test.go:56: 1
    TestFanIn: fanin_test.go:56: 4
    TestFanIn: fanin_test.go:56: 5
    TestFanIn: fanin_test.go:56: 6
    TestFanIn: fanin_test.go:56: 2
    TestFanIn: fanin_test.go:56: 3
    TestFanIn: fanin_test.go:61: [c.out.result] [1 2 3 4 5 6]
    TestFanIn: fanin_test.go:62: [results     ] [1 4 5 6 2 3]
--- PASS: TestFanIn (0.00s)
PASS

ついでに、context を利用している場合のサンプル。前回のConcatのサンプルとほぼ同じで、Concatって呼び出しがFanInに変わっているだけです。

package fanin

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/devlights/gomy/chans"
    "github.com/devlights/gomy/output"
)

// MultiChannelFanIn -- chans.FanIn() を利用して処理するサンプルです。(入力チャネルが複数の場合)
func MultiChannelFanIn() error {
    var (
        numGoroutine   = 5                      // 並行起動するデータ取得ゴルーチンの数
        takeCount      = 2                      // データ取得チャネル毎にいくつデータを取得するかの数
        dataInInterval = 100 * time.Millisecond // データ投入時のインターバル
    )

    // コンテキストを生成
    var (
        rootCtx         = context.Background()
        mainCtx, cancel = context.WithCancel(rootCtx)
    )

    // 入力元となるチャネルと各ゴルーチンの終了判定チャネル
    var (
        srcCh      = make(chan interface{})
        doneChList = make([]<-chan struct{}, 0, 0)
        takeChList = make([]<-chan interface{}, 0, numGoroutine)
    )

    defer close(srcCh)

    // srcCh に データを投入していくゴルーチン起動
    doneChList = append(doneChList, gen(mainCtx.Done(), srcCh, dataInInterval))

    // srcCh から takeCount個 取り出すチャネルを複数生成
    for i := 0; i < numGoroutine; i++ {
        takeDoneCh, takeCh := take(mainCtx.Done(), srcCh, i+1, takeCount)

        doneChList = append(doneChList, takeDoneCh)
        takeChList = append(takeChList, takeCh)
    }

    // 複数の取得チャネルを纏めてしまって、出力 (出力順序は問わない)
    for v := range chans.FanIn(mainCtx.Done(), takeChList...) {
        output.Stdoutl("[main]", v)
    }

    cancel()
    <-chans.WhenAll(doneChList...)

    return nil
}

func take(done <-chan struct{}, in <-chan interface{}, index int, takeCount int) (<-chan struct{}, <-chan interface{}) {
    terminated := make(chan struct{})
    out := make(chan interface{})
    go func() {
        // defer output.Stdoutf("take-goroutine", "%02d END\n", index)
        defer close(terminated)
        defer close(out)
        for v := range chans.Take(done, in, takeCount) {
            out <- fmt.Sprintf("[take-%02d] %v", index, v)
        }
    }()
    return terminated, out
}

func gen(done <-chan struct{}, in chan<- interface{}, interval time.Duration) <-chan struct{} {
    out := make(chan struct{})
    go func() {
        // defer output.Stdoutl("gen-goroutine", "END")
        defer close(out)

        randInt := func() interface{} {
            return rand.Int()
        }

        for v := range chans.RepeatFn(done, randInt) {
            in <- v
            <-time.After(interval)
        }
    }()
    return out
}

try-golang/multi_channel_fanin.go at master · devlights/try-golang · GitHub

実行すると以下のようになります。

$ make run
ENTER EXAMPLE NAME: async_multi_channel_fanin
[Name] "async_multi_channel_fanin"
[main]               [take-02] 9147366453389239735
[main]               [take-02] 7707720205445421596
[main]               [take-01] 3353539379143321040
[main]               [take-05] 9007367778894470328
[main]               [take-03] 2305305389264664480
[main]               [take-04] 2885577774071629796
[main]               [take-01] 4156069089090199425
[main]               [take-05] 6668285635745240934
[main]               [take-03] 8444500350534269182
[main]               [take-04] 2050223648781894161


[Elapsed] 1.0049724s

前回のConcatの場合は、出力結果が1から順に出力されていましたが、今回はバラバラになっていますね。

このサンプルでいうと、srcChという一つのデータソースから複数のゴルーチンがtakeしている部分が fan-out、出力するために chans.FanIn を使って一つにまとめている部分が fan-in ですね。

参考

Go言語による並行処理

Go言語による並行処理

関連記事

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com