阅读 186

Nsq的二三事

Nsq是什么

A realtime distributed messaging platform
复制代码

特征:

  • 分布式的

NSQ提倡没有单点故障的分布式和分散拓扑,支持容错和高可用性,并提供可靠的消息传递保证

  • 可扩展

NSQ水平扩展,没有任何集中的代理。内置的发现功能简化了向集群添加节点的工作。支持发布-订阅和负载平衡的消息传递。它也很快。

  • 操作友好

NSQ易于配置和部署,并附带一个管理UI。二进制文件没有运行时依赖性,我们为linux、darwin、freebsd和windows以及官方Docker映像提供预编译版本。

  • 完整的

官方的Go和Python库以及大多数主要语言的社区支持的库都是可用的(参见客户端库)。

其他特征罗列如下:

  • 基于低延迟推送的消息传递(性能)
  • 负载平衡和多播风格的消息路由组合
  • 在流(高吞吐量)和面向工作(低吞吐量)的工作负载方面表现出色
  • 主要在内存中(超出高水位标记的消息透明地保存在磁盘上)
  • 供使用者查找生产者的运行时发现服务(nsqlookupd)
  • 传输层安全性(TLS)
  • 不可知的数据格式
  • 很少的依赖项(易于部署)和合理,有界的默认配置
  • 支持任何语言的客户端库的简单TCP协议
  • 统计信息,管理操作和生产者的HTTP接口(无需发布客户端库)
  • 与statsd集成以进行实时检测
  • 强大的集群管理界面(nsqadmin)

保证

与任何分布式系统一样,要实现目标,就需要进行明智的权衡。通过透明地了解这些折衷的现实,我们希望对NSQ在生产中部署时的行为设定期望。

  • 消息不持久(默认情况下)

尽管系统支持一个--mem-queue-size选项,设置之后,消息可以透明的保存到磁盘上,但是,它主要还是一个内存中的消息传递平台。

可以将--mem-queue-size设置为0,以确保所有传入消息都持久保存到磁盘。 在这种情况下,如果节点发生故障,那么您的故障面就会减少

没有内置的复制。 但是,可以通过多种方式来管理此折衷方案,例如部署拓扑和以容错方式主动将主题从属并将其持久化到磁盘的技术。

  • 消息至少传递一次

与上面的内容密切相关,这假定给定的nsqd节点不会失败。

这意味着,由于多种原因,可以多次传递消息(客户端超时,断开连接,重新排队等)。 客户有责任执行幂等操作或重复数据删除。

  • 收到的消息是无序的

您不能依靠传递给使用者的消息顺序。

与消息传递语义类似,这是重新排队,内存中存储和磁盘存储组合以及每个nsqd节点不共享任何事实的结果。

通过在您的使用者中引入一个等待时间窗口以接受消息并在处理之前对其进行排序,可以轻松地实现松散的排序(即,对于给定的使用者,其消息是有序的,而不是整个集群)。 保留此不变变量,必须丢弃该窗口之外的消息)。

  • 消费者最终会找到所有的主题生产者

发现服务(nsqlookupd)旨在最终保持一致。 nsqlookupd节点不协调维护状态或回答查询。

网络分区不会影响可用性,因为分区的两侧仍然可以回答查询。 部署拓扑对缓解这些类型的问题具有最重要的作用。

Nsq的组件

NSQ由3个守护程序组成:

  • nsqd是接收,排队并将消息传递到客户端的守护程序。它可以独立运行,但通常在具有nsqlookupd实例的群集中配置(在这种情况下,它将宣布主题和发现通道)。它监听两个TCP端口,一个用于客户机,另一个用于HTTP API。它可以选择监听HTTPS的第三个端口。
  • nsqlookupd是管理拓扑信息并提供最终一致的发现服务的守护程序。 客户端查询nsqlookupd以发现特定主题的nsqd生产者,并且nsqd节点广播主题和频道信息。有两个接口:nsqd用于广播的TCP接口和客户机用于执行发现和管理操作的HTTP接口。
  • nsqadmin是一个Web UI,用于实时内省群集(并执行各种管理任务)。

核心概念

Topics和Channels

主题和渠道是NSQ的核心原语,最能说明系统设计如何无缝转换为Go的功能。go中的channels是表示队列的自然方式,因此一个NSQ主题/通道,就像带有缓存的go channel. 缓冲区的大小等于--mem-queue-size配置参数。

在线读取数据后,将消息发布到主题的操作涉及:

  • 消息结构的实例化(以及消息正文[] byte的分配)
  • 读取锁以获取主题
  • 读取锁以检查发布能力
  • 发送到带缓冲的go-chan

为了让消息从一个主题放到它的channel中不能依赖于典型的go channel接收原语。因为多个goroutines从一个go channel中接收消息会分发消息,而期望的最终结果是将每个消息复制到每个通道(goroutine)。

相反,每个主题都维护3个主要的goroutine。 第一个称为router,负责从传入的go-chan中读取新发布的消息并将其存储在队列(内存或磁盘)中。

第二个称为messagePump,负责如上所述将消息复制并推送到通道。

第三个负责DiskQueue IO,将在后面讨论。

通道稍微复杂一些,但共有一个基本目标,即公开单个输入和单个输出go-chan(以抽象出内部消息可能在内存或磁盘中的事实):

nsq channel.png

此外,每个通道维护2个按时间顺序排列的优先级队列,这些队列负责延迟和进行中的消息超时(以及2个随附的goroutines进行监视)。

通过管理每个通道的数据结构,而不是依赖Go运行时的全局计时器调度程序,可以改善并行化。

注意:在内部,Go运行时使用单个优先级队列和goroutine来管理计时器。 这支持(但不限于)整个time包。 通常,它不需要用户级按时间顺序排列的优先级队列,但请务必记住,它是具有单个锁的单个数据结构,可能会影响GOMAXPROCS> 1的性能。 请参阅runtime/time.go

Backend和DiskQueue

NSQ的设计目标之一是限制内存中的消息数量。它通过DiskQueue(主题或通道的第三个主goroutine)透明地将消息溢出写入磁盘来实现这一点。

因为内存队列只是一个go channel,首先,将消息路由到内存很简单,如果可能,然后回写到磁盘:

for msg := range c.incomingMsgChan {
	select {
	case c.memoryMsgChan <- msg:
	default:
		err := WriteMessageToBackend(&msgBuf, msg, c.backend)
		if err != nil {
			// ... handle errors ...
		}
	}
}
复制代码

利用Go的select语句可以用几行代码来表达这个功能:上面的default只在memoryMsgChan满的时候执行。 Nsq还有短暂topic/channel到概念,他们在消息溢出时,直接丢弃他们(而不是写入到磁盘)当他们不再有客户端订阅的时候。这是go 接口的很好的一个使用案例,主题和渠道的struct成员声明为Backend接口,而不是具体类型。普通主题和频道使用DiskQueue,而临时主题和频道则在DummyBackendQueue中使用存根,而DummyBackendQueue实现了无操作Backend

设计原理

简化配置和管理

单个nsqd实例旨在一次处理多个数据流。 流称为“主题”,一个主题具有1个或多个“频道”。 每个频道都会收到一个主题的所有消息的副本。 实际上,频道映射到消耗主题的下游服务。

主题和频道没有事先配置。 通过发布到命名主题或订阅命名主题上的频道,可以在首次使用时创建主题。 通过订阅指定的频道来在首次使用时创建频道。

主题和频道都相互独立地缓冲数据,从而防止缓慢的使用者造成其他频道的积压(在主题级别也是如此)。

一个通道通常可以连接多个客户端。 假设所有连接的客户端都处于准备接收消息的状态,则每条消息都将传递给随机客户端。 例如:

nsqd.gif

总而言之,消息是从主题->通道多播的(每个通道都接收该主题的所有消息的副本),但从通道->使用者均匀分发(每个消费者都接收该通道的一部分消息)。

NSQ还包括一个辅助应用程序nsqlookupd,该应用程序提供了目录服务,消费者可以在其中查找nsqd实例的地址,这些地址提供了他们感兴趣的订阅主题。 在配置方面,这使消费者与生产者分离(他们两个都只需要知道在哪里联系nsqlookupd的普通实例,而彼此之间就不需要),从而降低了复杂性和维护。

在较低级别上,每个nsqd都具有到nsqlookupd的长期TCP连接,并通过该连接定期推送其状态。 此数据用于通知nsqlookupd将给消费者哪些nsqd地址。 对于使用者,将公开HTTP /lookup端点以进行轮询。

要引入主题的新的不同消费者,只需启动一个NSQ客户端,该客户端使用nsqlookupd实例的地址配置。不需要更改配置就可以添加新的使用者或新的发布者,从而大大降低了开销和复杂性。

注意:在将来的版本中,启发式nsqlookupd用于返回地址的方法可能基于深度,连接的客户端数或其他“智能”策略。 当前的实现仅仅是全部。 最终,目标是确保读取所有生产者的深度保持接近零。

重要的是要注意,nsqd和nsqlookupd守护程序旨在独立运行,之间无需通信或协调。

我们还认为,有一种查看,反思和管理汇总群集的方法,这一点非常重要。 我们构建了nsqadmin来做到这一点。 它提供了一个Web UI,以浏览主题/通道/消费者的层次结构,并检查每一层的深度和其他关键统计数据。 此外,它还支持一些管理命令,例如删除和清空通道(这是一个有用的工具当可以安全地丢弃通道中的消息以使深度回到0时)。

直接升级路径

这是我们的最高优先事项之一。 我们的生产系统全部基于我们现有的消息传递工具来处理大量流量,因此我们需要一种缓慢而有条不紊地升级基础架构的特定部分而几乎没有影响的方法。

首先,在消息生成器端,我们构建了nsqd来匹配simplequeue。 具体地说,nsqd像simplequeue一样,向POST二进制数据公开一个HTTP / put终结点(一个警告是该终结点采用一个附加的查询参数来指定“主题”)。 想要切换为开始发布到nsqd的服务仅需进行少量代码更改。

其次,我们在Python和Go中都建立了与我们现有库中习惯的功能和惯用语相匹配的库。 通过将代码更改限制为引导程序,这简化了消息使用方的过渡。 所有业务逻辑都保持不变。

最后,我们构建了将新旧组件粘合在一起的实用程序。 这些都可以在存储库的examples目录中找到:

  • 向NSQ集群中的主题公开类似于HTTP的pubsub的接口
  • 持久地将给定主题的所有消息写入文件
  • 将主题中的所有消息执行到(多个)端点的HTTP请求

消除SPOT

NSQ被设计为以分布式方式使用。 nsqd客户端(通过TCP)连接到提供指定主题的所有实例。 没有中间人,没有消息代理,也没有SPOF:

这种拓扑结构消除了链接单个聚合订阅源的需要。 相反,您直接从所有生产者那里消费。 从技术上讲,哪个客户端连接到哪个NSQ都无关紧要,只要有足够的客户端连接到所有生产者并满足消息量,就可以保证最终将全部处理。

对于nsqlookupd,可通过运行多个实例来实现高可用性。 它们不会直接相互通信,并且数据最终被认为是一致的。 使用者轮询所有已配置的nsqlookupd实例,并合并响应。 过时的节点,无法访问的节点或其他有故障的节点不会使系统瘫痪。

消息传递保证

NSQ保证消息至少要传递一次,尽管可能会重复消息。 消费者应该对此有所期待,并进行重复数据删除或执行幂等操作。

此保证作为协议的一部分而强制执行,并且按以下方式工作(假定客户端已成功连接并订阅了主题):

  • 客户端表明他们已准备好接收消息
  • NSQ发送消息并临时在本地存储数据(在重新排队或超时的情况下)
  • 客户端回复FIN或者REQ分别表示成功和失败,如果客户端在配置的时间间隔内不回复NSQ将超时,并将消息自动重新排队。)

这样可以确保导致消息丢失的唯一极端情况是nsqd进程的异常关闭。 在这种情况下,内存中的任何消息(或任何未刷新到磁盘的缓冲写入)都将丢失。

如果最重要的是防止消息丢失,那么即使是这种极端情况也可以缓解。 一种解决方案是启动冗余的nsqd副本实例(在单独的主机上),以接收消息的相同部分。 因为您已将使用者定为幂等,所以对这些消息进行两次处理不会对下游产生影响,并且使系统能够承受任何单节点故障而不会丢失消息。

结论是,NSQ提供了构建块来支持各种生产用例和可配置的持久性程度。

有限的内存占用

nsqd提供配置选项--mem-queue-size,该选项将确定给定队列在内存中保留的消息数。 如果队列的深度超过此阈值,则将消息透明地写入磁盘。 这会将给定nsqd进程的内存占用空间限制为mem-queue-size * #_of_channels_and_topics

而且,敏锐的观察者可能已经发现,通过将这个值设置为较低的值(如1或甚至0),这是获得更高交付保证的一种方便方法。

对于数据协议,我们做了一个关键的设计决策,通过将数据推送到客户端而不是等待它被拉出,从而最大化性能和吞吐量。这个概念,我们称之为RDY状态,本质上是客户端流控制的一种形式。

当客户端连接到nsqd并订阅频道时,它处于RDY状态0。这意味着不会有任何消息发送到客户端。 当客户端准备好接收消息时,它会发送一条命令,将其RDY状态更新为大约#,它准备处理,例如100。如果没有其他命令,则会在有可用消息时将100条消息推送到客户端(每次减少 该客户端的服务器端RDY计数)。

客户端库被设计用来发送一个命令来更新RDY计数,当它达到可配置的最大动态设置的25%时(适当地分配到多个nsqd实例的连接)。

这是一个重要的性能旋钮,因为某些下游系统能够更轻松地批量处理消息,并受益于更高的max-in-flight

值得注意的是,由于它既基于缓冲又具有推送功能,并且能够满足对流(通道)的独立副本的需求,因此我们生成了一个守护程序,其行为类似于simplequeue和pubsub组合。 在简化我们传统上会维护上面讨论过的旧工具链的系统拓扑方面,这很有用。

Go

我们很早就做出了一项战略决策,即在Go中构建NSQ核心。 我们最近在一点点博客中介绍了我们对Go的使用,并暗示了这个项目-浏览该帖子以了解我们对语言的看法可能会有所帮助。

关于NSQ,Go通道(不要与NSQ通道混淆)和该语言的内置并发功能非常适合nsqd的内部工作。 我们利用缓冲的通道来管理内存中的消息队列,并将溢出无缝地写入磁盘。

标准库使编写网络层和客户端代码变得容易。 内置的内存和cpu性能分析挂钩突出了优化的机会,并且只需很少的精力即可进行集成。 我们还发现隔离,测试使用接口的模拟类型以及迭代构建功能真的很容易。

简单部署

以下步骤将在本地计算机上运行一个小型NSQ群集,并逐步完成将消息发布,使用和归档到磁盘的过程。

  • 1.在一个shell中启动 nsqlookupd
$ nsqlookupd
复制代码
  • 2.在另一个shell中启动nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
复制代码
  • 3.在另一个shell中启动nsqadmin:
$ nsqadmin --lookupd-http-address=127.0.0.1:4161
复制代码
  • 4.发布一个初始化消息(在集群中创建主题):
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
复制代码
  • 5.最后,在另一个shell中,启动nsq_to_file:
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
复制代码
  • 6.发布更多的消息到nsqd上:
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
复制代码
  • 7.验证 为了验证事情按照我们期望的方式运行,在web浏览器中打开http://127.0.0.1:4171/来查询nsqadmin UI,并查询静态数据,同时,检查/tmp目录下日志文件test.*.log的内容。

使用Docker部署NSQ

这一小结详细介绍了如何在Docker容器中部署和运行nsq二进制文件

有一个包含所有NSQ二进制文件的最小nsq映像。 通过在运行Docker时将二进制文件指定为命令可以运行每个二进制文件。 基本格式为:

docker run nsqio/nsq /<command>
复制代码

请注意命令前的/号。例如:

docker run nsqio/nsq /nsq_to_file
复制代码

运行nsqlookupd

docker pull nsqio/nsq
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
复制代码

运行nsqd

首先,获取Docker主机的IP:

ifconfig | grep addr
复制代码

其次,运行nsqd容器:

docker pull nsqio/nsq
docker run --name nsqd -p 4150:4150 -p 4151:4151 \
    nsqio/nsq /nsqd \
    --broadcast-address=<host> \
    --lookupd-tcp-address=<host>:<port>
复制代码

--lookupd-tcp-address标志设置为以前运行nsqlookupd的主机的IP和TCP端口,即dockerIP:4160

特别注意:在本地部署测试的时候,这里不要用127.0.0.1

例如,给定主机IP 172.17.42.1

docker run --name nsqd -p 4150:4150 -p 4151:4151 \
    nsqio/nsq /nsqd \
    --broadcast-address=172.17.42.1 \
    --lookupd-tcp-address=172.17.42.1:4160
复制代码

请注意,该端口使用端口4160,这是我们启动nsqlookupd容器时公开的端口(它也是nsqlookupd的默认端口)。

如果要使用非默认端口,请更改-p参数:

docker run --name nsqlookupd -p 5160:4160 -p 5161:4161 nsqio/nsq /nsqlookupd
复制代码

这将使nsqlookupd在端口51605161的Docker主机IP上可用。

使用TLS

要对集装箱化的NSQ二进制文件使用TLS,您需要包括证书文件、私钥和根CA文件。Docker映像在/etc/ssl/certs/上有一个可用于此目的的卷安装。将包含文件的主机目录装载到卷中,然后像往常一样在命令行中指定文件:

docker run -p 4150:4150 -p 4151:4151 -p 4152:4152 -v /home/docker/certs:/etc/ssl/certs \
    nsqio/nsq /nsqd \
    --tls-root-ca-file=/etc/ssl/certs/certs.crt \
    --tls-cert=/etc/ssl/certs/cert.pem \
    --tls-key=/etc/ssl/certs/key.pem \
    --tls-required=true \
    --tls-client-auth-policy=require-verify
复制代码

这将把证书从/home/docker/certs加载到docker容器中,以便在运行时使用。

持久化NSQ数据

要将nsqd数据存储在主机磁盘上,请使用/data卷作为数据目录,这使您可以装载到仅包含数据的Docker容器或装载到主机目录:

docker run nsqio/nsq /nsqd \
    --data-path=/data
复制代码

使用docker-compose

为了使用docker-compose一起启动nsqdnsqlookupdnsqadmin,然后创建docker-compose.yml

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160"
      - "4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150"
      - "4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"
复制代码

要从与之前创建的docker-compose.yml相同的目录运行以下命令。

docker-compose up -d
复制代码

将创建一个专用网络,并使用该专用网络启动三个容器。 在本地主机上,每个容器将有一个随机端口映射到docker-compose.yml中公开的端口。 查看正在运行的容器状态和映射的端口。

docker-compose ps
复制代码

查看正在运行的容器中的日志。

docker-compose logs
复制代码

假设nsqlookupd将主机端口31001映射到容器端口4161,则可以使用curl执行简单的ping。

curl http://127.0.0.1:31001/ping
复制代码

Go实例

消费者

package main
 
import (
	"time"
	"fmt"
	"log"
	
	"github.com/nsqio/go-nsq"
)
 
func main() {
	err := initConsumer("test", "count", "127.0.0.1:4161")
	if err != nil {
		log.Fatal("init Consumer error")
	}
	err = initConsumer("test","count2","127.0.0.1:4161")
	if err != nil {
		log.Fatal("init Consumer error")
	}
	select {
 
	}
}
 
 
type nsqHandler struct {
	nsqConsumer      *nsq.Consumer
	messagesReceived int
}
 
//处理消息
func (nh *nsqHandler)HandleMessage(msg *nsq.Message) error{
	nh.messagesReceived++
	fmt.Printf("receive ID:%s,addr:%s,message:%s",msg.ID, msg.NSQDAddress, string(msg.Body))
	fmt.Println()
	return nil
}
 
func initConsumer(topic, channel, addr string) error {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 3*time.Second
	c,err := nsq.NewConsumer(topic,channel,cfg)
	if err != nil {
		log.Println("init Consumer NewConsumer error:",err)
		return err
	}
 
	handler := &nsqHandler{nsqConsumer:c}
	c.AddHandler(handler)
 
	err = c.ConnectToNSQLookupd(addr)
	if err != nil {
		log.Println("init Consumer ConnectToNSQLookupd error:",err)
		return err
	}
	return nil
}


复制代码

生产者实例

package main
 
import (
	"fmt"
	"log"
	"bufio"
	"os"
 
	"github.com/nsqio/go-nsq"
)
 
 
func main() {
	strIP1 := "127.0.0.1:4150"
	strIP2 := "127.0.0.1:4152"
 
	producer1,err := initProducer(strIP1)
	if err != nil {
		log.Fatal("init producer1 error:",err)
	}
	producer2,err := initProducer(strIP2)
	if err != nil {
		log.Fatal("init producer2 error:",err)
	}
 
	defer producer1.Stop()
	defer producer2.Stop()
 
	//读取控制台输入
	reader := bufio.NewReader(os.Stdin)
 
	count := 0
	for {
		fmt.Print("please say:")
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			fmt.Println("stop producer!")
			return
		}
		if count % 2 == 0 {
			err := producer1.public("test1",command)
			if err != nil {
				log.Fatal("producer1 public error:",err)
			}
		}else {
			err := producer2.public("test2",command)
			if err != nil {
				log.Fatal("producer2 public error:",err)
			}
		}
 
		count++
	}
}
 
type nsqProducer struct {
	 *nsq.Producer
}
 
 
//初始化生产者
func initProducer(addr string) (*nsqProducer, error) {
	fmt.Println("init producer address:",addr)
	producer,err := nsq.NewProducer(addr,nsq.NewConfig())
	if err != nil {
		return nil,err
	}
	return &nsqProducer{producer},nil
}
 
//发布消息
func (np *nsqProducer)public(topic,message string) error {
	err := np.Publish(topic,[]byte(message))
	if err != nil {
		log.Println("nsq public error:",err)
		return err
	}
	return nil
}
复制代码

消费者消费信息的时候