いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

Goメモ-49 (ゴルーチンとチャネルでProducer/Consumer処理を作る)

概要

Goでは、ゴルーチンとチャネルがあるので、非同期処理が楽にかけます。

個人的に、よく利用するのが Producer/Consumer な処理が多いので、それのサンプル作ってみました。

チャネル自体が、ブロッキング付きのキューみたいなものなのでやりやすいですね。

C#でいうと、BlockingCollectionみたいな感じ。

非同期処理を作る場合に大事なのは各処理の役割をきっちり決めておくことと、一つの役割にいっぱい仕事を持たせないこと。

今回のサンプルでは以下の役割を用意しました。

  • 管理者
    • それぞれのタスクの管理をする役割。全体的な開始と終了を管理する
  • 生産者
    • 指定された時間枠で生産処理を実行する役割。
  • 消費者
    • 指定された時間枠で消費処理を実行する役割。
  • 回収者
    • 消費者が消費しきれなかったものを回収する役割

個人的に、Producer/Consumer処理を作る場合、私はいつも回収者という役割を作ります。

実務で処理を書くと、教科書みたいに綺麗に生産されたものを全部消費するということがなかなか無いです。

残ったデータは何らかの別の処理に回すか、エラーとしてレポートするかなどの処置をする必要があります。

この作業を消費者側の処理に書いてしまうと、消費者の役割が増えてしまって見づらくなるので分けるようにしています。

後、Goで非同期処理書く場合は、各ゴルーチンをきっちり終了させられるようにしておくことが大事。

ゴルーチンはGCの対象とならないので、ちゃんと終了しきれていないゴルーチンはリーク対象となります。

これはGoだけではなくてどの言語でも同じですね。

仕様みたいなもの

そのままフル稼働で生産と消費を行っても面白くないので、以下の仕様みたいなものを設けました。

  • 生産者は500ミリ秒毎に一つ生産する
  • 消費者は2秒毎に消費処理を実施する
  • 消費者に一度の消費処理で与えられている猶予時間は300ミリ秒とする
  • 消費者は一つのアイテムを消費するのに100ミリ秒かかる
  • 回収者は消費者が消費し切れなかった残りのアイテムを全て回収する
  • 回収者は一つ回収するのに1秒かかる

サンプル

package async

import (
    "log"
    "time"
)

func init() {
    log.SetFlags(log.Flags() &^ log.LstdFlags)
}

// Main, Producer, Consumer, RemainCollector が利用するデータ
type (
    item  int
    empty struct{}
)

// Main, Producer, Consumer, RemainCollector の間で利用されるチャネル達
type (
    done      <-chan empty
    itemCh    <-chan item
    terminate <-chan empty
)

// 各役割ごとに決められたインターバル
var (
    producerInterval     = 500 * time.Millisecond
    consumerInterval     = 2 * time.Second
    consumerProcInterval = 300 * time.Millisecond
    consumerWaitInterval = 100 * time.Millisecond
    remainerProcInterval = 1 * time.Second
)

// ProducerConsumer は、ゴルーチンとチャネルを使って 生産者/消費者 処理を実施するサンプルです
func ProducerConsumer() error {
    var (
        // 処理終了を指示するチャネル
        done = make(chan empty)
    )

    // 生産者 生成
    itemCh, termProducer := makeProducer(done)
    // 消費者 生成
    termConsumer := makeConsumer(done, itemCh)
    // 残り物収集班 生成
    termRemainCollect := makeRemainCollector(done, termConsumer, itemCh)

    select {
    case <-time.After(10 * time.Second):
        // 処理終了
        close(done)
    }

    // 生産者, 消費者, 残り物収集班 が終了するのを待つ
    <-termProducer
    <-termConsumer
    <-termRemainCollect

    log.Println("MAIN END")

    return nil
}

// makeProducer は、生産者処理を担当する関数です.
// 内部で、ゴルーチンを起動し以下のスケジューリングで処理を行います。
//
//   - producerInterval毎に一つ生産する
//
// 戻り値として、生産したアイテムを受け取ることができるチャネルと
// 自身の処理が終了したことを知らせるチャネルを返します。
func makeProducer(done done) (itemCh, terminate) {
    ch := make(chan item, 100)
    termCh := make(chan empty)

    go func() {
        defer close(termCh)
        defer close(ch)

        i := 0

        for {
            select {
            case <-done:
                return
            case <-time.After(producerInterval):
                i++
                log.Printf("[生産者] 生成 %d\n", i)
                ch <- item(i)
            }
        }
    }()

    return ch, termCh
}

// makeConsumer は、消費者処理を担当する関数です.
// 内部で、ゴルーチンを起動し以下のスケジューリングで処理を行います。
//
//   - consumerInterval毎に消費処理を実施する
//   - 一度の消費処理で与えられている猶予時間はconsumerProcIntervalとする
//   - 一つのアイテムを消費するのにconsumerWaitIntervalかかる
//
// 戻り値として
// 自身の処理が終了したことを知らせるチャネルを返します。
func makeConsumer(done done, itemCh itemCh) terminate {
    termCh := make(chan empty)

    go func() {
        defer close(termCh)

        for {
            select {
            case <-done:
                return
            case <-time.After(consumerInterval):
                // 消費出来るタイミングが訪れたため活動を開始するが
                // 処理を実施するのは与えられた猶予時間分だけ動くようにする
                timeout := time.After(consumerProcInterval)

            L:
                for {
                    select {
                    case <-timeout:
                        // 消費猶予時間が終わったので今回分のターンは終わり
                        break L
                    case v, ok := <-itemCh:
                        if !ok {
                            break L
                        }
                        log.Printf("[消費者] 消費 %v\n", v)
                        time.Sleep(consumerWaitInterval)
                    }
                }
            }
        }
    }()

    return termCh
}

// makeRemainCollector は、残り物回収処理を担当する関数です.
// 生産者が生産した分を消費者が消費しきれなかった場合に本処理が残りを回収します。
// 内部で、ゴルーチンを起動し以下のスケジューリングで処理を行います。
//
//   - 管理者(今はmain処理)が処理終了を告げていること
//   - 消費者が処理終了を告げていること
//   - 生産者が生産した残りのアイテムを全て回収する
//   - アイテム一つの回収でremainerProcIntervalかかる
//
// 戻り値として
// 自身の処理が終了したことを知らせるチャネルを返します。
func makeRemainCollector(done done, termConsumer terminate, itemCh itemCh) terminate {
    termCh := make(chan empty)

    go func() {
        defer close(termCh)

        // 自身は残りモノを回収する役割なので
        // 生産終了と消費終了が告げられた後に
        // 動きを開始する
        <-done
        <-termConsumer

        for {
            select {
            case v, ok := <-itemCh:
                if !ok {
                    return
                }
                log.Printf("[残り回収班] 回収 %v\n", v)
                time.Sleep(remainerProcInterval)
            }
        }
    }()

    return termCh
}

try-golang/producer_consumer.go at master · devlights/try-golang · GitHub

実行すると以下のようになります。

ENTER EXAMPLE NAME: async_producer_consumer
[Name] "async_producer_consumer"
[生産者] 生成 1
[生産者] 生成 2
[生産者] 生成 3
[消費者] 消費 1
[生産者] 生成 4
[消費者] 消費 2
[消費者] 消費 3
[生産者] 生成 5
[生産者] 生成 6
[生産者] 生成 7
[生産者] 生成 8
[消費者] 消費 4
[消費者] 消費 5
[消費者] 消費 6
[生産者] 生成 9
[消費者] 消費 7
[消費者] 消費 8
[生産者] 生成 10
[生産者] 生成 11
[生産者] 生成 12
[生産者] 生成 13
[消費者] 消費 9
[消費者] 消費 10
[生産者] 生成 14
[消費者] 消費 11
[消費者] 消費 12
[消費者] 消費 13
[生産者] 生成 15
[生産者] 生成 16
[生産者] 生成 17
[生産者] 生成 18
[消費者] 消費 14
[消費者] 消費 15
[生産者] 生成 19
[消費者] 消費 16
[残り回収班] 回収 17
[残り回収班] 回収 18
[残り回収班] 回収 19
MAIN END

わざと消費者のスピードを抑えているので、結果としてちょっと消費しきれず残ったものを

回収班がちゃんと回収していますね。

ちゃんとした処理として成立させるには、このサンプルはまだ足りてなくて 管理者処理の最後で、各ゴルーチンの終わりを待っている部分で永久に待っています。 実務で処理書く場合は、ここでタイムリミットを設けて、リミットまでに処理を終了しなかったものを何らかのエラーレポートに報告したりするようにしないといけませんが、今回は面倒なので、そこの部分は割愛しています。また、各ゴルーチン内でエラーが発生した場合の処理も割愛しています。


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com

github.com