NSQ 源码学习笔记(一)

339 阅读4分钟
原文链接: www.jianshu.com

首先我们来看一下Nsq的组织结构:

  • nsqd:接收,分发队列信息的守护进程,可以单独部署,也可以集群化运行
  • nsqlookupd:管理nsqd节点,服务发现
  • nsqadmin:nsq的可视化管理工具

NSQ的拓补图

@拓扑图 | center

NSQ中Topic和channel的关系

Topic会将消息发送到每个订阅者(channel)
channel的读消费类似负载均衡,会均匀的投递到各个消费端

@Topic和channel的关系 | center

三个模块中nsqd模块最为重要,我们从这个模块开始学习它的源码

入口函数

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
    _, err := toml.DecodeFile(configFile, &cfg)
    if err != nil {
        log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
    }
}
cfg.Validate()

opts := nsqd.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
if err != nil {
    log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
<-signalChan
nsqd.Exit()
  1. 首先用 signal.Notify 阻塞系统的 killctrl+c 信号,让进程可以处于deamon的状态运行
  2. 按优先级合并配置文件:命令行 > 配置文件 > 默认值
  3. nsqd.LoadMetadata 读取dat文件,加载topic和channel信息,并同步运行和停止的状态
  4. 将进程的运行状态(topic和channel信息)持久化到dat文件中
  5. 执行 nsqd.Main 直到捕捉退出信号

nsqd.Main 的代码位于 nsqd/nsqd.go

NSQD主函数(TCP监听)

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}

    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })
    ...
}

  NSQD首先启动了tcp监听模型,为了保证通用性,在 protocol 包中封装了TCPServer,需要传入 Listener, TCPHandler, Logger 对象。所有的TCP监听均可以用这个模式来创建监听,只要传入对应的 ListenerTCPHandler ,那么Listener在Accept到Connect的时候,将其交给对应TCPHandler.Handle(clientConn) 执行。

TCPHandler 的Interface实现

package protocol

type TCPHandler interface {
    Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }

        // 启动Goroutine 去执行Handle函数
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

  这里体现了Go在实现Interface的便捷之处,不需要显示的声明实现了某个Interface,只需要完全的实现Interface中定义的方法,那么就会默认该类型实现了接口。在这里不同的Handler,只要实现了Handle(net.Conn),就可以被当做TCPHandler对象传入。在代码中的体现是:
  执行Handle函数时是启动一个Goroutine来执行的,这里其实是per connect per goroutine,由于Golang的特性,Goroutine在执行时的调度模式是epoll模式,可以很好的利用系统的多核资源。

main函数中TCPServer的实现

type tcpServer struct {
    ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

    // 客户端应该初始化本身通过发送一个4字节序列表示协议的版本,
    // 这样将允许我们优雅地升级兼容协议
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx} // V2版本的协议操作
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

  源码中标记了需要在通讯时预留4个字节的版本号信息,用来兼容协议的升级。如果未来有协议升级,只需要在protocolMagic中添加新的case分支就可以了。

NSQD主函数(HTTP/HTTPS监听)

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpsListener = httpsListener
        n.Unlock()
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
        })
    }
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.httpListener = httpListener
    n.Unlock()
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
    })

  这里不论是http还是https的监听,httpsServerhttpServer作为Handler对象,均在内部声明了路由规则,不同的请求定义了不同的操作,最后通过http_api.Serve()绑定端口监听

NSQD默认自启的操作

    n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循环消息分发
    n.waitGroup.Wrap(func() { n.idPump() }) // 生产唯一消息id的一个队列
    n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有变化,同步nsqlookup
    if n.getOpts().StatsdAddress != "" {
        // 定时将nsqd的状态以短连接的方式发送至一个状态监护进程.包括了nsqd的应用资源信息,以及nsqd上topic的信息
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }

  启动监听后,除了通过监听启动的操作外,NSQD还有一些类似守护进程的操作会一直运行,包括:

  • 循环消息分发
  • 生产唯一消息ID
  • nsqlookup的状态同步
  • 状态监控