概要
先日、指定されたワーカー数でファンアウトさせる関数をつくってみました。
ついでなので、それを使った型を一つ作ってみました。
処理を書いていると、sync.WaitGroupを頻繁に利用すると思いますが
たまに、sync.WaitGroup が複数存在していて、それらをまとめて管理する必要があるときがあります。
そういうときに
// wgs という名前で sync.WaitGroup のリストを持っているとする for _, wg := wgs { wg.Wait() }
とか書いてしまうと、リストに入っている順に WaitGroup が完了していかないときに
余計に時間がかかってしまいます。それぞれの WaitGroup は独立して存在しているので
完了待機するのも独立して待機すればいい感じ。
以下、サンプルです。
サンプル
package async import ( "sync" "github.com/devlights/gomy/chans" ) type ( // MergedWaitGroup -- sync.WaitGroup をまとめて管理するための振る舞いを持つインターフェースです。 MergedWaitGroup interface { // Add -- 指定した sync.WaitGroup を管理対象に追加します。 Add(wg *sync.WaitGroup) // Wait -- 内部で管理している sync.WaitGroup が全て完了するまで待機します。 // // 本メソッドは、呼び出すとブロックします。 Wait() } mergedWg struct { wgs []interface{} } ) // NewMergedWaitGroup -- MergedWaitGroup を生成します。 func NewMergedWaitGroup(wgs ...*sync.WaitGroup) MergedWaitGroup { m := new(mergedWg) for _, v := range wgs { m.wgs = append(m.wgs, v) } return m } // Add -- impl MergedWaitGroup.Add func (m *mergedWg) Add(wg *sync.WaitGroup) { m.wgs = append(m.wgs, wg) } // Wait -- impl MergedWaitGroup.Wait func (m *mergedWg) Wait() { done := make(chan struct{}) defer close(done) <-chans.WhenAll(chans.FanOut(done, chans.ForEach(done, m.wgs...), len(m.wgs), func(v interface{}) { if wg, ok := v.(*sync.WaitGroup); ok { wg.Wait() } })...) }
gomy/mergedwg.go at master · devlights/gomy · GitHub
以下、テストコードです。
package async import ( "sync" "testing" "time" ) func TestMergedWaitGroup(t *testing.T) { type ( testin struct { wgCount int waitTimes []time.Duration } testout struct { estimation time.Duration } testcase struct { in testin out testout } ) cases := []testcase{ { in: testin{ wgCount: 1, waitTimes: []time.Duration{ 100 * time.Millisecond, }, }, out: testout{estimation: 110 * time.Millisecond}, }, { in: testin{ wgCount: 3, waitTimes: []time.Duration{ 100 * time.Millisecond, 200 * time.Millisecond, 300 * time.Millisecond, }, }, out: testout{estimation: 310 * time.Millisecond}, }, { in: testin{ wgCount: 5, waitTimes: []time.Duration{ 100 * time.Millisecond, 200 * time.Millisecond, 300 * time.Millisecond, 400 * time.Millisecond, 500 * time.Millisecond, }, }, out: testout{estimation: 510 * time.Millisecond}, }, } for caseIndex, c := range cases { func() { wgList := make([]*sync.WaitGroup, 0, 0) for i := 0; i < c.in.wgCount; i++ { wg := sync.WaitGroup{} wgList = append(wgList, &wg) wg.Add(1) go func(wg *sync.WaitGroup, waitTime time.Duration, index int) { defer wg.Done() defer t.Logf("[wg-%02d] wait done : %v", index, waitTime) t.Logf("[wg-%02d] wait start: %v", index, waitTime) <-time.After(waitTime) }(&wg, c.in.waitTimes[i], i) } mwg := NewMergedWaitGroup(wgList...) start := time.Now() mwg.Wait() elapsed := time.Since(start) t.Logf("[test-%02d][wgCount=%d]\telapsed: %v\testimation: %v", caseIndex, c.in.wgCount, elapsed, c.out.estimation) if c.out.estimation < elapsed { t.Errorf("want: <- %v\tgot: %v", c.out.estimation, elapsed) } }() } }
実行すると以下のようになります。
$ go test -v github.com/devlights/gomy/async -run ^TestMergedWaitGroup.*$ -count=1 === RUN TestMergedWaitGroup TestMergedWaitGroup: mergedwg_test.go:73: [wg-00] wait start: 100ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-00] wait done : 100ms TestMergedWaitGroup: mergedwg_test.go:85: [test-00][wgCount=1] elapsed: 100.7799ms estimation: 110ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-00] wait start: 100ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-02] wait start: 300ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-01] wait start: 200ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-00] wait done : 100ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-01] wait done : 200ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-02] wait done : 300ms TestMergedWaitGroup: mergedwg_test.go:85: [test-01][wgCount=3] elapsed: 300.3517ms estimation: 310ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-02] wait start: 300ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-00] wait start: 100ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-01] wait start: 200ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-03] wait start: 400ms TestMergedWaitGroup: mergedwg_test.go:73: [wg-04] wait start: 500ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-00] wait done : 100ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-01] wait done : 200ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-02] wait done : 300ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-03] wait done : 400ms TestMergedWaitGroup: mergedwg_test.go:76: [wg-04] wait done : 500ms TestMergedWaitGroup: mergedwg_test.go:85: [test-02][wgCount=5] elapsed: 500.7588ms estimation: 510ms --- PASS: TestMergedWaitGroup (0.90s) PASS ok github.com/devlights/gomy/async 0.982s
内部で各WaitGroupの完了待機をファンアウトさせてるので、待ち時間が最も長いものの時間で完了できていますね。
これが概要で挙げたように同期でシーケンシャルにWait呼ぶような実装していると、完了するのにかかる時間が直列になってしまいます。
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場