假设我们要实现一个 RPC
框架并且每个客户端与服务器只维护一根 TCP
连接,但客户端可能会同时发出很多 RPC
请求,服务器收到请求处理后将结果返还给客户端,现在问题来了,由于只有一根 TCP
连接,客户端收到的结果可能是乱序的,我们该如何将请求和答复对应起来呢,如果采取同步的方法倒是不用担心乱序的问题,但是效率肯定非常低,同步导致系统大部分时间在等待 I/O
,为了提高 CPU
的利用率,我们可以采用多路复用的方法。
1 | type ProtocolMux interface { |
Service
接口仅仅指明了服务需要实现的接口,这些服务实际就是对网络库的封装,可以是 TCP
或者 UDP
,Send
和 Recv
方法不能在并发条件下使用,并发使用可能会导致数据竞争。
1 | type Service interface { |
实现多路复用的核心就在于使用 pending
这个 map
来存储还未收到答复的请求,每一个请求都有自己独一无二的标签(例如请求的序列号),每个标签对应一个答复,答复用通道 chan Msg
来表示,当收到该标签对应的答复时,该通道返回结果( <-done
),这样就可以保证请求和答复一一对应,而不用关心接收消息先后顺序的问题了。
1 | type Mux struct { |
sendLoop
负责消息的发送,而 recvLoop
则会对接受到的答复进行判断标签操作,并将 pending
中这个标签对应的通道取出,然后向这个通道发消息从而告知对应的 Call
函数该请求已经处理完。这两个方法就是复用机制的体现,因为它们底层只使用一个连接来处理多个请求。
1 | func (m *Mux) sendLoop() { |
对于 Call
方法,整个调用过程是阻塞的,先通过 ReadTag
获取 Msg
的标签,并新建一个通道 done := make(chan Msg, 1)
,作为将来事件完成时的通知。并将该通道存储到 pending
中,通过 m.send <- args
将消息发送到 send
中,sendLoop
会执行真正的发送操作,最后阻塞,等待结果返回( <-done
)。
1 | func (m *Mux) Call(args Msg) (reply Msg) { |