PubSub
Posted on
May 6, 2018
消息的发布和订阅这种设计模式在大型开源项目中还是会经常被使用到的,它的要点是一堆消息订阅者( Subscriber
)会订阅某个自己感兴趣的 topic
,一旦发布者( Publisher
)向这个 topic
发布消息,消息的订阅者就能收到该消息。
这种设计模式的灵感其实就来自于现实生活之中,我们( Subscriber
)向我们感兴趣的杂志社订阅想要阅读的月刊,那么我们订阅的消息(算一个 subscription
)就会被杂志社( Publisher
)记录下来,一旦杂志可以上市发行了,杂志社就会主动向我们推送( Publish
)杂志消息。
下面我们来实现一个简单的发布者订阅者模型吧。
由于这个模型的核心就在于很多 subscriber
会向某一个 topic
订阅消息,所以需要一个结构体来记录特定的 topic
对应的所有 subscriber
,这里 subscriber
用一个集合( Set
)来表示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type PubSub struct { sync.RWMutex subscriptions map [string ]*Set } type Set struct { items map [interface {}]struct {} lock *sync.RWMutex } func NewPubSub () *PubSub { return &PubSub{ subscriptions: make (map [string ]*Set), } }
Subscription
应该具有监听行为,通过 Listen
来获取 Publisher
发布的消息。而结构体 subscription
则相当于一个订阅的注册信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type Subscription interface { Listen() (interface {}, error) } type subscription struct { topic string ttl time.Duration c chan interface {} } type (s *subscription) Listen() (interface {}, error) { select { case <-time.After(s.ttl): return nil , errors.New("timed out" ) case iterm := <-s.c: return item, nil } }
其实订阅和取消订阅的本质就是将自己从订阅名单( subscriptions map[string]*Set
)里面进行增加或删除的操作,因为我们向杂志社订阅了某本杂志,那么关于我们的订阅信息一定会在杂志出版商那里存在。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (ps *PubSub) Subscribe (topic string , ttl time.Duration) Subscription { sub := &subscription{ topic: topic. ttl: ttl, c: make (chan interface {}, subscriptionBuffSize), } ps.Lock() s, exists := ps.subscriptions[topic] if !exists { s = NewSet() ps.subscriptions[topic] = s } ps.Unlock() s.Add(sub) time.AfterFunc(ttl, func () { ps.unSubscribe(sub) }) return } func (ps *PubSub) unSubscribe (sub *subscription) { ps.Lock() defer ps.Unlock() ps.subscriptions[sub.topic].Remove(sub) if ps.subscriptions[sub.topic].Size() != 0 { return } delete (ps.subscriptions, sub.topic) }
最后我们订阅的杂志终于要发布了,杂志的出版社会根据自己手中的订阅名单,逐一向订阅该 topic
的订阅者发送杂志,当然只有订阅了的人才能收到。但是由于订阅者可能出去旅游了,也就是说他最近并没有调用 Listen
将收到的订阅消息拿出,而我们接受订阅消息的邮箱大小是有上限的( c: make(chan interface{}, subscriptionBuffSize)
,我们设置了subscriptionBuffSize
来表示一个通道中最多可以存放的消息数量),如果该邮箱内已有的消息超出这个限制,则选择不将消息放入邮箱中,否则则将消息放入这个邮箱中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (ps *PubSub) Publish (topic string , item interface {}) error { ps.RLock() defer ps.RUnlock() s, subscribed := ps.subscriptions[topic] if !subscribed { return errors.New("no subscribers" ) } for _, sub := range s.ToArray() { c := sub.(*subscription).c if len (c) == subscriptionBuffSize { continue } c <- item } return nil }
Publish/subscribe server 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 type Server struct { publish chan Event subscribe chan subReq cancel chan subaReq } type subReq struct { c chan <- Event ok chan bool } func (s *Server) Init () { s.publish = make (chan Event) s.subscribe = make (chan subReq) s.cancel = make (chan subReq) go s.loop() } func (s *Server) Publish (e Event) { s.publish <- e } func (s *Server) Subscribe (c chan <- Event) { r := subReq{c: c, ok: make (chan bool )} s.subscribe <- r if !<-r.ok { panic ("pubsub: already subscribed" ) } } func (s *Server) Cancel (c chan <- Event) { r := subReq{c: c, ok: make (chan bool )} s.cancel <- r if !<-r.ok { panic ("pubsub: not subscribed" ) } } func (s *Server) loop () { sub := make (map [chan <- Event]bool ) for { select { case e := <-s.publish: for c := range sub { c <- e } case r := <-s.subscribe: if sub[r.c] { r.ok <- false break } sub[r.c] = true r.ok <- true case r := <-s.cancel: if !sub[r.c] { r.ok <- false break } close (r.c) delete (sub, r.c) r.ok <- true } } }