Featured image of post A queue and multiple workers in golang

A queue and multiple workers in golang

Goでたくさんの対象をいい感じに分散させて並行処理するやつ

Summary

  • A queue & multiple workers in golang
    • We can use buffered channel as a queue, and goroutines as workers.
    • WaitGroup is useful to wait for goroutines to finish.
    • Context makes it easy to stop other goroutines from one goroutine.

Prerequisites

  • go version go1.17.3 darwin/amd64

Channel as a queue, Goroutines as workers

I wanted to write the following pattern in golang:

  • A queue has multiple processing targets
  • Each worker gets one target from the queue, processes it, and repeats the procedure until the queue is empty
  • All workers run in parallel
  • If an error occurs in one worker, all other workers will stop after processing the current target

A queue and workers

I wrote that pattern using buffered channels and goroutines.

Here are some notes.

  • A queue has multiple processing targets
  • Each worker gets one target from the queue, processes it, and repeats the procedure until the queue is empty

In this case, we can use buffered channel as a queue.
A worker tries to get the target from the channel, and processes it if it succeeds, or exits if the channel is closed.
I implemented this pattern with select statement.

// example
target := []int{1, 8, 5, 2, 3, 9, 1}

queue := make(chan int, len(target))
for len(target) > 0 {
  queue <- target[0]
  target = target[1:]
}
close(queue)

for {
  select {
  case v, ok := <-queue:
    if !ok {
      return
    }
    // any task you want to do for v

  default:
    time.Sleep(1 * time.Second)
  }
}
  • If an error occurs in one worker, all other workers will stop after processing the current target

context.WithCancel is suitable for this purpose.
If the process returns an error, the worker will execute cancelFunc.
cancelFunc closes ctx(channel) and the worker will stop when ctx is closed.

// example
ctx, cancel := context.WithCancel(context.Background())
for {
  select {
  case <-ctx.Done():
    return
  default:
  }

  err := process() // any task you want

  if err != nil {
    cancel()
  }
}
  • All workers run in parallel

With sync.WaitGroup, it can wait for multiple goroutines to stop.
When a goroutine starts, call Add to increase the counter, and when it finishes, call Done to decrease the counter.
Wait waits until the counter reaches zero.

// example
var wg sync.WaitGroup
workers := 3

for i := 0; i < workers; i++ {
  wg.Add(1)
  go func() {
    defer wg.Done()
    // any task you want
  }()
}

wg.Wait()

In order to vary the processing time depending on the target, I created the following process.

  1. sleep for the specified number of seconds
  2. if the specified number is 4, raise an error
func process(n int) error {
  time.Sleep(time.Duration(n) * time.Second)
  if n == 4 {
    return errors.New("Unlucky 4!!!")
  }
  return nil
}

Then run main.go.

Go Playground

targets: [1 8 5 2 3 7 6]
number of workers: 3
Worker-1: target 1 START
Worker-0: target 5 START
Worker-2: target 8 START
Worker-1: target 1 END
Worker-1: target 2 START
Worker-1: target 2 END
Worker-1: target 3 START
Worker-0: target 5 END
Worker-0: target 7 START
Worker-1: target 3 END
Worker-1: target 6 START
Worker-2: target 8 END
Worker-2: no target...finish
Worker-0: target 7 END
Worker-0: no target...finish
Worker-1: target 6 END
Worker-1: no target...finish
All workers finished.
All targets were processed.

It is hard to understand how it worked in text, so I made a diagram below.
All workers work in parallel and each worker gets the target from the queue.
Worker 1 drew a small number and immediately went for the next target.
In contrast, Worker 2 drew a large number, and the queue had been already empty when the worker finished its first target.

how it worked

Change the target to cause an error, and run main.go again.

Go Playground

targets: [1 8 4 2 3 7 6]
number of workers: 3
Worker-2: target 1 START
Worker-0: target 8 START
Worker-1: target 4 START
Worker-2: target 1 END
Worker-2: target 2 START
Worker-2: target 2 END
Worker-2: target 3 START
Worker-1: ERROR! Unlucky 4!!!
Worker-2: target 3 END
Worker-2: received ctx.Done()...finish
Worker-0: target 8 END
Worker-0: received ctx.Done()...finish
All workers finished.
7 was not processed...
6 was not processed...

Worker 1 drew “4” and raised an error, then all workers stopped after finishing the target they had.

an error from one worker stopped all workers

(The following is the same content in Japanese.)

まとめ

  • Goで複数の処理対象を1つのキューに詰めて複数のワーカーで並行して処理するやつを書いた
    • channelをキューとして、goroutineをワーカーとして使えそう
    • 複数goroutineの完了を待つにはWaitGroupを使うのが良さげ
    • goroutineから他のgoroutineを止めたりするのはContextを使うのが良さげ

環境

  • go version go1.17.3 darwin/amd64

キュー(channel)とワーカー(go routine)

Goでちょっとしたバッチ的なものを書こうとしたときに、
同じような処理を繰り返す部分があったのでgoroutineを使っていい感じの並行処理を書きたくなった。

イメージとしては

  • 処理したい対象が1つのキューに詰められている
  • 1つのワーカーはキューから対象を1つ取り出して処理、をキューが空になるまで繰り返す
  • すべてのワーカーは並行して動作する
  • どこかのワーカーでエラーが発生したら他の全てのワーカーにも知らせて、それを知った各ワーカーは現在処理している対象までで処理を終了する

という感じ。

キューとワーカー

今回はかんたんなものの例としてGochannelgoroutineを用いて実現してみた。

重要そうなところを(数ヶ月後の自分のために)メモしておく。

  • 処理したい対象が1つのキューに詰められている
  • 1つのワーカーはキューから対象を1つ取り出して処理、をキューが空になるまで繰り返す

は次のように書いた。

今回の用途のキューであれば、バッファありchannelを使って実現できる。
selectを使って、channelから値が取り出せれば何らかの処理を実行、channelが閉じられたらそこで止まるようにした。

target := []int{1, 8, 5, 2, 3, 9, 1}

queue := make(chan int, len(target))
for len(target) > 0 {
  queue <- target[0]
  target = target[1:]
}
close(queue)

for {
  select {
  case v, ok := <-queue:
    if !ok {
      return
    }
    // any task you want to do for the v

  default:
    time.Sleep(1 * time.Second)
  }
}
  • どこかのワーカーでエラーが発生したら他の全てのワーカーにも知らせて、それを知った各ワーカーは現在処理している対象までで処理を終了する

context.WithCancelを使って次のように書いた。
ワーカー(go routine)でなにかエラーが起こったらcancelFuncを実行してctx(channel )を閉じ、
ctxが閉じられたらその時点でワーカーを終了させる。

ctx, cancel := context.WithCancel(context.Background())
for {
  select {
  case <-ctx.Done():
    return
  default:
  }

  err := process() // any task you want

  if err != nil {
    cancel()
  }
}
  • すべてのワーカーは並行して動作する

ワーカーを複数並行して動かし、全部のワーカーが止まるまで待つような処理はsync.WaitGroupをつかって書いた。
Waitを呼び出すと待つ対象が0になるまで待ち続けるので、
goroutineの開始時にAddして待つ対象を増やし、goroutineが終了したらDoneで待つ対象をへらすようにしている。

var wg sync.WaitGroup
workers := 3

for i := 0; i < workers; i++ {
  wg.Add(1)
  go func() {
    defer wg.Done()
    // any task you want
  }()
}

wg.Wait()

あとはこれらをいい感じに組み合わせた。

また、今回は処理対象ごとに処理時間をバラバラにしたかったので、ワーカーの処理として次のようなものを用意した。

  1. 引数の秒数ぶん待機する
  2. 引数が4ならエラーを返す
func process(n int) error {
  time.Sleep(time.Duration(n) * time.Second)
  if n == 4 {
    return errors.New("Unlucky 4!!!")
  }
  return nil
}

main.goを実際に動かしてみる。

Go Playground

targets: [1 8 5 2 3 7 6]
number of workers: 3
Worker-1: target 1 START
Worker-0: target 5 START
Worker-2: target 8 START
Worker-1: target 1 END
Worker-1: target 2 START
Worker-1: target 2 END
Worker-1: target 3 START
Worker-0: target 5 END
Worker-0: target 7 START
Worker-1: target 3 END
Worker-1: target 6 START
Worker-2: target 8 END
Worker-2: no target...finish
Worker-0: target 7 END
Worker-0: no target...finish
Worker-1: target 6 END
Worker-1: no target...finish
All workers finished.
All targets were processed.

狙い通りに動いたのだが、文字だけ追っても見づらいので図にしてみた。
各ワーカーが1つずつ対象をキューから取り出してその秒数ぶん待機、を繰り返しており、
Worker 1のように小さい数字(=処理が軽い)を引くとその分次の対象を取りに行くのが早くなっていて、
逆にWorker 2のように最初に大きい数字(=処理が重い)を引くとその間に他のワーカーが働くので対象がなくなっている。

処理はこんな感じ

最後にワーカーがエラーを吐くような処理対象をキューに仕込んで動かしてみる。

Go Playground

targets: [1 8 4 2 3 7 6]
number of workers: 3
Worker-2: target 1 START
Worker-0: target 8 START
Worker-1: target 4 START
Worker-2: target 1 END
Worker-2: target 2 START
Worker-2: target 2 END
Worker-2: target 3 START
Worker-1: ERROR! Unlucky 4!!!
Worker-2: target 3 END
Worker-2: received ctx.Done()...finish
Worker-0: target 8 END
Worker-0: received ctx.Done()...finish
All workers finished.
7 was not processed...
6 was not processed...

こちらも狙い通り動いていて、
この場合は4を引いたWorker 1の処理でエラーが発生し、
他のワーカーもそのタイミングで持っている対象を最後に以降の処理を行わないようになっている。

ワーカーでエラーが起きると他のワーカーも止まる

おわり

久しぶりにGoにガッツリ触った。

go routineとかchannelとか適当な理解で済ませていたもののおさらいができてよかった。

おまけ

キーボードを枕にするそとちゃん