Summary
- A queue & multiple workers in golang
Prerequisites
- go version go1.17.3 darwin/amd64
Channel as a queue, Goroutines as workers
I wanted to write the following pattern in golang:
- A queue has multiple processing targets
- Each worker gets one target from the queue, processes it, and repeats the procedure until the queue is empty
- All workers run in parallel
- If an error occurs in one worker, all other workers will stop after processing the current target
I wrote that pattern using buffered channels and goroutines.
Here are some notes.
- A queue has multiple processing targets
- Each worker gets one target from the queue, processes it, and repeats the procedure until the queue is empty
In this case, we can use buffered channel
as a queue.
A worker tries to get the target from the channel, and processes it if it succeeds, or exits if the channel is closed.
I implemented this pattern with select
statement.
// example
target := []int{1, 8, 5, 2, 3, 9, 1}
queue := make(chan int, len(target))
for len(target) > 0 {
queue <- target[0]
target = target[1:]
}
close(queue)
for {
select {
case v, ok := <-queue:
if !ok {
return
}
// any task you want to do for v
default:
time.Sleep(1 * time.Second)
}
}
- If an error occurs in one worker, all other workers will stop after processing the current target
context.WithCancel is suitable for this purpose.
If the process returns an error, the worker will execute cancelFunc
.cancelFunc
closes ctx
(channel) and the worker will stop when ctx
is closed.
// example
ctx, cancel := context.WithCancel(context.Background())
for {
select {
case <-ctx.Done():
return
default:
}
err := process() // any task you want
if err != nil {
cancel()
}
}
- All workers run in parallel
With sync.WaitGroup, it can wait for multiple goroutines to stop.
When a goroutine starts, call Add to increase the counter, and when it finishes, call Done to decrease the counter.
Wait waits until the counter reaches zero.
// example
var wg sync.WaitGroup
workers := 3
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// any task you want
}()
}
wg.Wait()
In order to vary the processing time depending on the target, I created the following process.
- sleep for the specified number of seconds
- if the specified number is 4, raise an error
func process(n int) error {
time.Sleep(time.Duration(n) * time.Second)
if n == 4 {
return errors.New("Unlucky 4!!!")
}
return nil
}
Then run main.go.
targets: [1 8 5 2 3 7 6]
number of workers: 3
Worker-1: target 1 START
Worker-0: target 5 START
Worker-2: target 8 START
Worker-1: target 1 END
Worker-1: target 2 START
Worker-1: target 2 END
Worker-1: target 3 START
Worker-0: target 5 END
Worker-0: target 7 START
Worker-1: target 3 END
Worker-1: target 6 START
Worker-2: target 8 END
Worker-2: no target...finish
Worker-0: target 7 END
Worker-0: no target...finish
Worker-1: target 6 END
Worker-1: no target...finish
All workers finished.
All targets were processed.
It is hard to understand how it worked in text, so I made a diagram below.
All workers work in parallel and each worker gets the target from the queue.
Worker 1 drew a small number and immediately went for the next target.
In contrast, Worker 2 drew a large number, and the queue had been already empty when the worker finished its first target.
Change the target to cause an error, and run main.go again.
targets: [1 8 4 2 3 7 6]
number of workers: 3
Worker-2: target 1 START
Worker-0: target 8 START
Worker-1: target 4 START
Worker-2: target 1 END
Worker-2: target 2 START
Worker-2: target 2 END
Worker-2: target 3 START
Worker-1: ERROR! Unlucky 4!!!
Worker-2: target 3 END
Worker-2: received ctx.Done()...finish
Worker-0: target 8 END
Worker-0: received ctx.Done()...finish
All workers finished.
7 was not processed...
6 was not processed...
Worker 1 drew “4” and raised an error, then all workers stopped after finishing the target they had.
(The following is the same content in Japanese.)
まとめ
Go
で複数の処理対象を1つのキューに詰めて複数のワーカーで並行して処理するやつを書いたchannel
をキューとして、goroutine
をワーカーとして使えそう- 複数
goroutine
の完了を待つにはWaitGroup
を使うのが良さげ goroutine
から他のgoroutine
を止めたりするのはContext
を使うのが良さげ
環境
- go version go1.17.3 darwin/amd64
キュー(channel)とワーカー(go routine)
Go
でちょっとしたバッチ的なものを書こうとしたときに、
同じような処理を繰り返す部分があったのでgoroutine
を使っていい感じの並行処理を書きたくなった。
イメージとしては
- 処理したい対象が1つのキューに詰められている
- 1つのワーカーはキューから対象を1つ取り出して処理、をキューが空になるまで繰り返す
- すべてのワーカーは並行して動作する
- どこかのワーカーでエラーが発生したら他の全てのワーカーにも知らせて、それを知った各ワーカーは現在処理している対象までで処理を終了する
という感じ。
今回はかんたんなものの例としてGo
のchannel
とgoroutine
を用いて実現してみた。
重要そうなところを(数ヶ月後の自分のために)メモしておく。
- 処理したい対象が1つのキューに詰められている
- 1つのワーカーはキューから対象を1つ取り出して処理、をキューが空になるまで繰り返す
は次のように書いた。
今回の用途のキューであれば、バッファありchannel
を使って実現できる。select
を使って、channel
から値が取り出せれば何らかの処理を実行、channel
が閉じられたらそこで止まるようにした。
target := []int{1, 8, 5, 2, 3, 9, 1}
queue := make(chan int, len(target))
for len(target) > 0 {
queue <- target[0]
target = target[1:]
}
close(queue)
for {
select {
case v, ok := <-queue:
if !ok {
return
}
// any task you want to do for the v
default:
time.Sleep(1 * time.Second)
}
}
- どこかのワーカーでエラーが発生したら他の全てのワーカーにも知らせて、それを知った各ワーカーは現在処理している対象までで処理を終了する
はcontext.WithCancelを使って次のように書いた。
ワーカー(go routine
)でなにかエラーが起こったらcancelFuncを実行してctx(channel
)を閉じ、
ctxが閉じられたらその時点でワーカーを終了させる。
ctx, cancel := context.WithCancel(context.Background())
for {
select {
case <-ctx.Done():
return
default:
}
err := process() // any task you want
if err != nil {
cancel()
}
}
- すべてのワーカーは並行して動作する
ワーカーを複数並行して動かし、全部のワーカーが止まるまで待つような処理はsync.WaitGroupをつかって書いた。
Waitを呼び出すと待つ対象が0になるまで待ち続けるので、
各goroutine
の開始時にAddして待つ対象を増やし、goroutine
が終了したらDoneで待つ対象をへらすようにしている。
var wg sync.WaitGroup
workers := 3
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// any task you want
}()
}
wg.Wait()
あとはこれらをいい感じに組み合わせた。
また、今回は処理対象ごとに処理時間をバラバラにしたかったので、ワーカーの処理として次のようなものを用意した。
- 引数の秒数ぶん待機する
- 引数が4ならエラーを返す
func process(n int) error {
time.Sleep(time.Duration(n) * time.Second)
if n == 4 {
return errors.New("Unlucky 4!!!")
}
return nil
}
main.goを実際に動かしてみる。
targets: [1 8 5 2 3 7 6]
number of workers: 3
Worker-1: target 1 START
Worker-0: target 5 START
Worker-2: target 8 START
Worker-1: target 1 END
Worker-1: target 2 START
Worker-1: target 2 END
Worker-1: target 3 START
Worker-0: target 5 END
Worker-0: target 7 START
Worker-1: target 3 END
Worker-1: target 6 START
Worker-2: target 8 END
Worker-2: no target...finish
Worker-0: target 7 END
Worker-0: no target...finish
Worker-1: target 6 END
Worker-1: no target...finish
All workers finished.
All targets were processed.
狙い通りに動いたのだが、文字だけ追っても見づらいので図にしてみた。
各ワーカーが1つずつ対象をキューから取り出してその秒数ぶん待機、を繰り返しており、
Worker 1のように小さい数字(=処理が軽い)を引くとその分次の対象を取りに行くのが早くなっていて、
逆にWorker 2のように最初に大きい数字(=処理が重い)を引くとその間に他のワーカーが働くので対象がなくなっている。
最後にワーカーがエラーを吐くような処理対象をキューに仕込んで動かしてみる。
targets: [1 8 4 2 3 7 6]
number of workers: 3
Worker-2: target 1 START
Worker-0: target 8 START
Worker-1: target 4 START
Worker-2: target 1 END
Worker-2: target 2 START
Worker-2: target 2 END
Worker-2: target 3 START
Worker-1: ERROR! Unlucky 4!!!
Worker-2: target 3 END
Worker-2: received ctx.Done()...finish
Worker-0: target 8 END
Worker-0: received ctx.Done()...finish
All workers finished.
7 was not processed...
6 was not processed...
こちらも狙い通り動いていて、
この場合は4を引いたWorker 1の処理でエラーが発生し、
他のワーカーもそのタイミングで持っている対象を最後に以降の処理を行わないようになっている。
おわり
久しぶりにGoにガッツリ触った。
go routine
とかchannel
とか適当な理解で済ませていたもののおさらいができてよかった。