Single Flight

很多人闲来无事就喜欢刷票圈逛微博,所有的消息看完一遍之后,就开启疯狂下拉刷新模式,希望能第一时间看到什么劲爆的消息。对于应用的使用者,这倒没什么,但是如果连续多次下拉刷新操作都分别对应一次服务器请求的话,那么服务器估计就得扑街了,所以我们需要限制在一定时间内同一个请求的数量,以避免无用的请求。

我们需要实现一种机制,对于特定的请求,在该请求触发之后但还没有返回结果之前,对于之后所有相同的请求,无论多少个,都不会真正触发,而是等待第一个请求返回,然后共用一个结果。

SingleFlight 就可以帮助我们实现上述的要求,我们使用结构体 call 来表示特定请求返回的结果,在 SingleFlight 结构体中使用一个 map 来存储不同的请求结果,也就是一个请求对应一个call,并用锁保护 map 的读写。

1
2
3
4
5
6
7
8
9
10
type SingleFlight struct {
sync.Mutex
m map[string]*call
}

type call struct {
wg sync.WaitGroup
val interface{}
err error
}

一般来说,call 是一些所需时间长,消耗资源多的操作的返回结果(例如HTTP 请求)。Do 方法就保证对于同一个请求,在结果未返回之前,后面相同的请求都不会真正进行,而是等待第一个请求的结果。在函数运行时需要检测当前 map 是否为空,如果为空则 make 一个,然后检查是否有相同的请求正在进行,如果有则使用 c.wg.Wait 来等待执行结果。要是这个请求是该时间段内第一个请求,则将它加入 map 中,然后通过 c.wg.Add 来增加 WaitGroup 应该等待的 goroutine 个数,当函数执行完成之后,通过 c.wg.Done 来广播告知他人,此时任何调用c.wg.Wait 的地方都会解除阻塞,并返回该结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (sf *SingleFlight) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
sf.Lock()

if sf.m == nil {
sf.m = make(map[string]*call)
}

if c, ok := sf.m[key]; ok {
sf.Unlock()
c.wg.Wait()
return c.val, c.err
}

c := new(call)
c.wg.Add(1)
sf.m[key] = c
sf.Unlock()

c.val, c.err = fn()
c.wg.Done()

sf.Lock()
delete(sf.m, key)
sf.Unlock()

return c.val, c.err
}

WaitGroup

SingleFlight 机制主要是靠 WaitGroup 实现,一般来说,因为main routine 不会等待 goroutine 执行完成,所以需要某种机制等待所有的goroutine 执行完成,使用 WaitGroup 就可以帮助我们达到这样的目的。

WaitGroup 的本质就是一个计数器,当每起一个新的 goroutine 时,我们调用 wg.Add(1) 来增加计数器的数量,当该 goroutine 执行完毕时,利用 wg.Done() 可以将计数器减一,调用 Wait() 方法的地方会一直阻塞 main routine ,只有当计数器为0时,wg 才会解除阻塞,此时程序才可以继续执行下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
type WaitGroup struct {
noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state.
state1 [12]byte
sema uint32
}
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
Pieces of Valuable Programming Knowledges