Job Dispatcher

当处理大量计算密集型任务时,为了提高工作效率,常常会引入多台服务器同时进行处理,但是不同的服务器之间并不清楚自己该处理哪个任务,也就是说服务器自己并不能保证自己处理的任务和别人的任务是不一样的。为了解决上述问题,我们引入 Job Dispatcher 模型。

我们把处理任务的服务器称为 Worker ,而把分配任务的服务器称为Dispatcher 或者 MasterDispatcher 负责给当前处于空闲状态的 Worker 分配任务 ( Job ),当其处理完分配的任务之后,为了不让自己的 CPU 闲置,Worker 会通知 Dispather 自己是可用的( available ) ,然后 Dispatcher 就可以继续向 Worker 分配任务,直到所有任务全都完成。这样的模型非常 scalable ,引入的 Worker 数量越多,任务完成得越快。

Worker

一个任务我们用结构体 Job 来表示,现在对于此结构体中有什么我们并不关心,这个需要由业务逻辑具体定义。

Worker 中有三个 channel ,分别用于暂停,接收任务和通知 Dispatcher 任务完成用的。

Worker 肯定需要某种机制停止退出,我们用一个 exit chan struct{} 通道来接收停止信号,如果该通道中收到了停止信号,也就可以说明今天的任务已经全部完成了,Worker 可以回家休息了。

Jobs chan Job 通道很简单,当然就是用于接受任务的啦,相对来说,任务可是做不完的,一个任务做完下一个任务就马不停蹄的赶来了,所以用通道来表示可以说是再合适不过了。

最后来看一下最复杂的一个通道 WorkerPool chan chan Job ,这啥玩意啊,通道的通道?想想就头大,外层的通道的中包含了 chan Job ,也就是上面说过的 Jobs 通道,其实就代表 Worker 本身, 如果一个 Job 处理完了, Worker 就会处于空闲状态,所以就可以把它放在 WorkerPool 中等待 Dispatcher 来调度了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Job struct {
Payload string
}

type Worker struct {
WorkerPool chan chan Job
Jobs chan Job
exit chan struct{}
}

func NewWorker(workerPool chan chan Job) *Worker {
return &Worker{
WorkerPool: workerPool,
Jobs: make(chan Job),
exit: make(chan struct{}),
}
}

Stop 方法向 exit 通道发送信号,这样一来 Start 方法中的 select 语句的第二个 case 就会被执行,Worker 就可以退出了。在 Start 方法中,w.WorkerPool <- w.Jobs 会向 WorkerPool 中注册它自己,也就相当于向 Dispatcher 申明自己已经处于空闲状态,随时等待被调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (w *Worker) Start() {
go func() {
for {
w.WorkerPool <- w.Jobs
select {
case job := <- w.Jobs:
// do some heavy job here
case <-w.exit:
return
}
}
}()
}

func (w *Worker) Stop() {
go func() {
w.exit <- struct{}{}
}
}

Dispatcher

所有的任务都是从外界来的(此处我们选择从 JobQueue 中读取),任务到来时并不会直接分配给 Worker ,而是先经过 Dispatcher 的魔爪,Dispatcher 会先判断哪个 Worker 处于空闲状态,然后再分配给空闲的 Worker

1
var JobQueue chan Job

Dispatcher 结构体中的 WorkerPool chan chan Job 是任务调度实现的核心,我们上述 Worker 中的和 Dispatcher 中的其实是一个东西,Worker 中的是对 Dispatcher 中的一个引用而已。

1
2
3
4
5
6
7
8
9
10
11
12
type Dispatcher struct {
WorkerPool chan chan Job
MaxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{
WorkerPool: pool,
MaxWorkers: maxWorkers,
}
}

Run 方法中起了 MaxWorkersWorker 等待 Dispatcher 给自己分配任务。

dispatch 方法中,Dispatcher 会不断的从 JobQueue 中读取任务,并在 WorkerPool 中选取一个空闲的 Workerworker := <-d.WorkerPool ),然后将任务分配给它( worker <- job )。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (d *Dispatcher) Run() {
for i := 0; i < d.MaxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <- JobQueue:
go func(job Job) {
worker := <-d.WorkerPool
worker <- job
}(job)
}
}
}

Work scheduler

  • Use a buffered channel as a concurrent blocking queue.
  • Use coroutines to let independent concerns run independently
  • Use the race detector for development and even production.
  • Think carefully before introducing unbounded queuing
  • Close a channel to signal that no more values will be sent.
  • Know why and when each communication will proceed.
  • Make sure you know why and when each goroutine will exit.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func Schedule(servers []string, numTask int,
call func(srv string, task int) bool) {
work := make(chan int, numTask)
done := make(chan bool)
exit := make(chan bool)
runTasks := func(srv string) {
for task := range work {
if call(srv, task) {
done <- true
} else {
work <- task
}
}
}
go func() {
for {
select {
case srv := <-servers:
go runTasks(srv)
case <-exit:
return
}
}
}()

for task := 0; task < numTask; task++ {
work <- task
}

for i := 0; i < numTask; i++ {
<-done
}
close(work)
exit <- true
}
Pieces of Valuable Programming Knowledges