Go里面如何实现广播?

5,851 阅读3分钟

在Go里面,channel是一种非常关键、方便的通信机制,但是通常我们想要将消息传递给多个消费者可能需要建立多个通道。只要channel的某个消息被某个消费者读取了,那么该值就会被移除,而其他消费者将不会再消费这个消息。如果我们想实现广播方式我们可能需要将消息写入N个channel。例如:

func main() {
	ch := make(chan int)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		for v := range ch {
			fmt.Println("A", v)
		}
		wg.Done()
	}()
	go func() {
		for v := range ch {
			fmt.Println("B", v)
		}
		wg.Done()
	}()
	ch <- 1
	ch <- 2
	close(ch)
	wg.Wait()
}

运行结果:

B 1
A 2

或者是:

B 1
B 2

或者是:

A 2
B 1

或者是:

A 1 
A 2

从上面可以看出输出结果不定,每个goroutine消费者只能消费到一条消息。如果我们希望所有消费者都能收到每个消息,我们需要更多的频道。例如:

package main
import (
	"fmt"
	"sync"
)
func main() {
	chA := make(chan int)
	chB := make(chan int)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		for v := range chA {
			fmt.Println("A", v)
		}
		wg.Done()
	}()
	go func() {
		for v := range chB {
			fmt.Println("B", v)
		}
		wg.Done()
	}()
	for i := 0; i < 10; i++ {
		chA <- i
		chB <- i
	}
	close(chA)
	close(chB)
	wg.Wait()
}

现在每个消费者接收到了每一个消息。我们对此并不满意。现在我们必须跟要维护多个channel,如果我们想要动态添加或删除消费者怎么办?让我们看看第一个中间改进。

package main
import (
	"fmt"
	"sync"
)
func main() {
	chA := make(chan int)
	chB := make(chan int)
	chBroadcast := make(chan int)
	var wg sync.WaitGroup
	wg.Add(3)
	go func() {
		for v := range chBroadcast {
			chA <- v
			chB <- v
		}
		close(chA)
		close(chB)
		wg.Done()
	}()
	go func() {
		for v := range chA {
			fmt.Println("A", v)
		}
		wg.Done()
	}()
	go func() {
		for v := range chB {
			fmt.Println("B", v)
		}
		wg.Done()
	}()
	for i := 0; i < 10; i++ {
		chBroadcast <- i
	}
	close(chBroadcast)
	wg.Wait()
}

请注意区别?现在我们只需要将我们的值写入一个channel chBroadcast。这是我们的广播channel。但现在让动态添加和删除消费者变得更复杂.下面进行了一次封装,封装之后消费者的添加和删除就变得比较容易了。

type BroadcastService struct {
	// 这是消费者监听的通道
	chBroadcast chan int
	// 转发给这些channel
	chListeners []chan int
	// 添加消费者
	chNewRequests chan (chan int)
	// 移除消费者
	chRemoveRequests chan (chan int)
}
// 创建一个广播服务
func NewBroadcastService() *BroadcastService {
	return &BroadcastService{
		chBroadcast:      make(chan int),
		chListeners:      make([]chan int, 3),
		chNewRequests:    make(chan (chan int)),
		chRemoveRequests: make(chan (chan int)),
	}
}
// 这会创建一个新消费者并返回一个监听通道
func (bs *BroadcastService) Listener() chan int {
	ch := make(chan int)
	bs.chNewRequests <- ch
	return ch
}
// 移除一个消费者
func (bs *BroadcastService) RemoveListener(ch chan int) {
	bs.chRemoveRequests <- ch
}
func (bs *BroadcastService) addListener(ch chan int) {
	for i, v := range bs.chListeners {
		if v == nil {
			bs.chListeners[i] = ch
			return
		}
	}
	bs.chListeners = append(bs.chListeners, ch)
}
func (bs *BroadcastService) removeListener(ch chan int) {
	for i, v := range bs.chListeners {
		if v == ch {
			bs.chListeners[i] = nil
			// 一定要关闭! 否则监听它的groutine将会一直block
			close(ch)
			return
		}
	}
}

func (bs *BroadcastService) Run() chan int {
	go func() {
		for {
			// 处理新建消费者或者移除消费者
			select {
			case newCh := <-bs.chNewRequests:
				bs.addListener(newCh)
			case removeCh := <-bs.chRemoveRequests:
				bs.removeListener(removeCh)
			case v, ok := <-bs.chBroadcast:
				// 如果广播通道关闭,则关闭掉所有的消费者通道
				if !ok {
					goto terminate
				}
				// 将值转发到所有的消费者channel
				for _, dstCh := range bs.chListeners {
					if dstCh == nil {
						continue
					}
					dstCh <- v
				}
			}
		}
	terminate:
		//关闭所有的消费通道
		for _, dstCh := range bs.chListeners {
			if dstCh == nil {
				continue
			}
			close(dstCh)

		}
	}()
	return bs.chBroadcast
}

func main() {
	bs := NewBroadcastService()
	chBroadcast := bs.Run()
	chA := bs.Listener()
	chB := bs.Listener()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		for v := range chA {
			fmt.Println("A", v)
		}
		wg.Done()
	}()
	go func() {
		for v := range chB {
			fmt.Println("B", v)
		}
		wg.Done()
	}()
	for i := 0; i < 3; i++ {
		chBroadcast <- i
	}
	bs.RemoveListener(chA)
	for i := 3; i < 6; i++ {
		chBroadcast <- i
	}
	close(chBroadcast)
	wg.Wait()
}

原文:science.mroman.ch/gobroadcast…