当处理大量计算密集型任务时,为了提高工作效率,常常会引入多台服务器同时进行处理,但是不同的服务器之间并不清楚自己该处理哪个任务,也就是说服务器自己并不能保证自己处理的任务和别人的任务是不一样的。为了解决上述问题,我们引入 Job Dispatcher
模型。
我们把处理任务的服务器称为 Worker
,而把分配任务的服务器称为Dispatcher
或者 Master
。Dispatcher
负责给当前处于空闲状态的 Worker
分配任务 ( Job
),当其处理完分配的任务之后,为了不让自己的 CPU
闲置,Worker
会通知 Dispather
自己是可用的( available
) ,然后 Dispatcher
就可以继续向 Worker
分配任务,直到所有任务全都完成。这样的模型非常 scalable
,引入的 Worker
数量越多,任务完成得越快。
Worker
一个任务我们用结构体 Job
来表示,现在对于此结构体中有什么我们并不关心,这个需要由业务逻辑具体定义。
Worker
中有三个 channel
,分别用于暂停,接收任务和通知 Dispatcher
任务完成用的。
Worker
肯定需要某种机制停止退出,我们用一个 exit chan struct{}
通道来接收停止信号,如果该通道中收到了停止信号,也就可以说明今天的任务已经全部完成了,Worker
可以回家休息了。
Jobs chan Job
通道很简单,当然就是用于接受任务的啦,相对来说,任务可是做不完的,一个任务做完下一个任务就马不停蹄的赶来了,所以用通道来表示可以说是再合适不过了。
最后来看一下最复杂的一个通道 WorkerPool chan chan Job
,这啥玩意啊,通道的通道?想想就头大,外层的通道的中包含了 chan Job
,也就是上面说过的 Jobs
通道,其实就代表 Worker
本身, 如果一个 Job
处理完了, Worker
就会处于空闲状态,所以就可以把它放在 WorkerPool
中等待 Dispatcher
来调度了。
1 | type Job struct { |
Stop
方法向 exit
通道发送信号,这样一来 Start
方法中的 select
语句的第二个 case
就会被执行,Worker
就可以退出了。在 Start
方法中,w.WorkerPool <- w.Jobs
会向 WorkerPool
中注册它自己,也就相当于向 Dispatcher
申明自己已经处于空闲状态,随时等待被调度。
1 | func (w *Worker) Start() { |
Dispatcher
所有的任务都是从外界来的(此处我们选择从 JobQueue
中读取),任务到来时并不会直接分配给 Worker
,而是先经过 Dispatcher
的魔爪,Dispatcher
会先判断哪个 Worker
处于空闲状态,然后再分配给空闲的 Worker
。
1 | var JobQueue chan Job |
Dispatcher
结构体中的 WorkerPool chan chan Job
是任务调度实现的核心,我们上述 Worker
中的和 Dispatcher
中的其实是一个东西,Worker
中的是对 Dispatcher
中的一个引用而已。
1 | type Dispatcher struct { |
Run
方法中起了 MaxWorkers
个 Worker
等待 Dispatcher
给自己分配任务。
在 dispatch
方法中,Dispatcher
会不断的从 JobQueue
中读取任务,并在 WorkerPool
中选取一个空闲的 Worker
( worker := <-d.WorkerPool
),然后将任务分配给它( worker <- job
)。
1 | func (d *Dispatcher) Run() { |
Work scheduler
- Use a buffered channel as a concurrent blocking queue.
- Use coroutines to let independent concerns run independently
- Use the race detector for development and even production.
- Think carefully before introducing unbounded queuing
- Close a channel to signal that no more values will be sent.
- Know why and when each communication will proceed.
- Make sure you know why and when each goroutine will exit.
1 | func Schedule(servers []string, numTask int, |