Future

Futurejava 中比较常见的并发编程解决方案,它采用异步的方式,尽可能减少运行时代码阻塞,很大程度上提高了程序的运行效率,在golang 中我们可以利用 goroutinechan 机制和 for select 轻松实现类似的功能。

其实 Future 的本质就是异步计算的结果,我们用 item interface{} 来放置 Future 的结果,通过读取 triggered 的值来判断异步计算是否已经返回。

New 方法用于构造 Future 任务,它通过监听 Completer 通道来获取异步执行的结果,如果监听超时,则停止监听,返回错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Future struct {
triggered bool
item interface{}
err error
lock sync.Mutex
wg sync.WaitGroup
}

type Completer <-chan interface{}

func New(completer Completer, timeout time.Duration) *Future {
f := &Future{}
f.wg.Add(1)
go listen(f, completer, timeout)
return f
}

listen 是获得异步计算结果的核心,如果 Completer 通道返回,则通过 set 方法设置计算结果,并标记 triggeredtrue ,如果监听超时,则同样标记 triggeredtrue ,但此时应该返回一个错误而非正确的计算结果。

1
2
3
4
5
6
7
8
9
10
func listen(f *Future, ch Completer, timeout time.Duration) {
t := time.NewTicker(timeout)
select {
case item := <-ch:
f.set(item, nil)
t.Stop()
case <-t.C:
f.set(nil, fmt.Errorf(`timeout after %f seconds`, timeout.Seconds()))
}
}

因为 Future 返回结果的时机并不确定, 所以我们需要一种方式得知结果是否已经返回,IsDone 方法通过查询 triggered 是否为 true 来得知结果是否已经返回 ,也就是查询事件是否已经触发,这里结果返回和发生超时都可以触发事件。

1
2
3
4
5
6
func (f *Future) IsDone() bool {
f.lock.Lock()
done := f.triggered
f.lock.Unlock()
return done
}

所以无论是结果返回,还是超时发生,我们都要通过 set 方法对 Future 进行设置,来标记 Future 事件完成。设置完通过 f.wg.Done() 来通知所有等待此结果的监听者。

1
2
3
4
5
6
7
8
func (f *Future) set(item interface{}, err error) {
f.lock.Lock()
f.triggered = true
f.item = item
f.err = err
f.lock.Unlock()
f.wg.Done()
}

Get 方法用于获得 Future 执行的结果,如果此时 Future 已经触发,那么可以直接返回结果,但是如果此时 triggeredfalse ,也就是说那两件事件中一件都没有发生,此时我们只能傻傻地等待( f.wg.Wait() ) ,直到别人广播( f.wg.Done() )来告知事件已经发生。

1
2
3
4
5
6
7
8
9
10
11
func (f *Future) Get() (interface{}, error) {
f.lock.Lock()
if f.triggered {
f.lock.Unlock()
return f.item, f.err
}
f.lock.Unlock()

f.wg.Wait()
return f.item, f.err
}

下面是应用 Future 的简单例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
completer := make(chan interface{})
f := New(completer, time.Duration(30*time.Minute))
var result interface{}
var err error
var wg sync.WaitGroup
wg.Add(1)
go func() {
result, err = f.Get()
wg.Done()
}()

completer <- `test`
wg.Wait()
Pieces of Valuable Programming Knowledges