概要
小ネタ。せっかくサンプル作ったので、ここにメモメモ。
何らかのデータシーケンスがあるとして、それを一定数貯めてから処理するというのはよくあるシチュエーションですね。
個人的には、fan-outさせる際に各非同期処理に処理してもらう塊を作る際によくやります。
元のデータシーケンスがチャネルだった場合に、同じように一定数貯めてから吐き出すチャネルがあると便利かもしれません。
内部的には以前にブログで記事にアップした
を使って、必要数まで貯まるまで待っておいて、溜まったら吐き出すという感じです。
サンプル
package chans // Buffer は、入力を指定した件数分に束ねてデータを返すチャネルを生成します. func Buffer(done <-chan struct{}, in <-chan interface{}, count int) <-chan []interface{} { out := make(chan []interface{}) go func() { defer close(out) for { items := make([]interface{}, 0, count) for item := range Take(done, in, count) { items = append(items, item) } if len(items) == 0 { break } select { case <-done: case out <- items: } if len(items) != count { break } } }() return out }
gomy/buffer.go at master · devlights/gomy · GitHub
実行イメージは以下の感じ
package chans_test import ( "context" "fmt" "time" "github.com/devlights/gomy/chans" ) func ExampleBuffer() { var ( rootCtx = context.Background() mainCtx, mainCxl = context.WithCancel(rootCtx) procCtx, procCxl = context.WithTimeout(mainCtx, 50*time.Millisecond) ) defer mainCxl() defer procCxl() var ( data = []interface{}{1, 2, 3, 4, 5, 6, 7} count = 3 ) numbers := chans.Generator(procCtx.Done(), data...) chunks := chans.Buffer(procCtx.Done(), numbers, count) for chunk := range chunks { fmt.Println(chunk) } // Output: // [1 2 3] // [4 5 6] // [7] }
gomy/buffer_test.go at master · devlights/gomy · GitHub
$ go test ./chans/ -v -race -run "^ExampleBuffer$" === RUN ExampleBuffer --- PASS: ExampleBuffer (0.00s) PASS ok github.com/devlights/gomy/chans 0.024s
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場