いろいろ備忘録日記

主に .NET とか Go とか Python絡みのメモを公開しています。

Goメモ-81 (複数のsync.WaitGroupをまとめて管理する型, MergedWaitGroup)

概要

先日、指定されたワーカー数でファンアウトさせる関数をつくってみました。

devlights.hatenablog.com

ついでなので、それを使った型を一つ作ってみました。

処理を書いていると、sync.WaitGroupを頻繁に利用すると思いますが

たまに、sync.WaitGroup が複数存在していて、それらをまとめて管理する必要があるときがあります。

そういうときに

// wgs という名前で sync.WaitGroup のリストを持っているとする
for _, wg := wgs {
    wg.Wait()
}

とか書いてしまうと、リストに入っている順に WaitGroup が完了していかないときに

余計に時間がかかってしまいます。それぞれの WaitGroup は独立して存在しているので

完了待機するのも独立して待機すればいい感じ。

以下、サンプルです。

サンプル

package async

import (
    "sync"

    "github.com/devlights/gomy/chans"
)

type (
    // MergedWaitGroup -- sync.WaitGroup をまとめて管理するための振る舞いを持つインターフェースです。
    MergedWaitGroup interface {
        // Add -- 指定した sync.WaitGroup を管理対象に追加します。
        Add(wg *sync.WaitGroup)
        // Wait -- 内部で管理している sync.WaitGroup が全て完了するまで待機します。
        //
        // 本メソッドは、呼び出すとブロックします。
        Wait()
    }

    mergedWg struct {
        wgs []interface{}
    }
)

// NewMergedWaitGroup -- MergedWaitGroup を生成します。
func NewMergedWaitGroup(wgs ...*sync.WaitGroup) MergedWaitGroup {
    m := new(mergedWg)

    for _, v := range wgs {
        m.wgs = append(m.wgs, v)
    }

    return m
}

// Add -- impl MergedWaitGroup.Add
func (m *mergedWg) Add(wg *sync.WaitGroup) {
    m.wgs = append(m.wgs, wg)
}

// Wait -- impl MergedWaitGroup.Wait
func (m *mergedWg) Wait() {
    done := make(chan struct{})
    defer close(done)

    <-chans.WhenAll(chans.FanOut(done, chans.ForEach(done, m.wgs...), len(m.wgs), func(v interface{}) {
        if wg, ok := v.(*sync.WaitGroup); ok {
            wg.Wait()
        }
    })...)
}

gomy/mergedwg.go at master · devlights/gomy · GitHub

以下、テストコードです。

package async

import (
    "sync"
    "testing"
    "time"
)

func TestMergedWaitGroup(t *testing.T) {
    type (
        testin struct {
            wgCount   int
            waitTimes []time.Duration
        }
        testout struct {
            estimation time.Duration
        }
        testcase struct {
            in  testin
            out testout
        }
    )

    cases := []testcase{
        {
            in: testin{
                wgCount: 1,
                waitTimes: []time.Duration{
                    100 * time.Millisecond,
                },
            },
            out: testout{estimation: 110 * time.Millisecond},
        },
        {
            in: testin{
                wgCount: 3,
                waitTimes: []time.Duration{
                    100 * time.Millisecond,
                    200 * time.Millisecond,
                    300 * time.Millisecond,
                },
            },
            out: testout{estimation: 310 * time.Millisecond},
        },
        {
            in: testin{
                wgCount: 5,
                waitTimes: []time.Duration{
                    100 * time.Millisecond,
                    200 * time.Millisecond,
                    300 * time.Millisecond,
                    400 * time.Millisecond,
                    500 * time.Millisecond,
                },
            },
            out: testout{estimation: 510 * time.Millisecond},
        },
    }

    for caseIndex, c := range cases {
        func() {
            wgList := make([]*sync.WaitGroup, 0, 0)

            for i := 0; i < c.in.wgCount; i++ {
                wg := sync.WaitGroup{}
                wgList = append(wgList, &wg)

                wg.Add(1)
                go func(wg *sync.WaitGroup, waitTime time.Duration, index int) {
                    defer wg.Done()

                    defer t.Logf("[wg-%02d] wait done : %v", index, waitTime)
                    t.Logf("[wg-%02d] wait start: %v", index, waitTime)

                    <-time.After(waitTime)
                }(&wg, c.in.waitTimes[i], i)
            }

            mwg := NewMergedWaitGroup(wgList...)

            start := time.Now()
            mwg.Wait()
            elapsed := time.Since(start)

            t.Logf("[test-%02d][wgCount=%d]\telapsed: %v\testimation: %v", caseIndex, c.in.wgCount, elapsed, c.out.estimation)
            if c.out.estimation < elapsed {
                t.Errorf("want: <- %v\tgot: %v", c.out.estimation, elapsed)
            }
        }()
    }
}

実行すると以下のようになります。

$ go test -v github.com/devlights/gomy/async -run ^TestMergedWaitGroup.*$ -count=1
=== RUN   TestMergedWaitGroup
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-00] wait start: 100ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-00] wait done : 100ms
    TestMergedWaitGroup: mergedwg_test.go:85: [test-00][wgCount=1]      elapsed: 100.7799ms     estimation: 110ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-00] wait start: 100ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-02] wait start: 300ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-01] wait start: 200ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-00] wait done : 100ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-01] wait done : 200ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-02] wait done : 300ms
    TestMergedWaitGroup: mergedwg_test.go:85: [test-01][wgCount=3]      elapsed: 300.3517ms     estimation: 310ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-02] wait start: 300ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-00] wait start: 100ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-01] wait start: 200ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-03] wait start: 400ms
    TestMergedWaitGroup: mergedwg_test.go:73: [wg-04] wait start: 500ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-00] wait done : 100ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-01] wait done : 200ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-02] wait done : 300ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-03] wait done : 400ms
    TestMergedWaitGroup: mergedwg_test.go:76: [wg-04] wait done : 500ms
    TestMergedWaitGroup: mergedwg_test.go:85: [test-02][wgCount=5]      elapsed: 500.7588ms     estimation: 510ms
--- PASS: TestMergedWaitGroup (0.90s)
PASS
ok      github.com/devlights/gomy/async 0.982s

内部で各WaitGroupの完了待機をファンアウトさせてるので、待ち時間が最も長いものの時間で完了できていますね。

これが概要で挙げたように同期でシーケンシャルにWait呼ぶような実装していると、完了するのにかかる時間が直列になってしまいます。


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com