Go による Token Bucket 実装
Qiita で
という記事を見かける。
golang.org/x/time/rate
は,いわゆる「トークンバケット(token bucket)」アルゴリズムを実装するためのパッケージのようだ。
トークンバケット・アルゴリズムとは
といったものらしい。 具体的には
で生成される rate
.Limiter
型のインスタンスが上の説明の「バケット」に相当するようだ。
引数の r
と b
も同じ意味かな。
これを並行処理のジェネレータ・パターンと組み合わせると面白そうである。
というわけで,まずは以下のコードを起点としてみよう。
// +build run
package main
import (
"fmt"
"sync"
)
func generater(wg *sync.WaitGroup, ch chan<- int) {
defer func() {
close(ch)
wg.Done()
}()
for i := 0; i < 10; i++ {
ch <- i + 1
}
}
func output(wg *sync.WaitGroup, num int, ch <-chan int) {
for n := range ch {
fmt.Printf("Worker %d: %v\n", num, n)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
ch := make(chan int, 1)
wg.Add(1)
go generater(&wg, ch)
for i := 0; i < 2; i++ {
wg.Add(1)
go output(&wg, i+1, ch)
}
wg.Wait()
}
generater()
関数はチャネルに対して 1 から 10 までの値を吐き出す。
output()
関数はチャネルから値を取り出して表示するという簡単なお仕事である。
ただし output()
関数は2つの goroutine で起動している。
これを実行すると
$ go run sample1.go
Worker 2: 1
Worker 2: 2
Worker 2: 3
Worker 1: 5
Worker 1: 6
Worker 1: 7
Worker 1: 8
Worker 1: 9
Worker 1: 10
Worker 2: 4
という出力になった。 2つの goroutine は並行に走ってるので出力順は不定となる。
このコードに golang.org/x/time/rate
パッケージを加えて流量の制御を行う。
こんな感じでどうだろう。
// +build run
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func generater(wg *sync.WaitGroup, ch chan<- int) {
defer func() {
close(ch)
wg.Done()
}()
l := rate.NewLimiter(rate.Every(time.Second*2), 1)
for i := 0; i < 10; i++ {
if err := l.Wait(context.Background()); err != nil {
fmt.Printf("generater: %v\n", err)
return
}
ch <- i + 1
}
}
func output(wg *sync.WaitGroup, num int, ch <-chan int) {
for n := range ch {
fmt.Printf("Worker %d: %v\n", num, n)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
ch := make(chan int, 1)
wg.Add(1)
go generater(&wg, ch)
for i := 0; i < 2; i++ {
wg.Add(1)
go output(&wg, i+1, ch)
}
wg.Wait()
}
これでサイズ 1 のバケットに2秒毎にトークンが挿入される。 トークを取り出すタイミングでチャネルに値を入れるわけだ。 これを実行すると
$ go run sample2.go
Worker 2: 1
Worker 1: 2
Worker 2: 3
Worker 1: 4
Worker 2: 5
Worker 1: 6
Worker 2: 7
Worker 1: 8
Worker 2: 9
Worker 1: 10
という感じに2秒毎に結果が出力される。
バケットサイズをプロセッサ数と同数にしてみよう。 こんな感じかな。
// +build run
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
"golang.org/x/time/rate"
)
func generater(wg *sync.WaitGroup, ch chan<- int, max int) {
defer func() {
close(ch)
wg.Done()
}()
l := rate.NewLimiter(rate.Every(time.Second*2), max)
for i := 0; i < 10; i++ {
if err := l.Wait(context.Background()); err != nil {
fmt.Printf("generater: %v\n", err)
return
}
ch <- i + 1
}
}
func output(wg *sync.WaitGroup, num int, ch <-chan int) {
for n := range ch {
fmt.Printf("Worker %d: %v\n", num, n)
}
wg.Done()
}
func main() {
max := runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
ch := make(chan int, max)
wg.Add(1)
go generater(&wg, ch, max)
for i := 0; i < max; i++ {
wg.Add(1)
go output(&wg, i+1, ch)
}
wg.Wait()
}
これを実行すると(プロセッサ数:4)以下のようになった。
$ go run sample3.go
Worker 3: 3
Worker 3: 4
Worker 2: 2
Worker 4: 1
Worker 1: 5
Worker 3: 6
Worker 2: 7
Worker 4: 8
Worker 1: 9
Worker 3: 10
最初の4つは一気に出力されて,以降は2秒ずつの出力。
といった感じで,処理の制限をワーカ側ではなくジェネレータ側で行うのが特徴と言えるだろうか。 Web クローラとか使い道は色々あるかもしれない。
参考図書
- Go言語による並行処理
- Katherine Cox-Buday (著), 山口 能迪 (翻訳)
- オライリージャパン 2018-10-26
- 単行本(ソフトカバー)
- 4873118468 (ASIN), 9784873118468 (EAN), 4873118468 (ISBN)
- 評価
- プログラミング言語Go (ADDISON-WESLEY PROFESSIONAL COMPUTING SERIES)
- Alan A.A. Donovan (著), Brian W. Kernighan (著), 柴田 芳樹 (翻訳)
- 丸善出版 2016-06-20
- 単行本(ソフトカバー)
- 4621300253 (ASIN), 9784621300251 (EAN), 4621300253 (ISBN), 9784621300251 (ISBN)
- 評価
著者のひとりは(あの「バイブル」とも呼ばれる)通称 “K&R” の K のほうである。この本は Go 言語の教科書と言ってもいいだろう。