Heartbeat

在一个分布式集群中,个别节点出现宕机的情况并不少见,例如在一个只有一个主节点,和多个从节点的集群中,从节点发生宕机对整个系统影响倒不算太大,如果主节点宕机,系统将不能正常运行,所以必须有一个机制检测各个节点是否正常运行。

心跳机制是设计高可用性分布式系统的重要技术。心跳机制通过周期性地向其他节点发送心跳消息并等待确认来检测集群中的节点状态。当节点状态发生变化时,会通知发送心跳检测消息的节点。如果发送心跳检测消息的节点在一段时间内未收到确认,则该节点将被视为失败。这是心跳机制的基本原理。

在设计心跳时,需要考虑心跳是单向,还是双向的,单向的心跳可以保证当发送方出现问题时,接收方在一定的时间内可以检测到,而双向则可以保证双方都可以检测到对方是否出现问题。

下面我们来实现一个单向的心跳检测机制,这里存在一个主节点和多个从节点,主节点会向连接到自己的从节点定期发送心跳消息,如果从节点在规定时间内没有收到来自主节点的心跳消息,则认为主节点发生异常,然后进行相应的超时处理流程(如自己参与选举成为主节点等)。

Master

在主节点中我们维护连接到主节点的从节点的信息,使用用一个数组保存,当节点加入或者退出系统时候,分别调用 AddDel 方法进行增删。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Master struct {
sync.RWMutex
n int
slaves []*Slave
}

func (m *Master) Add(s *Slave) {
s.Lock()
defer s.Unlock()
s.n++
s.slaves = append(s.slaves, s)
}

func (m *Master) Del(s *Slave) {
s.Lock()
defer s.Unlock()
n := make([]*Slave, len(s.slaves)-1)
for _, x := range m.slaves {
if x != s {
n = append(n, x)
}
}
s.n--
s.slaves = n
}

一旦从节点与主节点建立连接,主节点会通过 heartbeatLoop 向从节点定时发送心跳消息。

1
2
3
4
5
6
7
8
9
func (m *Master) handleConn(conn net.Conn) {
s := &slave{
conn: conn,
connectTime: time.Now()
}
m.Add(s)
log.Println("Connected: ", conn.RemoteAddr)
go s.heartbeatLoop()
}

Slave

slave 是主节点维护的从节点的信息,connectTime 表示从节点第一次连接的时间,exit chan struct{} 用于接收退出信号,例如当从节点退出该集群时或者从节点发生宕机时,通过该机制停止向从节点发送心跳信息。

1
2
3
4
5
6
type slave struct {
sync.RWMutex
conn net.Conn
connectTime time.Time
exit chan struct{}
}

hearbeatLoop 方法中,通过 time.NewTicker 来设置一个定时器,每隔一秒钟向从节点发送心跳消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *slave) heartbeatLoop() {
ticker := time.NewTicker(1 * time.Second)
defer func() {
s.conn.Close()
ticker.Stop()
}()
for {
select {
case t := <-ticker.C:
s.Lock()
c.conn.Write([]byte(fmt.Sprintf("heartbeat %d", time.Now())))
s.Unlock()
case <-s.exit:
return
}
}
}

func (s *slave) stop() {
s.exit <- struct{}{}
}

Main Routine

master main

主节点通过监听相应的端口,处理与自己连接的请求,对于每个连接请求,都会起一个新的 goroutine 分别处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
l, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
defer l.Close()
m := &Master{}
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go m.handleConnection(conn)
}
}

slave main

从节点会接收来自主节点的心跳消息,并通过 time.NewTimer 来设置一个定时器,如果在规定的时间内没有收到来自主节点的消息,则说明心跳超时,怀疑主节点出现异常,如果在规定的时间内收到心跳消息,则通过timer.Reset 来重置定时器。一般来说,心跳超时的时间是心跳发送周期的3倍左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func main() {
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
log.Fatal(err)
}

timer := time.NewTimer(3 * time.Second)

msgChan := make(chan string, 1)

go func(conn net.Conn) {
for {
b := make([]byte, 100)
n, err := conn.Read(b)
if err != nil {
panic(err)
}
msgChan <- string(b[:n])
}
}(conn)

for {
select {
case m := <-msgChan:
fmt.Println(m)
case <-timer.C:
fmt.Println("Timed out")
// do something here
<-msgChan
}
timer.Reset(3 * time.Second)
}
}

Randomized Timeout

上面使用的计时器的时间间隔都是相同的,但是在很多工程项目例如 Raft 中,常常使用 Randomized Timeout 这种奇技淫巧来减少 split votes 情况的发生。使用随机超时我们需要自己定义一个 var timeout <-chan time.Time ,每次重置计时器时(例如收到心跳消息或者计时器超时),先将相应的 timeout 设置为 nil ,然后获取一个随机的时间间隔,并使用 time.After 来获得一个新的计时器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func main() {
rand.Seed(time.Now().UTC().UnixNano())
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
appendEntries := make(chan struct{})
go func() {
for range ticker.C {
appendEntries <- struct{}{}
}
}()

var timeout <-chan time.Time
for {
if timeout == nil {
timeout = time.After(time.Duration(r(150, 300)) * time.Millisecond)
}
select {
case m := <-appendEntries:
timeout = nil
fmt.Println("received a message")
// process(m)
case <-timeout:
timeout = nil
fmt.Println("convert to candidate")
// do something such as election
}
}
}

func r(min, max int) int {
return rand.Intn(max-min) + min
}
Pieces of Valuable Programming Knowledges