Learning Go Chapter 10 Concurrency in Go
Concurrency in Go
はじめに
- Go の並行処理モデルは他の言語と少し違う
- CSP (Communicating Sequential Process) という理論に基づいた並行処理モデル
並行処理いつ使うのか
並行処理を行うと必ず実行速度が速くなると誤認している人が多いが実際は違う
- 並行処理は並列処理ではない
並行処理を使うかどうかは、どのようにデータが流れるか次第
- 独立して操作可能な複数の操作からデータを取得して結合する場合
時間のかからない処理を並行処理で実行してもあまり価値はない
- I/O の処理が並行処理で実行されることが多い
- いつ並行処理を使えばよいかわからない場合にはまずシーケンシャルな処理を書いて、そのあとベンチマークを取れば良い
Goroutine
- Go の並行処理におけるコアコンセプト
- Goroutine は Go ランタイムによって管理される軽量なプロセス
Goプログラムが起動すると、Goランタイムは多数のスレッドを作成し、単一のゴルーチンを起動してプログラムを実行
- OS がCPUコア全体でスレッドをスケジュールするのと同じように、プログラムによって作成されたすべてのゴルーチンは、最初のゴルーチンを含め、Goランタイムスケジューラによって自動的にこれらのスレッドに割り当てられる
スレッドと比較した Goroutine のメリット
- 作成が速い (システムリソースを消費しないから)
- スタックサイズが小さい (し、必要に応じて拡張できる)
- スイッチコストが小さい
- Go プロセスに最適化されている
Goroutineは、関数呼び出しの前にgoキーワードを配置することによって起動
- 関数によって返される値はすべて無視される
Goroutineの例
func process(val int) int { return 1 } func runThingConcurrently(in <-chan int, out chan<- int) { go func() { for val := range in { result := process(val) out <- result } }() }
Channel
- Goroutine はチャネルを使って通信する
- Slice や Map と同じように
make
を使って作成できるビルトインタイプ- Slice や Map と同じように チャネルは参照型。関数に渡すとポインターの値がコピーされる
- zero value は nil
ch := make(chan int)
Reading, Writing, and Buffering
<-
オペレータをつかって チャネルと通信する
a := <- ch // ch チャネルから値を読み取る ch <- b // ch チャネルに値bを書き込む
チャネルに書き込まれた値は一度しか読み出されない
- 複数の goroutine がチャネルを読み出す場合、どちらか一方しかチャネルの値を読み出すことができない
チャネルの型宣言の仕方で読み取るのか、書き出すのかを示す
in <-chan int
このように<-
の後にchan
がある場合は読み取りout chan<-int
このようにchan
の後に<-
がある場合は書き出し
デフォルトでは、チャネルはバッファリングされていない
- バッファリングされていない開いているチャネルに書き込むたびに、別のゴルーチンが同じチャネルから読み取るまで、書き込みゴルーチンが一時停止される
- 同様に、開いているバッファなしチャネルからの読み取りにより、別のゴルーチンが同じチャネルに書き込むまで、読み取りゴルーチンが一時停止される
=> 少なくとも2つの同時に実行されているゴルーチンがないと、バッファリングされていないチャネルに書き込みまたは読み取りを行うことができないことを意味
バッファリングされたチャネルは、チャネルの作成時にバッファの容量を指定することによって作成される
- バッファの capacity を変更することはできない
ch2 := make(chan int, 10)
- たいていの場合はバッファリングされていないチャネルで十分
for-range とチャネル
- ループは、チャネルが閉じられるまで、またはbreakまたはreturnステートメントに到達するまで続く
for v := range ch { fmt.Println(v) }
チャネルを閉じる
- チャネルを閉じる場合、ビルトインファンクションの
close
を使う
close(ch)
チャネルが閉じられると、チャネルへの書き込みまたはチャネルを再び閉じようとすると、パニックになる
- ただし、閉じたチャネルからの読み取りの試行は常に成功する...
チャネルが閉じられているかどうかを検出するために ok カンマイディオムを使用する
v, ok := <-ch
- チャネルを閉じる責任は、チャネルに書き込むゴルーチンにある
チャネルを閉じる必要があるのは、チャネルが閉じるのを待機しているゴルーチンがある場合のみ
- チャネルはただの変数なので、使用されなくなったら GC される
チャネルはコードを一連の段階として考え、データの依存関係を明確にすることをガイドする
- 他の言語の場合、変更可能なグローバルな共有状態を使うため、データがプログラムをどのように流れるかを理解することが困難、スレッドの独立性を判断するのも困難になる
チャネルの振る舞い
バッファされていないチャネル OPEN | バッファされていないチャネル CLOSE | バッファされているチャネル OPEN | バッファされているチャネル CLOSE | nil | |
---|---|---|---|---|---|
Read | 書き込みがあるまで待機 | zero バリューを返す | バッファが空なら待機 | バッファに値が残っているならその値を返す。空なら zero バリューを返す | 永久にハングする |
Write | 読み込みがあるまで待機 | Panic! | バッファがいっぱいなら待機 | Panic! | 永久にハングする |
Close | 正常に機能する | Panic! | 正常に機能する、値はバッファリングされたまま | Panic! | Panic! |
select
- select 文は Go の並行性モデルにおけるチャネル以外の主たる構成要素
- selectキーワードを使用すると、ゴルーチンが複数のチャネルのセットの1つから読み取りまたは書き込みを行うことができる
select { case v := <-ch: fmt.Println(v) case v := <-ch2: fmt.Println(v) case ch3 <- x: fmt.Println("wrote", x) case <-ch4: fmt.Println("got value on ch4, but ignored it") }
複数のケースに読み取りまたは書き込みが可能なチャネルがある場合はどうななるか?
selectは多数のチャネルを介した通信を担当するため、多くの場合、forループ内に埋め込まれている
for { select { case <-done: return case v := <-ch: fmt.Println(v) } }
- チャネルにノンブロッキングの読み取りまたは書き込みを実装する場合は、selectで default を使用する
Concurrency の実践とパターン
- 並行性は API に公開しない
並行性制御の仕組みは実装の詳細であるため、API として並行性制御の仕組みを公開すると、利用者が注意深く並行制御する責務を負ってしまう
ただし、並行性制御のためのヘルパー関数を提供するライブラリの場合は例外。一部の関数はチャネルを引数にとる関数を export している。
time.After
等
Goroutines, for Loops, and Varying Variables
- ゴルーチンを起動するために使用するクロージャにはパラメータがない
- 代わりに、宣言された環境から値をキャプチャする
- これが機能しない一般的な状況がforループのインデックスまたは値をキャプチャしようとしたとき
Always Clean Up Your Goroutines
- ゴルーチン関数を起動するときはいつでも、それが最終的に終了することを確認する必要がある
- 変数とは異なり、Goランタイムは、ゴルーチンが二度と使用されないことを検出できない
ゴルーチンが終了しない場合でも、スケジューラは定期的に何もしない時間を与え、プログラムの速度を低下させる
- これは Goroutine リークと呼ぶ
ジェネレータの実装のために Goroutine 使うと、例えば使用者側が途中で break しただけで、Goroutine リークが起きる
The Done Channel Pattern
- いくつかの関数を指定して最も速く結果を返した関数の値を返す関数
func searchData(s string, searchers []func(string) []string) []string { done := make(chan struct{}) result := make(chan []string) for _, searcher := range searchers { go func(searcher func(string) []string) { select { case result <- searcher(s): case <-done: } }(searcher) } r := <-result close(done) return r }
Using a Cancel Function to Terminate a Goroutine
- Goroutine リークを防ぐために、Goroutine を起動する側で Groutine をクローズするための関数(クロージャー)を返す
func countTo(max int) (<-chan int, func()) { ch := make(chan int) done := make(chan struct{}) cancel := func() { close(done) } go func() { for i := 0; i < max; i++ { select { case <-done: return case ch <- i: } } close(ch) }() return ch, cancel } func testCountTo() { ch, cancel := countTo(10) for i := range ch { if i > 5 { break } fmt.Println(i) } cancel() }
When to Use Buffered and Unbuffered Channels
- バッファリングされたチャネルは、起動したゴルーチンの数がわかっている場合、起動するゴルーチンの数を制限したい場合、またはキューに入れられる作業の量を制限したい場合に役立つ
Backpressure
- Backpressure の実装でバッファーチャネルが使用できる可能性がある
type PressureGauge struct { ch chan struct{} } func New(limit int) *PressureGauge { ch := make(chan struct{}, limit) for i := 0; i < limit; i++ { ch <- struct{}{} } return &PressureGauge{ ch: ch, } } // チャネルが使用できなかった場合(defaultの場合) エラーがかえる func (pg *PressureGauge) Process(f func()) error { select { case <-pg.ch: f() pg.ch <- struct{}{} return nil default: return errors.New("no more capacity") } }
Turning Off a case in a select
- 複数の同時ソースからのデータを組み合わせる必要がある場合は、selectキーワードが最適
- ただし、閉じたチャネルを適切に処理する必要がある
選択のケースの1つが閉じたチャネルの読み取りである場合、それは常に成功し、ゼロ値を返す
- そのケースを選択するたびに、値が有効であることを確認し、ケースをスキップする必要がある
- 読み取りの間隔が空いていると、プログラムはジャンク値の読み取りに多くの時間を浪費する
nilチャネルを使用して、選択でケースを無効にすることができる
for { select { case v, ok := <-in: if !ok { in = nil continue } fmt.Println(v) case v, ok := <-in2: if !ok { in2 = nil continue } fmt.Println(v) case <-done: return } }
How to Time Out Code
func timeLimit() (int, error) { var result int var err error done := make(chan struct{}) go func() { result, err = 1, nil close(done) }() select { case <-done: return result, err case <-time.After(2 * time.Second): return 0, errors.New("work timed out") } }
Using WaitGroups
- 1つのゴルーチンが、複数のゴルーチンが作業を完了するのを待つ必要がある場合がある
- 単一のゴルーチンを待っている場合は、前に見た完了チャネルパターンを使用できる
- ただし、複数のゴルーチンを待機している場合は、標準ライブラリの同期パッケージにあるWaitGroupを使用する必要がある
func testWaitGroup() { var wg sync.WaitGroup wg.Add(3) go func() { // wg を直接渡さない => コピーされて、デクリメントされないため // キャプチャして同一インスタンスであることを保証する defer wg.Done() // dothing1 }() go func() { defer wg.Done() // dothing2 }() go func() { defer wg.Done() // dothing3 }() wg.Wait() }
- 同じチャネルに複数のゴルーチンを書き込んでいる場合は、書き込まれているチャネルが1回だけ閉じられていることを確認する必要がある
- sync.WaitGroupはこれに最適
func processAndGather(in <-chan int, processor func(int) int, num int) []int { out := make(chan int, num) var wg sync.WaitGroup wg.Add(num) for i := 0; i < num; i++ { go func() { defer wg.Done() for v := range in { out <- processor(v) } }() } go func() { wg.Wait() close(out) }() var result []int for v := range out { result = append(result, v) } return result }
- Goの作成者は、標準ライブラリを補足する一連のユーティリティを維持
- 総称してgolang.org/xパッケージと呼ばれるこれらのパッケージには、WaitGroupの上に構築され、いずれかがエラーを返したときに処理を停止する一連のゴルーチンを作成するErrGroupと呼ばれるタイプが含まれる
Running Code Exactly Once
type SlowComplicatedParser interface { Parse(string) string } var parser SlowComplicatedParser var once sync.Once func Parse(dataToParse string) string { once.Do(func() { parser = initParser() }) return parser.Parse(dataToParse) } func initParser() SlowComplicatedParser { return parser }
Putting Our Concurrent Tools Together
func (p *processor) launch(ctx context.Context, data int) { go func() { aOut := AOut{} var err error if err != nil { p.errs <- err return } p.outA <- aOut }() go func() { bOut := BOut{} var err error if err != nil { p.errs <- err return } p.outB <- bOut }() go func() { select { case <-ctx.Done(): return case <-p.inC: cOut := COut{} err := errors.New("") if err != nil { p.errs <- err return } p.outC <- cOut } }() }
When to Use Mutexes Instead of Channels
- 他のプログラミング言語のスレッド間でデータへのアクセスを調整する必要がある場合は、おそらくミューテックスを使用したことがあるはず
- これは相互排除の略であり、ミューテックスの役割は、一部のコードの同時実行または共有データへのアクセスを制限すること
- この保護された部分はクリティカルセクションと呼ばれる
type MutexScoreboardManager struct { l sync.RWMutex scoreboard map[string]int } func NewMutexScoreboardManager() *MutexScoreboardManager { return &MutexScoreboardManager{ scoreboard: map[string]int{}, } } func (msm *MutexScoreboardManager) Update(name string, val int) { msm.l.Lock() defer msm.l.Unlock() msm.scoreboard[name] = val } func (msm *MutexScoreboardManager) Read(name string) (int, bool) { msm.l.RLock() defer msm.l.RUnlock() val, ok := msm.scoreboard[name] return val, ok }