Learning Go Chapter 10 Concurrency in Go

Concurrency in Go

はじめに

  • Go の並行処理モデルは他の言語と少し違う
  • CSP (Communicating Sequential Process) という理論に基づいた並行処理モデル

並行処理いつ使うのか

  • 並行処理を行うと必ず実行速度が速くなると誤認している人が多いが実際は違う

    • 並行処理は並列処理ではない
  • 並行処理を使うかどうかは、どのようにデータが流れるか次第

    • 独立して操作可能な複数の操作からデータを取得して結合する場合
  • 時間のかからない処理を並行処理で実行してもあまり価値はない

    • I/O の処理が並行処理で実行されることが多い
    • いつ並行処理を使えばよいかわからない場合にはまずシーケンシャルな処理を書いて、そのあとベンチマークを取れば良い

Goroutine

  • Go の並行処理におけるコアコンセプト
  • Goroutine は Go ランタイムによって管理される軽量なプロセス
  • Goプログラムが起動すると、Goランタイムは多数のスレッドを作成し、単一のゴルーチンを起動してプログラムを実行

    • OS がCPUコア全体でスレッドをスケジュールするのと同じように、プログラムによって作成されたすべてのゴルーチンは、最初のゴルーチンを含め、Goランタイムスケジューラによって自動的にこれらのスレッドに割り当てられる
  • スレッドと比較した Goroutine のメリット

    • 作成が速い (システムリソースを消費しないから)
    • スタックサイズが小さい (し、必要に応じて拡張できる)
    • スイッチコストが小さい
    • Go プロセスに最適化されている
  • Goroutineは、関数呼び出しの前にgoキーワードを配置することによって起動

    • 関数によって返される値はすべて無視される
  • Goroutineの例

func process(val int) int {
    return 1
}

func runThingConcurrently(in <-chan int, out chan<- int) {
    go func() {
        for val := range in {
            result := process(val)
            out <- result
        }
    }()
}

Channel

  • Goroutine はチャネルを使って通信する
  • Slice や Map と同じように makeを使って作成できるビルトインタイプ
    • Slice や Map と同じように チャネルは参照型。関数に渡すとポインターの値がコピーされる
  • zero valuenil
ch := make(chan int)

Reading, Writing, and Buffering

  • <- オペレータをつかって チャネルと通信する
a := <- ch // ch チャネルから値を読み取る
ch <- b     // ch チャネルに値bを書き込む
  • チャネルに書き込まれた値は一度しか読み出されない

    • 複数の goroutine がチャネルを読み出す場合、どちらか一方しかチャネルの値を読み出すことができない
  • チャネルの型宣言の仕方で読み取るのか、書き出すのかを示す

    • in <-chan int このように <- の後に chan がある場合は読み取り
    • out chan<-int このように chan の後に<- がある場合は書き出し
  • デフォルトでは、チャネルはバッファリングされていない

  • バッファリングされていない開いているチャネルに書き込むたびに、別のゴルーチンが同じチャネルから読み取るまで、書き込みゴルーチンが一時停止される
  • 同様に、開いているバッファなしチャネルからの読み取りにより、別のゴルーチンが同じチャネルに書き込むまで、読み取りゴルーチンが一時停止される
  • => 少なくとも2つの同時に実行されているゴルーチンがないと、バッファリングされていないチャネルに書き込みまたは読み取りを行うことができないことを意味

  • バッファリングされたチャネルは、チャネルの作成時にバッファの容量を指定することによって作成される

    • バッファの capacity を変更することはできない
ch2 := make(chan int, 10)
  • たいていの場合はバッファリングされていないチャネルで十分

for-range とチャネル

  • ループは、チャネルが閉じられるまで、またはbreakまたはreturnステートメントに到達するまで続く
for v := range ch {
        fmt.Println(v)
}

チャネルを閉じる

  • チャネルを閉じる場合、ビルトインファンクションの close を使う
close(ch)
  • チャネルが閉じられると、チャネルへの書き込みまたはチャネルを再び閉じようとすると、パニックになる

    • ただし、閉じたチャネルからの読み取りの試行は常に成功する...
  • チャネルが閉じられているかどうかを検出するために ok カンマイディオムを使用する

v, ok := <-ch
  • チャネルを閉じる責任は、チャネルに書き込むゴルーチンにある
  • チャネルを閉じる必要があるのは、チャネルが閉じるのを待機しているゴルーチンがある場合のみ

    • チャネルはただの変数なので、使用されなくなったら GC される
  • チャネルはコードを一連の段階として考え、データの依存関係を明確にすることをガイドする

    • 他の言語の場合、変更可能なグローバルな共有状態を使うため、データがプログラムをどのように流れるかを理解することが困難、スレッドの独立性を判断するのも困難になる

チャネルの振る舞い

バッファされていないチャネル OPEN バッファされていないチャネル CLOSE バッファされているチャネル OPEN バッファされているチャネル CLOSE nil
Read 書き込みがあるまで待機 zero バリューを返す バッファが空なら待機 バッファに値が残っているならその値を返す。空なら zero バリューを返す 永久にハングする
Write 読み込みがあるまで待機 Panic! バッファがいっぱいなら待機 Panic! 永久にハングする
Close 正常に機能する Panic! 正常に機能する、値はバッファリングされたまま Panic! Panic!

select

  • select 文は Go の並行性モデルにおけるチャネル以外の主たる構成要素
    • selectキーワードを使用すると、ゴルーチンが複数のチャネルのセットの1つから読み取りまたは書き込みを行うことができる
select {
case v := <-ch:
    fmt.Println(v)
case v := <-ch2:
    fmt.Println(v)
case ch3 <- x:
    fmt.Println("wrote", x)
case <-ch4:
    fmt.Println("got value on ch4, but ignored it")
}
  • 複数のケースに読み取りまたは書き込みが可能なチャネルがある場合はどうななるか?

    • 選択アルゴリズムは単純
    • 前進できるケースからランダムに選択。順序は重要ではない
    • これは、trueに解決される最初のケースを常に選択するswitchステートメントとは大きく異なる
    • また、他のケースよりも優先されるケースはなく、すべてが同時にチェックされる
    • ランダムに選択することのもう1つの利点は、デッドロックの最も一般的な原因の1つである、一貫性のない順序でロックを取得することを防ぐこと
  • selectは多数のチャネルを介した通信を担当するため、多くの場合、forループ内に埋め込まれている

   for {
        select {
        case <-done:
            return
        case v := <-ch:
            fmt.Println(v)
        }
    }
  • チャネルにノンブロッキングの読み取りまたは書き込みを実装する場合は、selectで default を使用する

Concurrency の実践とパターン

  • 並行性は API に公開しない
  • 並行性制御の仕組みは実装の詳細であるため、API として並行性制御の仕組みを公開すると、利用者が注意深く並行制御する責務を負ってしまう

  • ただし、並行性制御のためのヘルパー関数を提供するライブラリの場合は例外。一部の関数はチャネルを引数にとる関数を export している。time.After

Goroutines, for Loops, and Varying Variables

  • ゴルーチンを起動するために使用するクロージャにはパラメータがない
  • 代わりに、宣言された環境から値をキャプチャする
  • これが機能しない一般的な状況がforループのインデックスまたは値をキャプチャしようとしたとき

Always Clean Up Your Goroutines

  • ゴルーチン関数を起動するときはいつでも、それが最終的に終了することを確認する必要がある
  • 変数とは異なり、Goランタイムは、ゴルーチンが二度と使用されないことを検出できない
  • ゴルーチンが終了しない場合でも、スケジューラは定期的に何もしない時間を与え、プログラムの速度を低下させる

    • これは Goroutine リークと呼ぶ
  • ジェネレータの実装のために Goroutine 使うと、例えば使用者側が途中で break しただけで、Goroutine リークが起きる

The Done Channel Pattern

  • いくつかの関数を指定して最も速く結果を返した関数の値を返す関数
func searchData(s string, searchers []func(string) []string) []string {
    done := make(chan struct{})

    result := make(chan []string)

    for _, searcher := range searchers {
        go func(searcher func(string) []string) {
            select {
            case result <- searcher(s):
            case <-done:
            }
        }(searcher)
    }

    r := <-result
    close(done)
    return r
}

Using a Cancel Function to Terminate a Goroutine

  • Goroutine リークを防ぐために、Goroutine を起動する側で Groutine をクローズするための関数(クロージャー)を返す
func countTo(max int) (<-chan int, func()) {
    ch := make(chan int)
    done := make(chan struct{})
    cancel := func() {
        close(done)
    }

    go func() {
        for i := 0; i < max; i++ {
            select {
            case <-done:
                return
            case ch <- i:
            }
        }
        close(ch)
    }()

    return ch, cancel
}

func testCountTo() {
    ch, cancel := countTo(10)
    for i := range ch {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }

    cancel()
}

When to Use Buffered and Unbuffered Channels

  • バッファリングされたチャネルは、起動したゴルーチンの数がわかっている場合、起動するゴルーチンの数を制限したい場合、またはキューに入れられる作業の量を制限したい場合に役立つ

Backpressure

  • Backpressure の実装でバッファーチャネルが使用できる可能性がある
type PressureGauge struct {
    ch chan struct{}
}

func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)

    for i := 0; i < limit; i++ {
        ch <- struct{}{}
    }

    return &PressureGauge{
        ch: ch,
    }
}

// チャネルが使用できなかった場合(defaultの場合) エラーがかえる
func (pg *PressureGauge) Process(f func()) error {
    select {
    case <-pg.ch:
        f()
        pg.ch <- struct{}{}
        return nil
    default:
        return errors.New("no more capacity")
    }
}

Turning Off a case in a select

  • 複数の同時ソースからのデータを組み合わせる必要がある場合は、selectキーワードが最適
  • ただし、閉じたチャネルを適切に処理する必要がある
  • 選択のケースの1つが閉じたチャネルの読み取りである場合、それは常に成功し、ゼロ値を返す

    • そのケースを選択するたびに、値が有効であることを確認し、ケースをスキップする必要がある
    • 読み取りの間隔が空いていると、プログラムはジャンク値の読み取りに多くの時間を浪費する
  • nilチャネルを使用して、選択でケースを無効にすることができる

    • チャネルが閉じられたことを検出したら、チャネルの変数をnilに設定
    • nilチャネルからの読み取りが値を返さないため、関連するケースは実行されなくなる
for {
    select {
    case v, ok := <-in:
        if !ok {
            in = nil
            continue
        }
                         
        fmt.Println(v)
    case v, ok := <-in2:
        if !ok {
            in2 = nil
            continue
        }
        fmt.Println(v)
    case <-done:
        return
    }
                         
}

How to Time Out Code

func timeLimit() (int, error) {
    var result int
    var err error

    done := make(chan struct{})
    go func() {
        result, err = 1, nil
        close(done)
    }()
    select {
    case <-done:
        return result, err
    case <-time.After(2 * time.Second):
        return 0, errors.New("work timed out")
    }
}

Using WaitGroups

  • 1つのゴルーチンが、複数のゴルーチンが作業を完了するのを待つ必要がある場合がある
  • 単一のゴルーチンを待っている場合は、前に見た完了チャネルパターンを使用できる
  • ただし、複数のゴルーチンを待機している場合は、標準ライブラリの同期パッケージにあるWaitGroupを使用する必要がある
func testWaitGroup() {
    var wg sync.WaitGroup
    wg.Add(3)

    go func() {
               // wg を直接渡さない => コピーされて、デクリメントされないため
               // キャプチャして同一インスタンスであることを保証する
        defer wg.Done()
        // dothing1
    }()

    go func() {
        defer wg.Done()
        // dothing2
    }()

    go func() {
        defer wg.Done()
        // dothing3
    }()
    wg.Wait()
}
  • 同じチャネルに複数のゴルーチンを書き込んでいる場合は、書き込まれているチャネルが1回だけ閉じられていることを確認する必要がある
    • sync.WaitGroupはこれに最適
func processAndGather(in <-chan int, processor func(int) int, num int) []int {
    out := make(chan int, num)
    var wg sync.WaitGroup
    wg.Add(num)

    for i := 0; i < num; i++ {
        go func() {
            defer wg.Done()
            for v := range in {
                out <- processor(v)
            }
        }()
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    var result []int
    for v := range out {
        result = append(result, v)
    }
    return result
}
  • Goの作成者は、標準ライブラリを補足する一連のユーティリティを維持
    • 総称してgolang.org/xパッケージと呼ばれるこれらのパッケージには、WaitGroupの上に構築され、いずれかがエラーを返したときに処理を停止する一連のゴルーチンを作成するErrGroupと呼ばれるタイプが含まれる

Running Code Exactly Once

type SlowComplicatedParser interface {
    Parse(string) string
}

var parser SlowComplicatedParser
var once sync.Once

func Parse(dataToParse string) string {
    once.Do(func() {
        parser = initParser()
    })

    return parser.Parse(dataToParse)
}

func initParser() SlowComplicatedParser {
    return parser
}

Putting Our Concurrent Tools Together

func (p *processor) launch(ctx context.Context, data int) {
    go func() {
        aOut := AOut{}
        var err error
        if err != nil {
            p.errs <- err
            return
        }
        p.outA <- aOut
    }()
    go func() {
        bOut := BOut{}
        var err error
        if err != nil {
            p.errs <- err
            return
        }
        p.outB <- bOut
    }()
    go func() {
        select {
        case <-ctx.Done():
            return
        case <-p.inC:
            cOut := COut{}
            err := errors.New("")
            if err != nil {
                p.errs <- err
                return
            }
            p.outC <- cOut
        }
    }()
}

When to Use Mutexes Instead of Channels

type MutexScoreboardManager struct {
    l          sync.RWMutex
    scoreboard map[string]int
}

func NewMutexScoreboardManager() *MutexScoreboardManager {
    return &MutexScoreboardManager{
        scoreboard: map[string]int{},
    }
}

func (msm *MutexScoreboardManager) Update(name string, val int) {
    msm.l.Lock()
    defer msm.l.Unlock()
    msm.scoreboard[name] = val
}

func (msm *MutexScoreboardManager) Read(name string) (int, bool) {
    msm.l.RLock()
    defer msm.l.RUnlock()
    val, ok := msm.scoreboard[name]
    return val, ok
}
  • ゴルーチンを調整したり、一連のゴルーチンによって変換される値を追跡したりする場合は、チャネルを使用
  • 構造体のフィールドへのアクセスを共有している場合は、ミューテックスを使用する
  • チャネルの使用時に重大なパフォーマンスの問題を発見し問題を修正する他の方法が見つからない場合は、ミューテックスを使用するようにコードを変更する

www.amazon.co.jp