Go による Token Bucket 実装

Qiita で

という記事を見かける。

golang.org/x/time/rate は,いわゆる「トークンバケット(token bucket)」アルゴリズムを実装するためのパッケージのようだ。 トークンバケット・アルゴリズムとは

  • A token is added to the bucket every $1/r$ seconds.
  • The bucket can hold at the most $b$ tokens. If a token arrives when the bucket is full, it is discarded.
  • When a packet (network layer PDU) of $n$ bytes arrives,
    • if at least $n$ tokens are in the bucket, $n$ tokens are removed from the bucket, and the packet is sent to the network.
    • if fewer than $n$ tokens are available, no tokens are removed from the bucket, and the packet is considered to be non-conformant.

といったものらしい。 具体的には

func NewLimiter(r Limit, b int) *Limiter

で生成される rate.Limiter 型のインスタンスが上の説明の「バケット」に相当するようだ。 引数の rb も同じ意味かな。

これを並行処理のジェネレータ・パターンと組み合わせると面白そうである。

というわけで,まずは以下のコードを起点としてみよう。

// +build run

package main

import (
    "fmt"
    "sync"
)

func generater(wg *sync.WaitGroup, ch chan<- int) {
    defer func() {
        close(ch)
        wg.Done()
    }()
    for i := 0; i < 10; i++ {
        ch <- i + 1
    }
}

func output(wg *sync.WaitGroup, num int, ch <-chan int) {
    for n := range ch {
        fmt.Printf("Worker %d: %v\n", num, n)
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 1)
    wg.Add(1)
    go generater(&wg, ch)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go output(&wg, i+1, ch)
    }
    wg.Wait()
}

generater() 関数はチャネルに対して 1 から 10 までの値を吐き出す。 output() 関数はチャネルから値を取り出して表示するという簡単なお仕事である。 ただし output() 関数は2つの goroutine で起動している。

これを実行すると

$ go run sample1.go 
Worker 2: 1
Worker 2: 2
Worker 2: 3
Worker 1: 5
Worker 1: 6
Worker 1: 7
Worker 1: 8
Worker 1: 9
Worker 1: 10
Worker 2: 4

という出力になった。 2つの goroutine は並行に走ってるので出力順は不定となる。

このコードに golang.org/x/time/rate パッケージを加えて流量の制御を行う。 こんな感じでどうだろう。

// +build run

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

func generater(wg *sync.WaitGroup, ch chan<- int) {
    defer func() {
        close(ch)
        wg.Done()
    }()
    l := rate.NewLimiter(rate.Every(time.Second*2), 1)
    for i := 0; i < 10; i++ {
        if err := l.Wait(context.Background()); err != nil {
            fmt.Printf("generater: %v\n", err)
            return
        }
        ch <- i + 1
    }
}

func output(wg *sync.WaitGroup, num int, ch <-chan int) {
    for n := range ch {
        fmt.Printf("Worker %d: %v\n", num, n)
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 1)
    wg.Add(1)
    go generater(&wg, ch)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go output(&wg, i+1, ch)
    }
    wg.Wait()
}

これでサイズ 1 のバケットに2秒毎にトークンが挿入される。 トークを取り出すタイミングでチャネルに値を入れるわけだ。 これを実行すると

$ go run sample2.go 
Worker 2: 1
Worker 1: 2
Worker 2: 3
Worker 1: 4
Worker 2: 5
Worker 1: 6
Worker 2: 7
Worker 1: 8
Worker 2: 9
Worker 1: 10

という感じに2秒毎に結果が出力される。

バケットサイズをプロセッサ数と同数にしてみよう。 こんな感じかな。

// +build run

package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

func generater(wg *sync.WaitGroup, ch chan<- int, max int) {
    defer func() {
        close(ch)
        wg.Done()
    }()
    l := rate.NewLimiter(rate.Every(time.Second*2), max)
    for i := 0; i < 10; i++ {
        if err := l.Wait(context.Background()); err != nil {
            fmt.Printf("generater: %v\n", err)
            return
        }
        ch <- i + 1
    }
}

func output(wg *sync.WaitGroup, num int, ch <-chan int) {
    for n := range ch {
        fmt.Printf("Worker %d: %v\n", num, n)
    }
    wg.Done()
}

func main() {
    max := runtime.GOMAXPROCS(0)
    var wg sync.WaitGroup
    ch := make(chan int, max)
    wg.Add(1)
    go generater(&wg, ch, max)
    for i := 0; i < max; i++ {
        wg.Add(1)
        go output(&wg, i+1, ch)
    }
    wg.Wait()
}

これを実行すると(プロセッサ数:4)以下のようになった。

$ go run sample3.go 
Worker 3: 3
Worker 3: 4
Worker 2: 2
Worker 4: 1
Worker 1: 5
Worker 3: 6
Worker 2: 7
Worker 4: 8
Worker 1: 9
Worker 3: 10

最初の4つは一気に出力されて,以降は2秒ずつの出力。

といった感じで,処理の制限をワーカ側ではなくジェネレータ側で行うのが特徴と言えるだろうか。 Web クローラとか使い道は色々あるかもしれない。

参考図書

photo
Go言語による並行処理
Katherine Cox-Buday (著), 山口 能迪 (翻訳)
オライリージャパン 2018-10-26
単行本(ソフトカバー)
4873118468 (ASIN), 9784873118468 (EAN), 4873118468 (ISBN)
評価     

Eブック版もある。感想はこちら。 Go 言語で並行処理を書くならこの本は必読書になるだろう。

reviewed by Spiegel on 2020-01-13 (powered by PA-APIv5)

photo
プログラミング言語Go (ADDISON-WESLEY PROFESSIONAL COMPUTING SERIES)
Alan A.A. Donovan (著), Brian W. Kernighan (著), 柴田 芳樹 (翻訳)
丸善出版 2016-06-20
単行本(ソフトカバー)
4621300253 (ASIN), 9784621300251 (EAN), 4621300253 (ISBN), 9784621300251 (ISBN)
評価     

著者のひとりは(あの「バイブル」とも呼ばれる)通称 “K&R” の K のほうである。この本は Go 言語の教科書と言ってもいいだろう。

reviewed by Spiegel on 2016-07-13 (powered by PA-APIv5)