funcFanin(cs ...<-chan *Message) <-chan *Message { var wg sync.WaitGroup out := make(chan *Message) send := func(c <-chan *Message) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for c := range cs { go send(c) } gofunc() { wg.Wait() close(out) }() return out }
func(fan *MessageFan)AddFaninChannel(channel <-chan *Message) { fan.lock.Lock() defer fan.lock.Unlock() for _, c := range fan.in { if c == channel { fmt.Println("Received duplicate connection") return } } fan.ins = append(fan.ins, channel) gofunc() { for msg := range channel { out <- msg } fan.lock.Lock() defer fan.lock.Unlock() for i, c := range fan.ins { if c == channel { fan.ins = append(fan.ins[:i], fan.ins[i+1:]...) } } }() }
这里我们定义了一个 MessageFan 结构体,里面包含了一个输入通道的数组和一个输出通道,在实例化 MessageFan 的时候,注意,我们仅仅对 out进行了 make 操作,因为 out 通道是确定的,只有一个,而我们的输入通道却是 runtime 时决定的,所以无法使用 make 操作。当我们调用 AddFaninChannel 方法的时候,我们先遍历 ins 这个通道数组,看有没有相同的实现已经加入过的通道,有则返回,没有则将它加入 ins ,同样的,在 goroutine 中轮询这个 channel ,将它的消息塞入 out 中,注意,在一般情况下该for循环是阻塞的,如果for循环返回,则说明 channel 已经被关闭了,即在代码的某处执行了 close(channel) 这个操作,所以说明它已经没有作用了,我们就可以把它在我们的ins数组中删除。