いろいろ備忘録日記

主に .NET とか Go とか Python絡みのメモを公開しています。

Goメモ-71 (指定されたチャネルのどちらかが閉じたら閉じるチャネル, OrDone)

概要

チャネル関数の続き。

今回は、終了判定のdoneチャネルと入力データのチャネルのどちらかが閉じたら閉じるチャネルを返す関数について。

Goで、チャネルを使った処理を書くと、とても便利です。例えば、何かのチャネルのデータを出力する場合は

for v := range ch {
    // v に対して何か処理する
}

という様にループでサクッと処理できます。

ですが、大抵の場合、チャネル一個で処理することよりも、どこかでキャンセルされる場合もあるので

<-chan struct{} のようなdoneチャネルもついでに渡されていて、入力チャネルかdoneチャネルのどちらかが完了扱いになったら処理を打ち切るようにすることが多いです。

そうなると、上記のデータ処理部分が以下のようになります。

chLoop:
for {
    select {
    case <-done:
        break chLoop
    case v, ok := <-inCh:
        if !ok {
            break chLoop
        }
        
        // v に対して何か処理する
    }
}

上記はGoでは頻発する書き方なので、慣れているとなんとも思わないのですが、ちょっと分量多いですね。

できれば、以下のように書きたい。

for v := range OrDone(done, inCh) {
    // v に対して何かする
}

今回の関数は上記のような振る舞いをする関数です。

サンプル

package chans

// OrDone -- 指定された終了チャネルと入力用チャネルのどちらかが閉じたら閉じるチャネルを返します。
func OrDone(done <-chan struct{}, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})

    go func() {
        defer close(out)

        for {
            select {
            case <-done:
                return
            case v, ok := <-in:
                if !ok {
                    return
                }

                select {
                case out <- v:
                case <-done:
                }
            }
        }
    }()

    return out
}

gomy/ordone.go at master · devlights/gomy · GitHub

以下、テストコードです。

package chans

import (
    "testing"
    "time"
)

func TestOrDone(t *testing.T) {
    var (
        done    = make(chan struct{})
        inCh    = make(chan interface{})
        chList  = make([]<-chan struct{}, 0, 0)
        results = make([]interface{}, 0, 0)
    )

    chList = append(chList, done)

    // 3 秒後に done チャネルを閉じる
    chList = append(chList, func() <-chan struct{} {
        terminated := make(chan struct{})
        go func() {
            defer close(terminated)
            <-time.After(3 * time.Second)
            close(done)
        }()
        return terminated
    }())

    // 1 秒毎に inCh にデータを送る
    chList = append(chList, func() <-chan struct{} {
        terminated := make(chan struct{})
        go func() {
            defer close(terminated)
            defer close(inCh)

            for v := range Repeat(done, 1) {
                inCh <- v
                <-time.After(1 * time.Second)
            }
        }()
        return terminated
    }())

    // inCh からのデータを出力
    for v := range OrDone(done, inCh) {
        t.Logf("[result] %v", v)
        results = append(results, v)
    }

    <-WhenAll(chList...)

    if len(results) != 3 {
        t.Errorf("want: 3\tgot: %v", len(results))
    }
}

実行すると以下のようになります。

$ go test -v github.com/devlights/gomy/chans -run ^TestOrDone.*$
=== RUN   TestOrDone
    TestOrDone: ordone_test.go:46: [result] 1
    TestOrDone: ordone_test.go:46: [result] 1
    TestOrDone: ordone_test.go:46: [result] 1
--- PASS: TestOrDone (3.00s)
PASS
ok      github.com/devlights/gomy/chans 3.007s

ついでに、context を利用している場合のサンプル。こっちの方が若干すっきりしますね。

package async

import (
    "context"
    "time"

    "github.com/devlights/gomy/chans"
    "github.com/devlights/gomy/output"
)

// OrDoneOneInput -- chans.OrDone() を利用して処理するサンプルです。(入力チャネルが一つの場合)
func OrDoneOneInput() error {
    // 3 秒後に終了するコンテキストを生成
    var (
        rootCtx         = context.Background()
        mainCtx, cancel = context.WithTimeout(rootCtx, 3*time.Second)
        inCh            = make(chan interface{})
    )

    defer cancel()
    defer close(inCh)

    // 1 秒毎にデータを送りつづけるチャネル生成
    subChDone := func(done <-chan struct{}, in chan<- interface{}) <-chan struct{} {
        out := make(chan struct{})
        go func() {
            defer close(out)

            for v := range chans.Repeat(done, 1) {
                in <- v
                <-time.After(1 * time.Second)
            }
        }()
        return out
    }(mainCtx.Done(), inCh)

    // データを出力
    //   この出力を実施している間にコンテキストのタイムアウトを迎えるため
    //   自動的にOrDone()の内部で終了判定となり、ループを抜けることになる。
    for v := range chans.OrDone(mainCtx.Done(), inCh) {
        output.Stdoutl("[main]", v)
    }

    // 各ゴルーチンの終了を待機
    <-chans.WhenAll(mainCtx.Done(), subChDone)

    return nil
}

try-golang/ordone_oneinput.go at master · devlights/try-golang · GitHub

実行すると以下のようになります。

$ make run
ENTER EXAMPLE NAME: async_ordone
[Name] "async_ordone_one_input"
[main]               1
[main]               1
[main]               1


[Elapsed] 3.008580508s

参考

Go言語による並行処理

Go言語による並行処理

関連記事

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com

devlights.hatenablog.com


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com