websocket pub/sub 模型

2,327 阅读4分钟

publish和subscribe模式的使用场景还是很多的,记得以前面试的时候被问到:如果要你做一个版本自动升级的功能,你会怎么做?当时正想在项目中引入etcd,然后考虑了下觉得也可以用publish和subscribe的思路去解决这个问题。

publish是gitlab产生的代码merge到master消息,可以考虑配置webhook来实现,当然用jenkins也没问题,在构建完成之后,jenkins通过curl发送最终的构建结果。

subscribe是各个节点服务器需要安装的一个agent程序,这个agent程序watch自己关注的项目的更新,一旦发现更新之后,拉取最新版本的程序进行更新。

publish可以通过长连接也可以通过http接口来实现,问题不大。subscribe如果要保证实时性,需要通过长连接来实现,如果对实时性要求不高,设定时间间隔轮询调用http接口也完全没问题。

整个功能分成三个模块:

  1. broker: 作为pub和sub的中转
  2. publisher:消息发布者
  3. subscriber:消息订阅者

简单画一个草图:

pub/sub/broker

大致就这么个思路。然后考虑了下,好像整个模型代码写起来也不是很复杂,于是就动手写起来了。

首先是broker,broker要服务于publisher和subscriber,publisher可以采用标准的http接口,subscriber可以采用websocket或者自己手写一个长连接,也可以用开源的一些长连接保活的库,像smux是我经常会用到的一个tcp库。

websocket通常情况不用处理编解码的问题,但是还需要添加心跳保活,smux不需要处理心跳保活(库本身提供有心跳机制),但是编解码要自己另外处理。

publisher可以不用实现,提供接口规范就行。

subscriber实现一个长连接的客户端,读到消息就打印消息内容即可(不考虑版本更新这一过程设计)

首先开始设计broker和subscriber的通信协议 proto.go

package proto

// Command codes carried in the Cmd field of the message envelope exchanged
// between a subscriber (S) and the broker (B). Values are assigned
// sequentially by iota, starting at 0.
const (
	// CMD_S2B_HEARTBEAT marks a heartbeat sent from subscriber to broker.
	CMD_S2B_HEARTBEAT = iota
	// CMD_B2S_HEARTBEAT marks the heartbeat reply sent from broker to subscriber.
	CMD_B2S_HEARTBEAT
	// CMD_B2S_MSG marks a published message forwarded from broker to subscriber.
	CMD_B2S_MSG
)

// B2SBody is the envelope for broker-to-subscriber messages. It is declared
// with S2BBody as its underlying type, so both share the same fields.
type B2SBody S2BBody

// S2BBody is the JSON envelope for subscriber-to-broker messages.
type S2BBody struct {
	// Cmd is the command code (one of the CMD_* constants).
	Cmd  int         `json:"cmd"`
	// Data is the command-specific payload.
	Data interface{} `json:"data"`
}

// S2BSubscribe is the subscription request a subscriber sends to the broker.
type S2BSubscribe struct {
	// Topics lists the topic names the subscriber wants to receive.
	Topics []string `json:"topics"`
}

// S2BHeartbeat is the (empty) payload of a subscriber-to-broker heartbeat.
type S2BHeartbeat struct{}

// B2SHeartbeat is the (empty) payload of a broker-to-subscriber heartbeat.
type B2SHeartbeat struct{}


ok,然后可以开始写broker代码了,broker和subscriber最终采用websocket来进行通信,broker和publisher采用http协议。


import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"sync"

	"github.com/ICKelin/pubsub/proto"
	"github.com/gorilla/websocket"
)

// Broker relays messages published on a topic to the subscribers registered
// for that topic.
type Broker struct {
	// addr is the address handed to http.ListenAndServe in Run.
	addr    string
	// muEntry guards entry.
	muEntry sync.RWMutex
	// entry maps a topic name to the subscribers registered for it.
	entry   map[string][]*subscriber
	// done is created in NewBroker; it is not otherwise used in the visible code.
	done    chan struct{}
}

// NewBroker returns a Broker that will listen on pubaddr, with an empty
// topic table and a fresh done channel. subaddr is accepted but not used.
func NewBroker(pubaddr, subaddr string) *Broker {
	b := new(Broker)
	b.addr = pubaddr
	b.entry = map[string][]*subscriber{}
	b.done = make(chan struct{})
	return b
}

// Run registers the /pub (HTTP publish) and /sub (websocket subscribe)
// handlers on the default serve mux and serves on b.addr. It blocks until
// the HTTP server stops, and logs the reason instead of dropping it.
func (b *Broker) Run() {
	http.HandleFunc("/pub", b.onPublish)
	http.HandleFunc("/sub", b.onSubscriber)
	// ListenAndServe always returns a non-nil error (e.g. address in use).
	if err := http.ListenAndServe(b.addr, nil); err != nil {
		log.Println("broker listen on ", b.addr, " fail: ", err)
	}
}

// pubBody is the JSON body expected by the /pub endpoint.
type pubBody struct {
	// Topic is the topic the message is published to.
	Topic string      `json:"topic"`
	// Msg is the message payload; it is forwarded to subscribers unchanged.
	Msg   interface{} `json:"msg"`
}

// onPublish handles POSTs to /pub: it decodes a pubBody from the request
// body and pushes its message to every subscriber currently registered for
// the topic. Unreadable or malformed bodies are answered with 400 Bad
// Request. Messages for topics without subscribers are dropped.
func (b *Broker) onPublish(w http.ResponseWriter, r *http.Request) {
	raw, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, "read request body fail", http.StatusBadRequest)
		return
	}

	body := &pubBody{}
	if err := json.Unmarshal(raw, body); err != nil {
		log.Println(err)
		http.Error(w, "invalid publish body", http.StatusBadRequest)
		return
	}

	topic := body.Topic
	msg := body.Msg

	log.Println("publish topic ", topic, msg)

	// Snapshot the subscriber list while holding the lock: the slice stored
	// in b.entry may be modified by onSubscriber as soon as the lock is
	// released, so iterating over it directly would be a data race.
	b.muEntry.RLock()
	subscribers := make([]*subscriber, len(b.entry[topic]))
	copy(subscribers, b.entry[topic])
	b.muEntry.RUnlock()

	// drop message once no subscriber
	// TODO: store msg
	for _, s := range subscribers {
		s.push(msg)
	}
}

// subscribeMsg mirrors the JSON shape of a subscription request.
// NOTE(review): not referenced in the visible code, where onSubscriber
// decodes into proto.S2BSubscribe instead — confirm whether it is still needed.
type subscribeMsg struct {
	// Topics lists the topic names to subscribe to.
	Topics []string `json:"topics"`
}

// onSubscriber handles /sub: it upgrades the request to a websocket, reads
// the initial proto.S2BSubscribe message, registers the subscriber under
// each requested topic, serves the connection until it ends, and finally
// unregisters the subscriber again.
func (b *Broker) onSubscriber(w http.ResponseWriter, r *http.Request) {
	upgrade, err := websocket.Upgrade(w, r, nil, 1024, 1024)
	if err != nil {
		log.Println(err)
		return
	}
	// Close the connection on every exit path.
	defer upgrade.Close()

	raddr := upgrade.RemoteAddr().String()

	subMsg := proto.S2BSubscribe{}
	if err := upgrade.ReadJSON(&subMsg); err != nil {
		// Without a valid subscription there is nothing to serve.
		log.Println("read subscribe message from ", raddr, " fail: ", err)
		return
	}

	s := newSubscriber(subMsg.Topics, raddr)

	b.muEntry.Lock()
	for _, topic := range s.topics {
		b.entry[topic] = append(b.entry[topic], s)
	}
	b.muEntry.Unlock()

	// Blocks until the subscriber connection is finished.
	s.serveSubscriber(upgrade)

	b.muEntry.Lock()
	for _, topic := range s.topics {
		current := b.entry[topic]
		for i, sub := range current {
			// Match by identity rather than by remote address string.
			if sub != s {
				continue
			}
			log.Println("remove subscriber: ", sub.raddr, " from topic ", topic)

			// Build a fresh slice instead of shifting elements in place, so
			// a publisher still iterating over the old slice is unaffected.
			remaining := make([]*subscriber, 0, len(current)-1)
			remaining = append(remaining, current[:i]...)
			remaining = append(remaining, current[i+1:]...)
			if len(remaining) == 0 {
				delete(b.entry, topic)
			} else {
				b.entry[topic] = remaining
			}
			break
		}
	}
	b.muEntry.Unlock()
}

broker的subscriber处理


import (
	"log"
	"time"

	"github.com/ICKelin/pubsub/proto"
	"github.com/gorilla/websocket"
)

// subscriber is the broker-side state of one connected subscriber.
type subscriber struct {
	// topics are the topics this subscriber registered for.
	topics   []string
	// raddr is the remote address string of the subscriber connection.
	raddr    string
	// done is closed by reader when the read loop ends; writer returns on it.
	done     chan struct{}
	// writebuf carries messages from push to the writer loop (unbuffered).
	writebuf chan *proto.B2SBody
	// parent string
	// children []*subscriber
}

// newSubscriber creates the broker-side record for a subscriber at raddr
// registered for topics, with a fresh done channel and an unbuffered
// write queue.
func newSubscriber(topics []string, raddr string) *subscriber {
	sub := new(subscriber)
	sub.topics, sub.raddr = topics, raddr
	sub.done = make(chan struct{})
	sub.writebuf = make(chan *proto.B2SBody)
	return sub
}

// serveSubscriber runs the read loop in a background goroutine and the
// write loop on the calling goroutine; it returns when the write loop
// returns.
func (s *subscriber) serveSubscriber(conn *websocket.Conn) {
	go func() {
		s.reader(conn)
	}()
	s.writer(conn)
}

// reader consumes messages from the subscriber connection until a read
// fails, then closes s.done to stop the writer loop. Heartbeats are answered
// with a CMD_B2S_HEARTBEAT message.
func (s *subscriber) reader(conn *websocket.Conn) {
	defer close(s.done)
	for {
		var obj proto.S2BBody
		if err := conn.ReadJSON(&obj); err != nil {
			log.Println(err)
			return
		}

		switch obj.Cmd {
		case proto.CMD_S2B_HEARTBEAT:
			// Hand the reply to the writer loop instead of writing here:
			// a websocket connection supports only one concurrent writer,
			// and the writer goroutine already writes published messages.
			// The writer keeps receiving until s.done is closed, which only
			// happens when this function returns, so the send cannot hang.
			s.writebuf <- &proto.B2SBody{Cmd: proto.CMD_B2S_HEARTBEAT}
		}
	}
}

// writer sends queued messages to the subscriber, each with a 5 second
// write deadline, until s.done is closed. A message whose write fails is
// dropped; the failure is logged and the connection is closed so that the
// read loop ends and the subscriber gets cleaned up.
func (s *subscriber) writer(conn *websocket.Conn) {
	broken := false
	for {
		select {
		case buf := <-s.writebuf:
			if broken {
				// Keep draining so senders never block on a dead
				// connection; the messages themselves are dropped.
				continue
			}

			conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
			err := conn.WriteJSON(buf)
			conn.SetWriteDeadline(time.Time{})
			if err != nil {
				// drop msg once writejson fail
				log.Println("write to subscriber ", s.raddr, " fail: ", err)
				// After a failed or timed-out write the connection cannot
				// be used for further writes; closing it makes the pending
				// read fail, which in turn closes s.done.
				conn.Close()
				broken = true
			}

		case <-s.done:
			return
		}
	}
}

// push queues data for delivery to the subscriber as a CMD_B2S_MSG message.
// It blocks until the writer loop accepts the message, or returns without
// delivering once the subscriber is finished (s.done closed), so a publisher
// never hangs on a subscriber that has already disconnected.
func (s *subscriber) push(data interface{}) {
	select {
	case s.writebuf <- &proto.B2SBody{Cmd: proto.CMD_B2S_MSG, Data: data}:
	case <-s.done:
		// subscriber gone: drop the message
	}
}

这两段代码还有几个非常明显的问题

  1. 如果没有subscriber,那么这条publish的消息会被丢弃
  2. 如果从broker到subscriber发送的消息超时,消息一样会被丢弃
  3. 发送成功,并不代表对端成功接收到了,如何保证消息真的发布成功

为了解决上面三个问题,可能需要考虑数据持久化以及消息ack等机制,这个也是一大难题。

再来看看subscriber的实现,subscriber可以很简单地实现:一个goroutine发心跳包,一个goroutine收数据包,收到消息之后调用回调函数进行相应的处理。


package main

import (
	"log"
	"time"

	"github.com/ICKelin/pubsub/proto"
	"github.com/gorilla/websocket"
)

// subscriber is a websocket client that subscribes to topics on a broker
// and hands every received message to a callback.
type subscriber struct {
	// topics are the topics sent in the subscription request.
	topics []string
	// broker is the websocket URL dialed in Run.
	broker string
	// cb is invoked with the Data of every CMD_B2S_MSG message received.
	cb     func(msg interface{})
}

// newSubscriber creates a client that will subscribe to topics on the
// broker URL and deliver each received message to cb.
func newSubscriber(topics []string, broker string, cb func(msg interface{})) *subscriber {
	client := new(subscriber)
	client.topics = topics
	client.broker = broker
	client.cb = cb
	return client
}

// Run connects to the broker, sends the subscription request, and then
// processes incoming messages until the connection fails. A background
// goroutine sends a heartbeat every 3 seconds. Published messages are passed
// to s.cb.
//
// Run returns the dial error, the error from sending the subscription, or
// the first read error on the connection.
func (s *subscriber) Run() error {
	conn, _, err := websocket.DefaultDialer.Dial(s.broker, nil)
	if err != nil {
		return err
	}
	defer conn.Close()

	subMsg := proto.S2BSubscribe{Topics: s.topics}
	if err := conn.WriteJSON(&subMsg); err != nil {
		return err
	}

	// done stops the heartbeat goroutine when Run returns. It is closed
	// before the deferred conn.Close above runs.
	done := make(chan struct{})
	defer close(done)

	go func() {
		ticker := time.NewTicker(time.Second * 3)
		defer ticker.Stop()

		for {
			body := &proto.S2BBody{
				Cmd:  proto.CMD_S2B_HEARTBEAT,
				Data: &proto.S2BHeartbeat{},
			}
			if err := conn.WriteJSON(body); err != nil {
				log.Println("send heartbeat fail: ", err)
				// Closing the connection makes the blocked read below fail,
				// so Run returns instead of waiting on a dead connection.
				conn.Close()
				return
			}

			select {
			case <-ticker.C:
			case <-done:
				return
			}
		}
	}()

	for {
		var body proto.B2SBody
		// Stop on read errors: continuing to read from a failed connection
		// would spin without ever receiving a message.
		if err := conn.ReadJSON(&body); err != nil {
			return err
		}

		switch body.Cmd {
		case proto.CMD_B2S_HEARTBEAT:
			log.Println("hb from broker")

		case proto.CMD_B2S_MSG:
			s.cb(body.Data)
		}
	}
}

// main subscribes to two example topics on a local broker and logs every
// message received. It exits with a logged error when the subscriber stops.
func main() {
	s := newSubscriber([]string{"gtun", "https://www.notr.tech"}, "ws://127.0.0.1:10002/sub", func(msg interface{}) {
		log.Println(msg)
	})
	if err := s.Run(); err != nil {
		log.Fatalln("subscriber exit: ", err)
	}
}