Protocol Multiplexer

假设我们要实现一个 RPC 框架并且每个客户端与服务器只维护一根 TCP 连接,但客户端可能会同时发出很多 RPC 请求,服务器收到请求处理后将结果返还给客户端,现在问题来了,由于只有一根 TCP 连接,客户端收到的结果可能是乱序的,我们该如何将请求和答复对应起来呢,如果采取同步的方法倒是不用担心乱序的问题,但是效率肯定非常低,同步导致系统大部分时间在等待 I/O ,为了提高 CPU 的利用率,我们可以采用多路复用的方法。

1
2
3
4
5
6
7
type ProtocolMux interface {
// Init initializes the mux to manage messages to the given service.
Init(Service)
// Call makes a request with the given message and returns the reply.
// Multiple goroutines may call Call concurrently.
Call(Msg) Msg
}

Service 接口仅仅指明了服务需要实现的接口,这些服务实际就是对网络库的封装,可以是 TCP 或者 UDPSendRecv 方法不能在并发条件下使用,并发使用可能会导致数据竞争。

1
2
3
4
5
6
7
8
9
10
11
type Service interface {
// ReadTag returns the muxing identifier in the request or reply message.
// Multiple goroutines may call ReadTag concurrently.
ReadTag(Msg) int64
// Send sends a request message to the remote service.
// Send must not be called concurrently with itself.
Send(Msg)
// Recv waits for and returns a reply mesasge from the remote service.
// Recv must not be called concurrently with itself.
Recv(Msg)
}

实现多路复用的核心就在于使用 pending 这个 map 来存储还未收到答复的请求,每一个请求都有自己独一无二的标签(例如请求的序列号),每个标签对应一个答复,答复用通道 chan Msg 来表示,当收到该标签对应的答复时,该通道返回结果( <-done ),这样就可以保证请求和答复一一对应,而不用关心接收消息先后顺序的问题了。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Mux struct {
srv Service
send chan Msg
mu sync.Mutex
pending map[int64]chan<- Msg
}

func (m *Mux) Init(srv Service) {
m.srv = srv
m.pending = make(map[int64]chan Msg)
go m.sendLoop()
go m.recvLoop()
}

sendLoop 负责消息的发送,而 recvLoop 则会对接受到的答复进行判断标签操作,并将 pending 中这个标签对应的通道取出,然后向这个通道发消息从而告知对应的 Call 函数该请求已经处理完。这两个方法就是复用机制的体现,因为它们底层只使用一个连接来处理多个请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (m *Mux) sendLoop() {
for args := range m.send {
m.srv.Send(args)
}
}

func (m *Mux) recvLoop() {
for {
reply := m.srv.Recv()
tag := m.srv.Tag(reply)

m.mu.Lock()
done := m.pending[tag]
m.mu.Unlock()

if done == nil {
panic("unexpected reply")
}
done <- reply
}
}

对于 Call 方法,整个调用过程是阻塞的,先通过 ReadTag 获取 Msg 的标签,并新建一个通道 done := make(chan Msg, 1) ,作为将来事件完成时的通知。并将该通道存储到 pending 中,通过 m.send <- args 将消息发送到 send 中,sendLoop 会执行真正的发送操作,最后阻塞,等待结果返回( <-done )。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (m *Mux) Call(args Msg) (reply Msg) {
tag := m.srv.ReadTag(args)
done := make(chan Msg, 1)

m.mu.Lock()
if m.pending[tag] != nil {
m.mu.Unlock()
panic("mux: duplicate call tag")
}
m.pending[tag] = done
m.mu.Unlock()

m.send <- args
return <-done
}
Pieces of Valuable Programming Knowledges