Connection Pool

因为 TCP 连接都是客户端主动发起的,也就是说需要经过三次握手才能够进行读写操作,如果客户端需要建立连接的次数较少,那么握手需要的开销倒可以忽略不计,但是如果客户端需要建立成千上万个 TCP 连接,那么就需要成千上万次握手了。如果真是这样,势必会导致系统性能低下,我们引入连接池来解决多次握手的问题。

连接池( Connection Pool )在广义上来说算是资源池,我们之前讲过的 LeakyBuffer 也算是资源池的一种,不过 LeakyBuffer 的目的是反复利用内存资源,减少内存分配的次数。而连接池则是为了减少建立连接的次数,重复利用已有的 TCP 连接。

ConnPool

例如在上图中,Consumer Agent 不断收到来自 ConsumerHTTP 请求,它解析请求的内容后,Consumer Agent 就要利用 TCP 连接给 Provider Agent 发送消息,那么如果按照传统的方法我们就需要每一个 HTTP 请求都要对应一个新的 TCP 连接,也就对应新的三次握手,这样的话系统开销会非常大。你可能会问为什么不直接用一个 TCP 连接来解决传输数据的问题,这样的话甚至不用这么多握手操作了,这问题不错,但是考虑到由于 TCP 传输的是应用层的数据,它并不了解应用层传输的信息究竟是什么意思,如果多个客户端发起的连接共用一个 TCP 连接,那么我们还需要写逻辑去区分收到的消息究竟需要属于哪个 HTTP 请求,另外还需要考虑TCP 发送和接受时读写竞争的问题,操作起来比较混乱,还不如重复利用多个 TCP 连接来发送和接受数据来的干净利落。

总的来说,连接池是一种利用空间换时间的技术。下面我们来看一下如何设计一个连接池。连接池中最核心的操作就是获取一个空闲的 TCP 连接,另外还要考虑连接池资源释放的问题,否则可能会导致内存泄漏。

1
2
3
4
5
6
7
var ErrClosed = errors.New("pool is closed")

type Pool interface {
Get() (net.Conn, error)
Close()
Len() int
}

在正常的 TCP 流程中,Close 会关闭本次连接,底层会进行四个挥手操作,但是我们不希望我们调用 Close 时直接关闭这个连接,而是希望能回收这个连接,所以我们对普通的 net.Conn 做了一层封装,来该改变其 Close 行为(其实这就是装饰器模式的应用),我们想要客户端调用 Close 的时候被我们的连接池所回收而非直接关闭,除非该连接被标记为不可用时 unusable (通过 MarkUnusable ), 我们才决定关闭本次连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type PooledConn struct {
net.Conn
mu sync.RWMutex
c *ConnPool
unusable bool
}

func (p *PooledConn) Close() error {
p.mu.RLock()
defer p.mu.Runlock()

if p.unusable {
if p.Conn != nil {
return p.Conn.Close()
}
return nil
}
return p.c.put(p.Conn)
}

func (p *PooledConn) MarkUnusable() {
p.mu.Lock()
defer p.mu.Unlock()
p.unusable = true
}

func (c *ConnPool) wrapConn(conn net.Conn) net.Conn {
p := &PooledConn{c: c}
p.Conn = conn
return p
}

连接池的构建需要考虑两个问题,一个就是连接创建问题,第二就是回收连接问题,创建我们用一个 Factory 工厂方法来表示,而回收连接可以利用 golang 中的通道机制( chan )来进行储存。

1
2
3
4
5
6
7
type Factory func() (net.Conn, error)

type ConnPool struct {
mu sync.RWMutex
conns chan net.Conn
factroy Factory
}

在初始化连接池时,我们先建立 initialCap 个连接用于刚开始时使用,具体创建可以使用 factory 产生 TCP 连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func NewConnPool(initialCap, maxCap int, factory Factory) (Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil , errors.New("invalid capacity settings")
}
c := &ConnPool{
conns: make(chan net.Conn, maxCap),
factory: factory,
}

for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- conn
}
return c, nil
}

func (c *ConnPool) getConnsAndFactory() (chan net.Conn, Factory){
c.mu.RLock()
defer c.mu.RUnlock()
conns := c.conns
factory := c.factory
return
}

当客户端使用完一次 TCP 连接后,会主动调用 Close 来关闭此次连接,由于我们重新定义了 Close 函数,当调用 Close 时,连接不会直接关闭而是会重新放回连接池中供其他部分代码使用,从而达到重复使用的目的,如果此时连接池已满,则直接释放该连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (c *ConnPool) put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil, rejecting")
}
c.mu.Lock()
defer c.mu.RUnlock()

if c.conns == nil {
return conn.Close()
}

select {
case c.conns <- conn:
return nil
default:
return conn.Close()
}
}

下面的结构体 ConnPool 实现了 Pool 接口。在 Get 方法中我们先得到具体用于存储连接的通道 conns 和用于制造连接的工厂方法 factory,然后试着去通道中拿,和 put 方法类似,如果该通道中还有空闲的 TCP 连接,则直接拿出并经过封装后( wrapConn )使用,如果没有则使用工厂方法新建一个 TCP 连接再经过封装后使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (c *ConnPool) Get() (net.Conn, error) {
conns, factory := c.getConnsAndFactory()
if conns == nil {
return nil, ErrClosed
}

select {
case conn := <- conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn), nil
default:
conn, err := factory()
if err != nil {
return nil, err
}
return c.wrapConn(conn), nil
}
}

func (c *ConnPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns. = nil
c.factory = nil
c.mu.Unlock()

if conns == nil {
return
}
close(conns)
for conn := range conns {
conn.Close()
}
}

func (c *ConnPool) Len() int {
conns, _ := c.getConnsAndFactory()
return len(conns)
}

连接池的使用

由于具体的连接逻辑是由客户端决定的,所以 factory 应该由使用者定义。

1
factory := func() (net.Conn, error) {return net.Dial("tcp", "localhost:8080")}

我们通过 pool.NewConnPool 来新建一个连接池,这里我们指定连接池中初始的连接个数为5,而最大的连接个数为30。通过 p.Get 来获得可用的连接,Get 方法会根据当前池中的情况返回连接,如果池中没有可用的连接则用 factory 方法新建一个,此时系统开销较大,但是这种情况并不常见,所以基本可以忽略不计。如果池中有可用连接,则直接返回,此时开销较小。

1
2
3
4
5
p, err := pool.NewConnPool(5, 30, factory)

conn, err := p.Get()

defer conn.Close()

下面给出真正关闭该连接的方法,因为我们重载了 Close 方法,直接调用 Close 只会将此连接放回连接池中,所以需要在真正关闭前将此连接设为不可用( MarkUnusable )。

1
2
3
4
if pc, ok := conn.(*pool.PooledConn); ok {
pc.MarkUnusable()
pc.Close()
}
Pieces of Valuable Programming Knowledges