いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

Goメモ-505 (sync.Condのメモ)(Producer-Consumer-Watcher)

関連記事

GitHub - devlights/blog-summary: ブログ「いろいろ備忘録日記」のまとめ

概要

以下、自分用のメモです。たまに sync.Cond 使うときに、使い方とかよく忘れるので。。。

基本的に一度だけのランデブーポイントを実装する場合はチャネルで事足りるのですが、何回も繰り返しポイントを用意する際には sync.Cond が便利です。

以下、サンプルです。

サンプル

main.go

データを生産する役割、データを消費する役割、それらを監視して働いていないときに叩き起こす役割の3つを sync.Cond を使って連携させています。

package main

import (
    "flag"
    "log"
    "math/rand/v2"
    "os"
    "os/signal"
    "sync"
    "time"
)

const (
    MAX_ITEM_COUNT = 10
    MAX_PRODUCERS  = 2
    MAX_CONSUMERS  = MAX_ITEM_COUNT / 2
    WATCH_INTERVAL = 1 * time.Second
)

type (
    Args struct {
        debug bool
    }
)

var (
    args Args
)

func init() {
    flag.BoolVar(&args.debug, "debug", false, "debug mode")
}

func main() {
    //
    // Producer-Consumer-Watcher のサンプル
    // 各非同期処理のランデブーポイントの制御に
    // *sync.Cond を利用。
    //

    log.SetFlags(log.Lmicroseconds)
    log.SetOutput(os.Stdout)

    flag.Parse()

    if err := run(); err != nil {
        log.Fatal(err)
    }
}

func run() error {
    var (
        ch   = make(chan int, MAX_ITEM_COUNT*MAX_PRODUCERS)
        sig  = make(chan os.Signal, 1)
        done = make(chan struct{})
    )
    defer close(ch)

    signal.Notify(sig, os.Interrupt)
    go func() {
        <-sig
        log.Printf("<<Interrupt>>")
        close(done)
    }()

    var (
        producer = sync.NewCond(&sync.Mutex{})
        consumer = sync.NewCond(&sync.Mutex{})
    )

    // 役割: 生産者と消費者を監視し、必要であれば叩き起こす
    go watch(1, ch, producer, consumer)

    // 役割: 生産を行う
    for i := range MAX_PRODUCERS {
        go produce(i+1, ch, (i+1)*10000, producer, consumer)
    }

    // 役割: 消費を行う
    for i := range MAX_CONSUMERS {
        go consume(i+1, ch, consumer)
    }

    // ゴルーチンの終了待機などについては割愛
    <-done
    log.Printf("<<DONE>>")

    return nil
}

func watch(id int, ch <-chan int, producer, consumer *sync.Cond) {
    for {
        func() {
            producer.L.Lock()
            defer producer.L.Unlock()

            if len(ch) == 0 {
                producer.Broadcast()
                log.Printf("[W][%02d] >>> 0個です。生産しなさい。", id)
            }
        }()

        <-time.After(WATCH_INTERVAL)

        func() {
            consumer.L.Lock()
            defer consumer.L.Unlock()

            if len(ch) != 0 {
                consumer.Broadcast()
                log.Printf("[W][%02d] >>> 生産されています。消費しなさい。", id)
            }
        }()
    }
}

func produce(id int, ch chan<- int, start int, producer, consumer *sync.Cond) {
    var (
        count int
    )
    for i := start; ; {
        func() {
            producer.L.Lock()
            defer producer.L.Unlock()

            for len(ch) > cap(ch)/2 {
                dbg("[P][%02d] <<< 消費されるまで待機します。(残:%d)", id, len(ch))
                producer.Wait()
            }
        }()

        func() {
            consumer.L.Lock()
            defer consumer.L.Unlock()

            count = rand.IntN(MAX_ITEM_COUNT)
            for c := range count {
                ch <- i + (c + 1)
            }

            log.Printf("[P][%02d] >>> %d個生産しました。(残:%d)", id, count, len(ch))
            consumer.Broadcast()

            i += count
        }()

        // 次のタスク着手まで少し休憩
        <-time.After(time.Duration(rand.IntN(500)) * time.Millisecond)
    }
}

func consume(id int, ch <-chan int, consumer *sync.Cond) {
    for {
        func() {
            consumer.L.Lock()
            defer consumer.L.Unlock()

            for len(ch) == 0 {
                dbg("[C][%02d] <<< 生産されるまで待機します。(残:%d)", id, len(ch))
                consumer.Wait()
            }

            log.Printf("[C][%02d] >>> 消費しました (%v)(残:%d)", id, <-ch, len(ch))
        }()

        // 次のタスク着手まで少し休憩
        <-time.After(time.Duration(rand.IntN(1000)) * time.Millisecond)
    }
}

func dbg(format string, v ...any) {
    if args.debug {
        log.Printf(format, v...)
    }
}

Taskfile.yml

# https://taskfile.dev

version: '3'

tasks:
  default:
    cmds:
      - go run . -debug=false
    ignore_error: true

実行

実行すると、例えば以下のようになります。(実行するたびに結果は異なります。)

$ task
task: [default] go run . -debug=false
09:45:14.397539 [P][02] >>> 8個生産しました。(残:8)
09:45:14.397744 [C][05] >>> 消費しました (20001)(残:7)
09:45:14.397803 [P][01] >>> 7個生産しました。(残:14)
09:45:14.397817 [C][02] >>> 消費しました (20002)(残:13)
09:45:14.397831 [C][01] >>> 消費しました (20003)(残:12)
09:45:14.397839 [C][03] >>> 消費しました (20004)(残:11)
09:45:14.397852 [C][04] >>> 消費しました (20005)(残:10)
09:45:14.409918 [P][02] >>> 0個生産しました。(残:10)
09:45:14.456148 [P][01] >>> 7個生産しました。(残:17)
09:45:14.578567 [C][03] >>> 消費しました (20006)(残:16)
09:45:14.592732 [C][04] >>> 消費しました (20007)(残:15)
09:45:14.606827 [C][05] >>> 消費しました (20008)(残:14)
09:45:14.627954 [C][01] >>> 消費しました (10001)(残:13)
09:45:14.736242 [C][05] >>> 消費しました (10002)(残:12)
09:45:14.947805 [C][02] >>> 消費しました (10003)(残:11)
09:45:15.046041 [C][01] >>> 消費しました (10004)(残:10)
09:45:15.079229 [C][03] >>> 消費しました (10005)(残:9)
09:45:15.175388 [C][05] >>> 消費しました (10006)(残:8)
09:45:15.347710 [C][01] >>> 消費しました (10007)(残:7)
09:45:15.397873 [W][01] >>> 生産されています。消費しなさい。
09:45:15.534500 [C][04] >>> 消費しました (10008)(残:6)
09:45:15.546723 [C][04] >>> 消費しました (10009)(残:5)
09:45:15.679974 [C][05] >>> 消費しました (10010)(残:4)
09:45:15.841338 [C][02] >>> 消費しました (10011)(残:3)
09:45:15.889525 [C][03] >>> 消費しました (10012)(残:2)
09:45:16.033950 [C][04] >>> 消費しました (10013)(残:1)
09:45:16.106198 [C][05] >>> 消費しました (10014)(残:0)
09:45:16.398170 [W][01] >>> 0個です。生産しなさい。
09:45:16.398205 [P][02] >>> 3個生産しました。(残:3)
09:45:16.398223 [C][03] >>> 消費しました (20009)(残:2)
09:45:16.398269 [P][01] >>> 3個生産しました。(残:5)
09:45:16.398287 [C][02] >>> 消費しました (20010)(残:4)
09:45:16.398302 [C][01] >>> 消費しました (20011)(残:3)
09:45:16.398312 [C][05] >>> 消費しました (10015)(残:2)
09:45:16.422446 [C][04] >>> 消費しました (10016)(残:1)
09:45:16.422785 [C][05] >>> 消費しました (10017)(残:0)
09:45:16.570336 [P][01] >>> 5個生産しました。(残:5)
09:45:16.570385 [C][02] >>> 消費しました (10018)(残:4)
09:45:16.582587 [P][01] >>> 4個生産しました。(残:8)
09:45:16.650777 [P][02] >>> 4個生産しました。(残:12)
09:45:16.713155 [C][02] >>> 消費しました (10019)(残:11)
09:45:16.805353 [C][01] >>> 消費しました (10020)(残:10)
09:45:16.836578 [P][02] >>> 3個生産しました。(残:13)
09:45:16.934218 [C][03] >>> 消費しました (10021)(残:12)
09:45:17.093537 [C][04] >>> 消費しました (10022)(残:11)
09:45:17.241811 [C][05] >>> 消費しました (10023)(残:10)
09:45:17.399157 [W][01] >>> 生産されています。消費しなさい。
09:45:17.514295 [C][03] >>> 消費しました (10024)(残:9)
09:45:17.572418 [C][01] >>> 消費しました (10025)(残:8)
09:45:17.697684 [C][02] >>> 消費しました (10026)(残:7)
09:45:17.807923 [C][05] >>> 消費しました (20012)(残:6)
09:45:18.091354 [C][04] >>> 消費しました (20013)(残:5)
09:45:18.152479 [C][01] >>> 消費しました (20014)(残:4)
09:45:18.251709 [C][03] >>> 消費しました (20015)(残:3)
09:45:18.265876 [C][01] >>> 消費しました (20016)(残:2)
09:45:18.351099 [C][02] >>> 消費しました (20017)(残:1)
09:45:18.400280 [W][01] >>> 生産されています。消費しなさい。
09:45:18.497510 [C][03] >>> 消費しました (20018)(残:0)
09:45:19.400993 [W][01] >>> 0個です。生産しなさい。
09:45:19.401022 [P][02] >>> 8個生産しました。(残:8)
09:45:19.401036 [P][01] >>> 0個生産しました。(残:8)
09:45:19.401060 [C][03] >>> 消費しました (20019)(残:7)
09:45:19.401071 [C][05] >>> 消費しました (20020)(残:6)
09:45:19.401094 [C][04] >>> 消費しました (20021)(残:5)
09:45:19.401108 [C][01] >>> 消費しました (20022)(残:4)
09:45:19.401121 [C][02] >>> 消費しました (20023)(残:3)
09:45:19.514361 [C][04] >>> 消費しました (20024)(残:2)
09:45:19.534457 [P][01] >>> 7個生産しました。(残:9)
09:45:19.730242 [C][01] >>> 消費しました (20025)(残:8)
09:45:19.734438 [P][02] >>> 4個生産しました。(残:12)
09:45:19.891123 [C][05] >>> 消費しました (20026)(残:11)
09:45:19.974341 [C][03] >>> 消費しました (10027)(残:10)
09:45:20.165467 [C][04] >>> 消費しました (10028)(残:9)
09:45:20.196681 [C][02] >>> 消費しました (10029)(残:8)
09:45:20.386885 [C][01] >>> 消費しました (10030)(残:7)
09:45:20.402459 [W][01] >>> 生産されています。消費しなさい。
09:45:20.650229 [C][05] >>> 消費しました (10031)(残:6)
09:45:20.686436 [C][01] >>> 消費しました (10032)(残:5)
09:45:20.735690 [C][03] >>> 消費しました (10033)(残:4)
09:45:20.851986 [C][01] >>> 消費しました (20027)(残:3)
09:45:20.864154 [C][02] >>> 消費しました (20028)(残:2)
09:45:21.088516 [C][03] >>> 消費しました (20029)(残:1)
09:45:21.144719 [C][04] >>> 消費しました (20030)(残:0)
09:45:21.403257 [W][01] >>> 0個です。生産しなさい。
09:45:21.403294 [P][02] >>> 1個生産しました。(残:1)
09:45:21.403311 [C][02] >>> 消費しました (20031)(残:0)
09:45:21.403326 [P][01] >>> 5個生産しました。(残:5)
09:45:21.513590 [P][01] >>> 0個生産しました。(残:5)
09:45:21.524788 [C][01] >>> 消費しました (10034)(残:4)
09:45:21.526249 [C][01] >>> 消費しました (10035)(残:3)
09:45:21.564602 [C][05] >>> 消費しました (10036)(残:2)
09:45:21.644747 [P][01] >>> 1個生産しました。(残:3)
09:45:21.668940 [P][02] >>> 4個生産しました。(残:7)
09:45:21.773198 [P][01] >>> 9個生産しました。(残:16)
09:45:21.829339 [C][01] >>> 消費しました (10037)(残:15)
09:45:22.034972 [C][03] >>> 消費しました (10038)(残:14)
09:45:22.086167 [C][04] >>> 消費しました (10039)(残:13)
09:45:22.147829 [C][02] >>> 消費しました (20032)(残:12)
09:45:22.296113 [C][01] >>> 消費しました (20033)(残:11)
09:45:22.312208 [C][04] >>> 消費しました (20034)(残:10)
09:45:22.350416 [C][05] >>> 消費しました (20035)(残:9)
09:45:22.403627 [W][01] >>> 生産されています。消費しなさい。
09:45:22.538209 [C][05] >>> 消費しました (10040)(残:8)
09:45:22.910731 [C][04] >>> 消費しました (10041)(残:7)
09:45:22.922483 [C][03] >>> 消費しました (10042)(残:6)
09:45:23.008894 [C][05] >>> 消費しました (10043)(残:5)
09:45:23.030455 [C][02] >>> 消費しました (10044)(残:4)
09:45:23.106724 [C][01] >>> 消費しました (10045)(残:3)
09:45:23.138976 [C][01] >>> 消費しました (10046)(残:2)
09:45:23.237293 [C][05] >>> 消費しました (10047)(残:1)
09:45:23.249501 [C][03] >>> 消費しました (10048)(残:0)
09:45:23.404371 [W][01] >>> 0個です。生産しなさい。
09:45:23.404408 [P][01] >>> 6個生産しました。(残:6)
09:45:23.404429 [C][02] >>> 消費しました (10049)(残:5)
09:45:23.404454 [C][03] >>> 消費しました (10050)(残:4)
09:45:23.404464 [C][04] >>> 消費しました (10051)(残:3)
09:45:23.404476 [C][05] >>> 消費しました (10052)(残:2)
09:45:23.404482 [P][02] >>> 8個生産しました。(残:10)
09:45:23.743011 [P][01] >>> 1個生産しました。(残:11)
09:45:23.767384 [C][02] >>> 消費しました (10053)(残:10)
09:45:23.813520 [C][01] >>> 消費しました (10054)(残:9)
09:45:23.941829 [C][05] >>> 消費しました (20036)(残:8)
09:45:23.949129 [P][01] >>> 8個生産しました。(残:16)
09:45:24.045360 [C][05] >>> 消費しました (20037)(残:15)
09:45:24.176581 [C][01] >>> 消費しました (20038)(残:14)
09:45:24.178758 [C][03] >>> 消費しました (20039)(残:13)
09:45:24.282132 [C][04] >>> 消費しました (20040)(残:12)
09:45:24.405397 [W][01] >>> 生産されています。消費しなさい。
09:45:24.544682 [C][05] >>> 消費しました (20041)(残:11)
^C09:45:24.655624 <<Interrupt>>
09:45:24.655692 <<DONE>>

参考情報

pkg.go.dev

mattn.kaoriya.net

lestrrat.medium.com

tech.yappli.io

victoriametrics.com

Goのおすすめ書籍


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。