假设我们要实现一个 RPC 框架并且每个客户端与服务器只维护一根 TCP 连接,但客户端可能会同时发出很多 RPC 请求,服务器收到请求处理后将结果返还给客户端,现在问题来了,由于只有一根 TCP 连接,客户端收到的结果可能是乱序的,我们该如何将请求和答复对应起来呢,如果采取同步的方法倒是不用担心乱序的问题,但是效率肯定非常低,同步导致系统大部分时间在等待 I/O ,为了提高 CPU 的利用率,我们可以采用多路复用的方法。
1  | type ProtocolMux interface {  | 
Service 接口仅仅指明了服务需要实现的接口,这些服务实际就是对网络库的封装,可以是 TCP 或者 UDP ,Send 和 Recv 方法不能在并发条件下使用,并发使用可能会导致数据竞争。
1  | type Service interface {  | 
实现多路复用的核心就在于使用 pending 这个 map 来存储还未收到答复的请求,每一个请求都有自己独一无二的标签(例如请求的序列号),每个标签对应一个答复,答复用通道 chan Msg 来表示,当收到该标签对应的答复时,该通道返回结果( <-done ),这样就可以保证请求和答复一一对应,而不用关心接收消息先后顺序的问题了。
1  | type Mux struct {  | 
sendLoop 负责消息的发送,而 recvLoop 则会对接受到的答复进行判断标签操作,并将 pending 中这个标签对应的通道取出,然后向这个通道发消息从而告知对应的 Call 函数该请求已经处理完。这两个方法就是复用机制的体现,因为它们底层只使用一个连接来处理多个请求。
1  | func (m *Mux) sendLoop() {  | 
对于 Call 方法,整个调用过程是阻塞的,先通过 ReadTag 获取 Msg 的标签,并新建一个通道 done := make(chan Msg, 1) ,作为将来事件完成时的通知。并将该通道存储到 pending 中,通过 m.send <- args 将消息发送到 send 中,sendLoop 会执行真正的发送操作,最后阻塞,等待结果返回( <-done )。
1  | func (m *Mux) Call(args Msg) (reply Msg) {  |