Timing Wheel

timingwheel

当我们自己编写服务端程序时候,可能需要新建大量的定时器,对每个 TCP 连接设置连接超时,面对成千上万的客户端连接,如果傻傻地为每一个连接分配一个计时器的话,那么 10k 个连接就对应 10k 的计时器,10w 个连接就对应 10w 个计时器,这种方法显然是比较消耗内存的。

对于需要大量定时器的情况,由于许多定时器是在相近的时间超时的,或者说在一个时间范围内到时的,如果对时间精度没有那么近乎苛刻的要求的话,我们可以利用这个特性将多个同一时间段的计时器整合成一个来使用,这样的话就可以极大程度上降低内存的消耗,例如现在有 3,4,5,6 四个连接,它们均会在后面的3到4秒内超时,我们可以采用一种方法使它们共用一个计数器,并当时间点到来时,它们均可以被通知到。

closed channel

也就是说,我们需要一个机制来重复利用某一个计时器,换句话说,当时间点到来时,我们需要一种广播的机制让等待该计时器的都能被通知到。

为此需要利用 golang 中通道的几个特性,我们通过一个例子来理解一下,在下面的函数中 make 了一个 unbuffered channel ch,还新起了3个 goroutine ,每个goroutine 会执行 fmt.Println("start ", i) ,但等它们都执行到 <-ch 语句时,由于没有其他 goroutine 执行 ch <- struct{}{} 操作,所以3个 goroutine 都会阻塞,当 time.Sleep(1 * time.Second) 结束时,我们利用 close(ch) ,此操作相当于向每个 goroutine 进行 ch <- struct{}{} ,此时阻塞解除,每个 goroutine 都可以继续执行 fmt.Println("end ", i) 。也就是说 close(ch) 相当于一个广播操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
ch := make(chan struct{})
for i := 0; i < 3; i++ {
i := i
go func() {
fmt.Println("start ", i)
<-ch
fmt.Println("end ", i)
}()
}
time.Sleep(1 * time.Second)
close(ch)
select {}
}

上面的例子利用到了通道的的第一个特性和第三个特性。

1
2
3
1. The expression blocks until a value is available. 
2. Receiving from a nil channel blocks forever.
3. A receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value after any previously sent values have been received.

timing wheel

那么上面的例子和我们要实现的东西有什么联系呢,其实联系非常大,我们可以给每一个需要进行定时操作的 goroutine 一个 unbuffered channel
,并且给予在相同时间点超时的 goroutine 以同样的通道。然后我们自己维护一个 ticker ,这个 ticker 每隔固定的时间,将相应的通道关闭,这相当于一个广播操作,这样每个等待此广播的 goroutine 都会收到消息,也就可以继续执行下去了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type TimingWheel struct {
sync.Mutex

interval time.Duration
maxTimeout time.Duration

ticker *time.Ticker
cs []chan struct{}
pos int

stop chan struct{}
}

func New(interval time.Duration, buckets int) *TimingWheel {
w := &TimingWheel{
interval: interval,
maxTimeout: interval * time.Duration(buckets),
ticker: time.NewTicker(interval),
cs : make([]chan struct{}, buckets),
pos: 0,
stop: make(chan struct{}),
}
for i := range w.cs {
w.cs[i] = make(chan struct{})
}
go w.run()
return w
}

光说可能有一点抽象,下面我们来看一下具体的实现,TimingWheel 结构体相当于模拟一个秒表,其中的 ticker 就相当于一个秒针,同时我们还维护了一个通道数组 cs (其存储结构为顺序存储,而其逻辑结构则是一个圆环),作为定时器使用,ticker 每隔固定的 interval ,会让指针向前移动一个单位( pos++ ),并将该位置存储的通道关闭,也就相同于定时器返回,这种方法实现的定时器相对来说增加了定时器的利用率,降低了内存的消耗。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (w *TimingWheel) run() {
for {
select {
case <-w.ticker.C:
w.tick()
case <-w.stop:
w.ticker.Stop()
return
}
}
}

func (w *TimingWheel) tick() {
w.Lock()
last := w.cs[w.pos]
w.cs[w.pos] = make(chan struct{})

w.pos = (w.pos + 1) % len(w.cs)
w.Unlock()
close(last)
}

After 方法为我们提供一个定时器,如果 timeout 数值太大无法落在 timing wheel 的一个周期的时间范围内,则报错,如果 timeout 在正常范围内,则返回对应位置的通道,当该通道超时时,上面的 tick 方法就会将该通道关闭,相应的 goroutine 就可以收到通知了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (w *TimingWheel) After(timeout time.Duration) <-chan struct {} {
if timeout >= w.maxTimeout {
panic("timeout too long")
}

idx := int(timeout / w.interval)
if idx > 0 {
idx--
}

w.Lock()
idx = (w.pos + idx) % len(w.cs)
b := w.cs[idx]
w.Unlock()

return b
}

func (w *TimingWheel) Stop() {
close(w.exit)
}
Pieces of Valuable Programming Knowledges