いろいろ備忘録日記

主に .NET とか Go とか Python絡みのメモを公開しています。

Goメモ-79 (指定されたワーカー数でファンアウトさせる関数, FanOut)

概要

引き続き、小ネタチャネル関数の続き。( #関連記事 参照)

以前にファンインする関数について記事書きました。

devlights.hatenablog.com

ついでなので、ファンアウトさせる関数も書いてみました。

サンプル

package chans

// FanOut -- 指定されたチャネルの処理を指定されたワーカーの数でファンアウトします。
//
// チャネルからデータを取得するたびに引数 callback が呼ばれます。
func FanOut(done <-chan struct{}, in <-chan interface{}, workerCount int, callback func(interface{})) []<-chan struct{} {
    outChList := make([]<-chan struct{}, 0, 0)

    for i := 0; i < workerCount; i++ {
        out := make(chan struct{})

        go func() {
            defer close(out)

            for v := range OrDone(done, in) {
                select {
                case <-done:
                    return
                default:
                }

                callback(v)
            }
        }()

        outChList = append(outChList, out)
    }

    return outChList
}

gomy/fanout.go at master · devlights/gomy · GitHub

ファンアウトは一つの入力を複数のワーカーが処理するパターンなので関数の内部で複数のゴルーチンを起動します。

なので、戻り値は <-chan struct{} のスライスにしてあります。

一気に待ち合わせしたい場合は

<-WhenAll(FanOut(...))

ってやれば出来ますね。

以下テストコードです。

package chans

import (
    "testing"
    "time"
)

func TestFanOut(t *testing.T) {
    type (
        testin struct {
            workerCount int
            input       []interface{}
            interval    time.Duration
        }
        testout struct {
            estimation time.Duration
        }
        testcase struct {
            in  testin
            out testout
        }
    )

    cases := []testcase{
        {
            in: testin{
                workerCount: 1,
                input: []interface{}{
                    1, 2, 3, 4, 5, 6,
                },
                interval: 100 * time.Millisecond,
            },
            out: testout{
                estimation: ((6/1 + 1) * 100) * time.Millisecond,
            },
        },
        {
            in: testin{
                workerCount: 2,
                input: []interface{}{
                    1, 2, 3, 4, 5, 6,
                },
                interval: 100 * time.Millisecond,
            },
            out: testout{
                estimation: ((6/2 + 1) * 100) * time.Millisecond,
            },
        },
        {
            in: testin{
                workerCount: 3,
                input: []interface{}{
                    1, 2, 3, 4, 5, 6,
                },
                interval: 100 * time.Millisecond,
            },
            out: testout{
                estimation: ((6/3 + 1) * 100) * time.Millisecond,
            },
        },
    }

    for caseIndex, c := range cases {
        func(index int) {
            done := make(chan struct{})
            defer close(done)

            start := time.Now()
            <-WhenAll(FanOut(
                done,
                ForEach(done, c.in.input...),
                c.in.workerCount,
                func(v interface{}) {
                    <-time.After(c.in.interval)
                })...)
            elapsed := time.Since(start)

            t.Logf("[workerCount=%d][estimation] %v\t[elapsed] %v", c.in.workerCount, c.out.estimation, elapsed)
            if c.out.estimation < elapsed {
                t.Errorf("want: <= %v\tgot: %v", c.out.estimation, elapsed)
            }
        }(caseIndex + 1)
    }
}

実行すると以下のようになります。

$ go test -v github.com/devlights/gomy/chans -run ^TestFanOut.*$
=== RUN   TestFanOut
    TestFanOut: fanout_test.go:78: [workerCount=1][estimation] 700ms    [elapsed] 601.3349ms
    TestFanOut: fanout_test.go:78: [workerCount=2][estimation] 400ms    [elapsed] 302.4562ms
    TestFanOut: fanout_test.go:78: [workerCount=3][estimation] 300ms    [elapsed] 200.512ms
--- PASS: TestFanOut (1.10s)
PASS
ok      github.com/devlights/gomy/chans 1.183s

同じデータに対してワーカー数を増やしていくと、処理時間が変化していっていますね。

ただ、GoにおいてゴルーチンはOSのスレッドそのものではなくて、GoのランタイムがOSのスレッドに多重化してうまくスケジューリングしてくれるものなので、必ずしもワーカー数をどんどん増やしたからといって良くなるわけではありません。ユーザはゴルーチンのスケジューリングについては手出しができないので、最適な同時実行数はGoのランタイム任せになります。

参考

Go言語による並行処理

Go言語による並行処理

関連記事

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com