いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

Goメモ-463 (golang.org/x/sys/unixを使ってソケット通信)(03-ノンブロッキング)

関連記事

Goメモ-461 (golang.org/x/sys/unixを使ってソケット通信)(01-基本パターン) - いろいろ備忘録日記

Goメモ-462 (golang.org/x/sys/unixを使ってソケット通信)(02-正規解放(Graceful Shutdown, Orderly Release)) - いろいろ備忘録日記

GitHub - devlights/blog-summary: ブログ「いろいろ備忘録日記」のまとめ

概要

以下、自分用のメモです。忘れないうちにメモメモ。。。

最近、ひょんなことで、golang.org/x/sys/unix を使ってソケット通信する処理をちょっと書いたので、ついでにここにメモ残しておこうと思いました。

syscall パッケージを使っても同じことが出来るのですが、少し呼び出すAPIが異なるかもしれません。

今回は、ノンブロッキングソケットのサンプル。

サンプル

サーバとクライアントを用意します。サンプルなので、1度のリクエスト・レスポンスで終わりです。

データをやり取りしている間はノンブロッキングですが、最後の正規解放するフェーズはブロッキングに戻して処理しています。

Client

//go:build linux

package main

import (
    "errors"
    "log"
    "net"
    "time"

    "golang.org/x/sys/unix"
)

func init() {
    log.SetFlags(log.Lmicroseconds)
}

func main() {
    if err := run(); err != nil {
        log.Fatal(err)
    }
}

func run() error {
    var (
        sfd int
        err error
    )

    sfd, err = unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
    if err != nil {
        return err
    }
    defer func() {
        log.Println("[CLIENT] ソケットクローズ")
        unix.Close(sfd)
    }()

    var (
        ip   = net.ParseIP("127.0.0.1")
        ipv4 [4]byte

        sAddr unix.Sockaddr
    )
    copy(ipv4[:], ip.To4())

    sAddr = &unix.SockaddrInet4{Port: 8888, Addr: ipv4}
    err = unix.Connect(sfd, sAddr)
    if err != nil {
        return err
    }

    log.Println("[CLIENT] Connect")

    //
    // ソケットをノンブロッキングモードに設定
    // クライアントソケットの場合は必ず「接続した後」に設定する必要がある.
    // (接続する前にノンブロッキングモード設定しても、ソケットが接続されていないため効果がない)
    //
    err = unix.SetNonblock(sfd, true)
    if err != nil {
        return err
    }
    log.Println("[CLIENT] set O_NONBLOCK")

    //
    // Send
    //
    var (
        buf = make([]byte, 2048)
        msg = "helloworld"
    )
    for {
        copy(buf, []byte(msg))

        err = unix.Send(sfd, buf[:len(msg)], 0)
        if err != nil {
            // 基本的に大抵のOSでは EAGAIN と EWOULDBLOCK は同じコードを示す (0xb) ので
            // EAGAINのみを見ていれば良いが、man send(2)の記載では EAGAIN または EWOULDBLOCK を返すと
            // 記載があるため、両方見ておくのが無難。
            //
            // send(2)
            //   - https://ja.manpages.org/send/2
            switch {
            case errors.Is(err, unix.EAGAIN):
                log.Println("[CLIENT][SEND] --> unix.EAGAIN")
            case errors.Is(err, unix.EWOULDBLOCK):
                log.Println("[CLIENT][SEND] --> unix.EWOULDBLOCK")
            case errors.Is(err, unix.EINTR):
                log.Println("[CLIENT][SEND] --> unix.EINTR")
            default:
                return err
            }

            time.Sleep(100 * time.Millisecond)
            continue
        }

        log.Printf("[CLIENT] SEND %s", msg)

        break
    }

    //
    // Recv
    //
    var (
        n int
    )
    for {
        clear(buf)

        n, err = unix.Read(sfd, buf)
        if err != nil {
            // 基本的に大抵のOSでは EAGAIN と EWOULDBLOCK は同じコードを示す (0xb) ので
            // EAGAINのみを見ていれば良いが、man send(2)の記載では EAGAIN または EWOULDBLOCK を返すと
            // 記載があるため、両方見ておくのが無難。
            //
            // read(2)
            //   - https://ja.manpages.org/read/2
            switch {
            case errors.Is(err, unix.EAGAIN):
                log.Println("[CLIENT][RECV] --> unix.EAGAIN")
            case errors.Is(err, unix.EWOULDBLOCK):
                log.Println("[CLIENT][RECV] --> unix.EWOULDBLOCK")
            case errors.Is(err, unix.EINTR):
                log.Println("[CLIENT][RECV] --> unix.EINTR")
            default:
                return err
            }

            time.Sleep(50 * time.Millisecond)
            continue
        }

        log.Printf("[CLIENT] RECV %s", buf[:n])

        break
    }

    // ブロッキングモードに戻す
    err = unix.SetNonblock(sfd, false)
    if err != nil {
        return err
    }
    log.Println("[CLIENT] reset O_NONBLOCK")

    //
    // 正規解放 (Graceful Shutdown or Orderly Release)
    //
    // ソケットの正規解放とは、ソケット通信を適切に終了させ、リソースを解放するプロセスのことを指します。
    // これには通常、shutdownとcloseの2つの操作が含まれます。
    //
    // 1. Shutdown
    //   shutdownは通信相手に対して接続終了の意思を伝えます。
    //   例えば、SHUT_WRを使用すると、相手側にEOF(End of File)を送信します。
    //
    // 2. close
    //   closeはソケットのファイルディスクリプタを閉じ、関連するリソースを解放します。
    //   最後の参照が閉じられたときにのみ、ネットワークの端点を完全に解放します。
    //
    // 正規解放の手順
    //   1. shutdown(SHUT_WR) の呼び出し。これにより相手に送信停止を通知する。
    //   2. 必要に応じて、残りのデータを受信する。
    //   3. 最後に close を呼び出して、ソケットのリソースを完全に解放する。
    //
    // 正規解放を行うことで、ネットワーク通信を適切に終了し、リソースを効率的に管理することができます。
    // 特に信頼性の高い通信が必要な場合や、大規模なシステムでリソース管理が重要な場合に、この方法は有効です。
    //

    // 1. shutdown(SHUT_WR) の呼び出し。これにより相手に送信停止を通知する。
    //    つまり、相手側にEOFが送信される。「もうデータは送りません」という意思表示。
    err = unix.Shutdown(sfd, unix.SHUT_WR)
    if err != nil {
        return err
    }

    log.Println("[CLIENT] shutdown(SHUT_WR)")

    // 2. 必要に応じて、残りのデータを受信する。
LOOP:
    for {
        clear(buf)

        n, err = unix.Read(sfd, buf)
        switch {
        case n == 0:
            log.Println("[CLIENT] 切断検知 (0 byte read)")
            break LOOP
        case err != nil:
            var sysErr unix.Errno
            if errors.As(err, &sysErr); sysErr == unix.ECONNRESET {
                log.Printf("[CLIENT] 切断検知 (%s)", sysErr)
                break LOOP
            }

            return err
        default:
            log.Printf("[CLIENT] RECV REMAIN [%s]", buf[:n])
        }
    }

    // 3. 最後に close を呼び出して、ソケットのリソースを完全に解放する。
    // これは上の defer で行われている。

    return nil
}

Server

//go:build linux

package main

import (
    "errors"
    "log"
    "net"
    "time"

    "golang.org/x/sys/unix"
)

func init() {
    log.SetFlags(log.Lmicroseconds)
}

func main() {
    if err := run(); err != nil {
        log.Fatal(err)
    }
}

func run() error {
    //
    // Create
    //
    var (
        sfd int
        err error
    )

    sfd, err = unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
    if err != nil {
        return err
    }
    defer func() {
        log.Println("[SERVER] サーバーソケットクローズ")
        unix.Close(sfd)
    }()

    //
    // SO_REUSEADDR
    //
    err = unix.SetsockoptInt(sfd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
    if err != nil {
        return err
    }
    log.Println("[SERVER] set SO_REUSEADDR")

    //
    // Accept処理をノンブロッキングモードで処理するために設定
    //
    err = unix.SetNonblock(sfd, true)
    if err != nil {
        return err
    }
    log.Println("[SERVER] set O_NONBLOCK")

    //
    // Bind and Listen
    //
    var (
        ip   = net.ParseIP("127.0.0.1")
        ipv4 [4]byte

        sAddr   unix.Sockaddr
        backLog = 2
    )
    copy(ipv4[:], ip.To4())

    sAddr = &unix.SockaddrInet4{Port: 8888, Addr: ipv4}
    err = unix.Bind(sfd, sAddr)
    if err != nil {
        return err
    }

    err = unix.Listen(sfd, backLog)
    if err != nil {
        return err
    }

    //
    // Accept
    //   ノンブロッキングモードにしているので
    //   接続するまでブロックされずに unix.EAGAIN が返る.
    //
    var (
        cfd   int
        cAddr unix.Sockaddr
    )

    for {
        cfd, cAddr, err = unix.Accept(sfd)
        if err != nil {
            switch {
            case errors.Is(err, unix.EAGAIN):
                log.Println("[SERVER][ACCEPT] --> unix.EAGAIN")
            case errors.Is(err, unix.EWOULDBLOCK):
                log.Println("[SERVER][ACCEPT] --> unix.EWOULDBLOCK")
            case errors.Is(err, unix.EINTR):
                log.Println("[SERVER][ACCEPT] --> unix.EINTR")
            default:
                return err
            }

            time.Sleep(100 * time.Millisecond)
            continue
        }

        break
    }

    defer func() {
        log.Println("[SERVER] パケット送受信用ソケットクローズ")
        unix.Close(cfd)
    }()

    cAddrInet4 := cAddr.(*unix.SockaddrInet4)
    log.Printf("[SERVER] Connect from %v:%v", cAddrInet4.Addr, cAddrInet4.Port)

    //
    // (補足)
    //    サーバソケットとAcceptで受け取ったパケット送受信用ソケットは別物なので
    //    受信と送信をノンブロッキングしたい場合は、再度 unix.SetNonblock(cfd, true) が必要.
    //
    // 本サンプルではブロッキングモードのままで処理している
    //

    //
    // Recv
    //
    var (
        buf = make([]byte, 2048)
        n   int
    )

    n, err = unix.Read(cfd, buf)
    if err != nil {
        return err
    }
    log.Printf("[SERVER] RECV %s", string(buf[:n]))

    // クライアントから受信した値を使って「何かの処理」を行った後に
    // クライアント側に返送するという流れをシミュレートするために
    // 意図的に少しディレイを入れる
    time.Sleep(150 * time.Millisecond)

    //
    // Send
    //
    var (
        msg = "HELLOWORLD "
    )

    for range 5 {
        clear(buf)
        copy(buf, []byte(msg))

        _, err = unix.Write(cfd, buf[:len(msg)])
        if err != nil {
            return err
        }
        log.Printf("[SERVER] SEND %s", buf[:len(msg)])
    }

    // 1. shutdown(SHUT_WR) の呼び出し。これにより相手に送信停止を通知する。
    err = unix.Shutdown(cfd, unix.SHUT_WR)
    if err != nil {
        return err
    }

    log.Println("[SERVER] shutdown(SHUT_WR)")

    // 2. 必要に応じて、残りのデータを受信する。
LOOP:
    for {
        clear(buf)

        n, err = unix.Read(cfd, buf)
        switch {
        case n == 0:
            log.Println("[SERVER] 切断検知 (0 byte read)")
            break LOOP
        case err != nil:
            if errors.Is(err, unix.ECONNRESET) {
                log.Printf("[SERVER] 切断検知 (%s)", err)
                break LOOP
            }

            return err
        default:
            log.Printf("[SERVER] RECV %s", buf[:n])
        }
    }

    // 3. 最後に close を呼び出して、ソケットのリソースを完全に解放する。
    // これは上の defer で行われている。

    return nil
}

Taskfile

実行用に以下のようなタスクファイルを用意

# https://taskfile.dev

version: '3'

tasks:
  default:
    cmds:
      - task: run
  fmt:
    cmds:
      - goimports -w .
  prepare:
    cmds:
      - mkdir -p bin
  build:
    deps: [ fmt ]
    cmds:
      - go build -o bin/server server/server.go
      - go build -o bin/client client/client.go
  run:
    deps: [ build ]
    cmds:
      - ./bin/server &
      - sleep 1
      - ./bin/client
      - sleep 1
      - pgrep server && pkill server
    ignore_error: true
  clean:
    cmds:
      - rm -rf ./bin

実行

task: [fmt] goimports -w .
task: [build] go build -o bin/server server/server.go
task: [build] go build -o bin/client client/client.go
task: [run] ./bin/server &
task: [run] sleep 1
17:29:31.274018 [SERVER] set SO_REUSEADDR
17:29:31.274192 [SERVER] set O_NONBLOCK
17:29:31.274462 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.374781 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.475205 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.575536 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.675862 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.776187 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.876485 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:31.976835 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:32.077198 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:32.177530 [SERVER][ACCEPT] --> unix.EAGAIN
task: [run] ./bin/client
17:29:32.277740 [SERVER][ACCEPT] --> unix.EAGAIN
17:29:32.278312 [CLIENT] Connect
17:29:32.278503 [CLIENT] set O_NONBLOCK
17:29:32.278544 [CLIENT] SEND helloworld
17:29:32.278556 [CLIENT][RECV] --> unix.EAGAIN
17:29:32.328793 [CLIENT][RECV] --> unix.EAGAIN
17:29:32.378117 [SERVER] Connect from [127 0 0 1]:35250
17:29:32.378192 [SERVER] RECV helloworld
17:29:32.379053 [CLIENT][RECV] --> unix.EAGAIN
17:29:32.429330 [CLIENT][RECV] --> unix.EAGAIN
17:29:32.479578 [CLIENT][RECV] --> unix.EAGAIN
17:29:32.528568 [SERVER] SEND HELLOWORLD 
17:29:32.528648 [SERVER] SEND HELLOWORLD 
17:29:32.528670 [SERVER] SEND HELLOWORLD 
17:29:32.528687 [SERVER] SEND HELLOWORLD 
17:29:32.528705 [SERVER] SEND HELLOWORLD 
17:29:32.528721 [SERVER] shutdown(SHUT_WR)
17:29:32.529879 [CLIENT] RECV HELLOWORLD HELLOWORLD HELLOWORLD HELLOWORLD HELLOWORLD 
17:29:32.529904 [CLIENT] reset O_NONBLOCK
17:29:32.529964 [CLIENT] shutdown(SHUT_WR)
17:29:32.529983 [SERVER] 切断検知 (0 byte read)
17:29:32.530010 [SERVER] パケット送受信用ソケットクローズ
17:29:32.529984 [CLIENT] 切断検知 (0 byte read)
17:29:32.530022 [CLIENT] ソケットクローズ
17:29:32.530037 [SERVER] サーバーソケットクローズ
task: [run] sleep 1
task: [run] pgrep server && pkill server

参考情報

Goのおすすめ書籍


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。