概要
チャネル関数の続き。
今回は、終了判定のdoneチャネルと入力データのチャネルのどちらかが閉じたら閉じるチャネルを返す関数について。
Goで、チャネルを使った処理を書くと、とても便利です。例えば、何かのチャネルのデータを出力する場合は
for v := range ch { // v に対して何か処理する }
という様にループでサクッと処理できます。
ですが、大抵の場合、チャネル一個で処理することよりも、どこかでキャンセルされる場合もあるので
<-chan struct{}
のようなdoneチャネルもついでに渡されていて、入力チャネルかdoneチャネルのどちらかが完了扱いになったら処理を打ち切るようにすることが多いです。
そうなると、上記のデータ処理部分が以下のようになります。
chLoop: for { select { case <-done: break chLoop case v, ok := <-inCh: if !ok { break chLoop } // v に対して何か処理する } }
上記はGoでは頻発する書き方なので、慣れているとなんとも思わないのですが、ちょっと分量多いですね。
できれば、以下のように書きたい。
for v := range OrDone(done, inCh) { // v に対して何かする }
今回の関数は上記のような振る舞いをする関数です。
サンプル
package chans // OrDone -- 指定された終了チャネルと入力用チャネルのどちらかが閉じたら閉じるチャネルを返します。 func OrDone(done <-chan struct{}, in <-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) for { select { case <-done: return case v, ok := <-in: if !ok { return } select { case out <- v: case <-done: } } } }() return out }
gomy/ordone.go at master · devlights/gomy · GitHub
以下、テストコードです。
package chans import ( "testing" "time" ) func TestOrDone(t *testing.T) { var ( done = make(chan struct{}) inCh = make(chan interface{}) chList = make([]<-chan struct{}, 0, 0) results = make([]interface{}, 0, 0) ) chList = append(chList, done) // 3 秒後に done チャネルを閉じる chList = append(chList, func() <-chan struct{} { terminated := make(chan struct{}) go func() { defer close(terminated) <-time.After(3 * time.Second) close(done) }() return terminated }()) // 1 秒毎に inCh にデータを送る chList = append(chList, func() <-chan struct{} { terminated := make(chan struct{}) go func() { defer close(terminated) defer close(inCh) for v := range Repeat(done, 1) { inCh <- v <-time.After(1 * time.Second) } }() return terminated }()) // inCh からのデータを出力 for v := range OrDone(done, inCh) { t.Logf("[result] %v", v) results = append(results, v) } <-WhenAll(chList...) if len(results) != 3 { t.Errorf("want: 3\tgot: %v", len(results)) } }
実行すると以下のようになります。
$ go test -v github.com/devlights/gomy/chans -run ^TestOrDone.*$ === RUN TestOrDone TestOrDone: ordone_test.go:46: [result] 1 TestOrDone: ordone_test.go:46: [result] 1 TestOrDone: ordone_test.go:46: [result] 1 --- PASS: TestOrDone (3.00s) PASS ok github.com/devlights/gomy/chans 3.007s
ついでに、context を利用している場合のサンプル。こっちの方が若干すっきりしますね。
package async import ( "context" "time" "github.com/devlights/gomy/chans" "github.com/devlights/gomy/output" ) // OrDoneOneInput -- chans.OrDone() を利用して処理するサンプルです。(入力チャネルが一つの場合) func OrDoneOneInput() error { // 3 秒後に終了するコンテキストを生成 var ( rootCtx = context.Background() mainCtx, cancel = context.WithTimeout(rootCtx, 3*time.Second) inCh = make(chan interface{}) ) defer cancel() defer close(inCh) // 1 秒毎にデータを送りつづけるチャネル生成 subChDone := func(done <-chan struct{}, in chan<- interface{}) <-chan struct{} { out := make(chan struct{}) go func() { defer close(out) for v := range chans.Repeat(done, 1) { in <- v <-time.After(1 * time.Second) } }() return out }(mainCtx.Done(), inCh) // データを出力 // この出力を実施している間にコンテキストのタイムアウトを迎えるため // 自動的にOrDone()の内部で終了判定となり、ループを抜けることになる。 for v := range chans.OrDone(mainCtx.Done(), inCh) { output.Stdoutl("[main]", v) } // 各ゴルーチンの終了を待機 <-chans.WhenAll(mainCtx.Done(), subChDone) return nil }
try-golang/ordone_oneinput.go at master · devlights/try-golang · GitHub
実行すると以下のようになります。
$ make run ENTER EXAMPLE NAME: async_ordone [Name] "async_ordone_one_input" [main] 1 [main] 1 [main] 1 [Elapsed] 3.008580508s
参考
- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場