Token Bucket

学校里的宽带还是有分三六九等的,如果你钱交的多的,对应带宽自然比较高。但你想过他们是如何限制你的最高带宽的吗?其实用令牌桶( Token Bucket )这个算法就可以实现速率限制这个功能。

那么什么又是令牌桶的,简单的说就是假设你有一个专门用于放令牌的桶,桶的最大容量是有上限的,设为C,而该桶自带一个机制就是每隔固定的一段时间会产生一个令牌,一旦令牌充满了桶,桶中的令牌个数就不会再增加了。接下来是重点,如果你要进行某个需要控制速率的操作,在执行这个操作之前,必须先从桶中拿到这个操作对应的令牌个数,比如规定了操作A需要5个令牌,操作B需要10个令牌,那么如果要执行操作A,则需要从桶中拿出5个令牌,如果桶中的令牌数目不够,就需要等到令牌数量足够时才能进行A操作。由于桶中令牌产生的速率是固定的,而在进行相关操作之前,又必须获得足够数量的令牌,这样就到达了速率限制( Rate Limiting )的目的。

除了速率限制之外,令牌桶算法还具有流量整形( Traffic Shaping )的功能,因为令牌桶中的令牌是以恒定的速率产生的,所以一段时间内产生的令牌数量是固定的,如果在某一段时间消耗的令牌数量较多,也就是对应操作的数量很多,那么之后的一段时间的操作数量必定会收到限制,从整体上来看,操作的速率会比较均匀,也就是说流量是以比较均衡的速率向外发送的。

Token Bucket

下面我们来看一下令牌桶算法的实现,由于令牌桶中非常重要的两个属性就是令牌桶的容量和令牌产生的速率,分别用 capacityrate 来表示,我们使用一个通道来存储令牌,因为我们并不关心令牌的内容,所以我们使用 tokens chan struct{} 来表示这个桶。

在初始化令牌桶的时候,直接起一个 goroutine 来定时向我们的桶中放入令牌,通过 time.NewTicker 用于定时生产令牌,如果令牌桶满了,则b.tokens <- struct{}{} 操作也会被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Bucket struct {
capacity int64
tokens chan struct{}
rate time.Duration
sync.Mutex
}

func NewBucket(rate time.Duration, capacity int64) *Bucket {
tokens := make(chan struct{}, capacity)
b := &Bucket{
capacity: capacity,
tokens: tokens,
rate: rate,
}
go func(b *Bucket) {
ticker := time.NewTicker(rate)
for _ := range ticker.C {
b.tokens <- struct{}{}
}
}(b)

return b
}

下面给出了设置时间间隔(等效为速率)的方法,时间间隔越短,对应的平均速率就越快,二者成反比关系。

1
2
3
4
5
6
7
8
9
10
11
12
func (b *Bucket) GetRate() time.Duration {
b.Lock()
tmp := b.rate
b.Unlock()
return tmp
}

func (b *Bucket) SetRate(rate time.Duration) {
b.Lock()
b.rate = rate
b.Unlock()
}

由于在进行特定操作之前需要取出固定数量的令牌,可以形象的把令牌想象为存在银行里的钱,把令牌桶想象成银行,如果你想要买一台电脑,你就需要从银行拿出足够的钱,如果银行中没有足够的钱,你只能等到你积累了足够的钱才可以买你想要的电脑。也就是说在执行相应操作之前需要有足够多的令牌才能顺利执行。注意到这个函数是阻塞的,只有在 n<-b.tokens 成功之后,该函数才会返回。

1
2
3
4
5
6
func (b *Bucket) withdrawTokens(n int64) error {
for i := int64(0); i < n; i++ {
<-b.tokens
}
return nil
}

SpendToken 函数会返回一个通道,其函数内部会起一个 goroutine ,在该 goroutine 里会执行上述 withdrawTokens 方法,由于该函数是阻塞的,所以只有它执行完,才能将返回值放入通道 c 中,然后利用 close(c) 操作将该通道关闭,这样以来别处调用该函数的地方就可以顺利返回了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (b *Bucket) SpendToken(n uint64) <-chan error {
if n < 0 {
n = 1
}

c := make(chan error)

go func(b *Bucket, n uint64, c chan error) {
c <- b.withdrawTokens(n)
close(c)
return
}(b, n, c)

return c
}

Drain 方法将桶中的所有令牌一次性放光,具体该在哪里使用它要看具体需求。

1
2
3
4
5
6
7
8
9
10
func (b *Bucket) Drain() error {
for {
select {
case <-b.tokens:
continue
default:
return nil
}
}
}

令牌桶的使用

使用令牌桶之前当然需要新建一个令牌桶,并指定令牌桶的容量及其平均速率。因为使用令牌桶的目的之一就是限制速率,在需要被限速的函数( RegulatedAction() )前使用 <-bucket.SpendToken(x) ,该 RegulatedAction 只有等到攒满 x 个令牌才会执行,这样就可以达到限制速率的目的了。

1
2
3
4
bucket := NewBucket(1 * time.Second, 10)

<-bucket.SpendToken(10)
RegulatedAction()
Pieces of Valuable Programming Knowledges