redis5新增特性:Streams 篇二 (消费者)

455 阅读6分钟

redis5新增特性:Streams 篇一 (增删改查) juejin.cn/post/684490…

独立消费

# xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
127.0.0.1:6379> xread count 2 streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1579241872658-0"
         2) 1) "name"
            2) "zfh"
            3) "age"
            4) "25"
      2) 1) "1579241876116-0"
         2) 1) "name"
            2) "tom"
            3) "age"
            4) "10"
127.0.0.1:6379> xread count 1 streams mystream $
(nil)

以上是XREAD的非阻塞形式,我们可以通过指定BLOCK参数,轻松地将XREAD 变成一个阻塞命令

127.0.0.1:6379> xread block 0 streams mystream $

在上面的例子中,除了移除COUNT以外,我指定了新的BLOCK选项,超时时间为0毫秒(意味着永不超时)。此外,我并没有给流 mystream传入一个常规的ID,而是传入了一个特殊的ID $。这个特殊的ID意思是XREAD应该使用流mystream已经存储的最大ID作为最后一个ID。以便我们仅接收从我们开始监听时间以后的新消息。这在某种程度上相似于Unix命令tail -f

开启另一个窗口向mystream中添加新的项目

# 新窗口
127.0.0.1:6379> xadd mystream * from nanjing
"1579243572608-0"

# 原窗口
xread block 0 streams mystream $
1) 1) "mystream"
   2) 1) 1) "1579243572608-0"
         2) 1) "from"
            2) "nanjing"
(308.84s)

原窗口的阻塞状态解除,耗时308.84s

XREAD的阻塞形式同样可以监听多个Stream,只需要指定多个键名即可;可以使用任意有效的ID;可以设置阻塞时间

消费组

添加消费组

这里添加了两个组mygroup_tailmygroup_head。分别从尾部、头部消费

# xgroup [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
127.0.0.1:6379> xgroup create mystream mygroup_tail $ # 从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
OK
127.0.0.1:6379> xgroup create mystream mygroup_head 0-0 #从头开始消费
OK
  • 如果我们指定的消息ID是0,那么消费者组将会开始消费这个Stream中的所有历史消息。

  • 可以指定任意其他有效的ID,消费者组将开始传递ID大于你所指定的ID的消息。

  • 因为$表示Stream中当前最大ID的意思,指定$会有只消费新消息的效果。

获取有关Stream或消费者组的信息

XINFO命令是一个可观察性接口,可以与子命令一起使用,以获取有关Stream或消费者组的信息。

这个命令使用子命令来显示有关Stream和消费者组的状态的不同信息,比如使用XINFO STREAM可以报告关于Stream本身的信息。

127.0.0.1:6379> xinfo help
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname>  -- Show consumer groups of group <groupname>.
3) GROUPS <key>                 -- Show the stream consumer groups.
4) STREAM <key>                 -- Show information about the stream.
5) HELP                         -- Print this help.
127.0.0.1:6379> xinfo stream mystream
 1) "length"
 2) (integer) 3 # 长度3
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 2 # 两个消费组
 9) "last-generated-id"
10) "1579250824174-0"
11) "first-entry" # 头部消息
12) 1) "1579250815501-0"
    2) 1) "name"
       2) "zfh"
       3) "age"
       4) "25"
13) "last-entry" # 尾部消息
14) 1) "1579250824174-0"
    2) 1) "name"
       2) "jerry"
       3) "age"
       4) "9"
       
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup_head"
   3) "consumers"
   4) (integer) 0 # 暂无消费者
   5) "pending"
   6) (integer) 0 # 暂无处理中的消息
   7) "last-delivered-id"
   8) "0-0"
2) 1) "name"
   2) "mygroup_tail"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1579250824174-0"

创建消费者

XREADGROUP的响应内容就像XREAD一样。但是请注意上面提供的GROUP <group-name> <consumer-name>,这表示我想要使用消费者组mygroup从Stream中读取。

消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。

#  xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
127.0.0.1:6379> xreadgroup group mygroup_tail tail_1 count 1 streams mystream >
(nil)
127.0.0.1:6379> xreadgroup group mygroup_head head_1 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1579250815501-0"
         2) 1) "name"
            2) "zfh"
            3) "age"
            4) "25"
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup_head"
   3) "consumers"
   4) (integer) 1 # 一个消费者
   5) "pending"
   6) (integer) 1 # 一个正在处理的消息
   7) "last-delivered-id"
   8) "1579250815501-0"
2) 1) "name"
   2) "mygroup_tail"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1579250824174-0"

确认消费

# xack key group ID [ID ...]
127.0.0.1:6379> xack mystream mygroup_head 1579250815501-0
(integer) 1
127.0.0.1:6379> xreadgroup group mygroup_head head_1 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1579250819612-0"
         2) 1) "name"
            2) "tom"
            3) "age"
            4) "10"

关于xackID的注意事项:

1,当ID等于>时,返回到目前为止从未传递给其他消费者的新消息,这有一个副作用,就是会更新消费者组的最后ID

2, 如果ID是任意其他有效的数字ID时,将永远不会在组中看到新消息,命令将会让我们访问我们的历史待处理消息。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用XACK进行确认。

127.0.0.1:6379> xadd mystream * n1 n11
"1579252541369-0"
127.0.0.1:6379> xadd mystream * n2 n22
"1579252555690-0"
127.0.0.1:6379> xreadgroup group mygroup_tail tail_3 count 1 streams mystream 0
1) 1) "mystream"
   2) (empty list or set)
127.0.0.1:6379> xreadgroup group mygroup_tail tail_3 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1579252541369-0"
         2) 1) "n1"
            2) "n11"
127.0.0.1:6379> xreadgroup group mygroup_tail tail_3 count 1 streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1579252541369-0"
         2) 1) "n1"
            2) "n11"
127.0.0.1:6379> xack mystream mygroup_tail 1579252541369-0
(integer) 1

# n1将不再是历史待处理消息的一部分,因此系统将不再报告任何消息:
127.0.0.1:6379> xreadgroup group mygroup_tail tail_3 count 1 streams mystream 0
1) 1) "mystream"
   2) (empty list or set)

xreadgroupxread一样,拥有一个block参数

# 原窗口
127.0.0.1:6379> xreadgroup group mygroup_tail tail_3 block 0 count 1 streams mystream >

# 新建窗口
127.0.0.1:6379> xadd mystream * n8 n88
"1579255976296-0"

# 原窗口
127.0.0.1:6379> xreadgroup group mygroup_tail tail_3 block 0 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1579255976296-0"
         2) 1) "n8"
            2) "n88"
(17.35s)

127.0.0.1:6379> xinfo consumers mystream mygroup_tail
1) 1) "name"
   2) "tail_3"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 394047

同一消费者组中的不同消费者,可以获取到不同的消息:

# 首先向mystream加入t1,t2,t3,t4四条消息
127.0.0.1:6379> xreadgroup group mygroup_tail test_1 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1579258697102-0"
         2) 1) "t1"
            2) "t1"
      2) 1) "1579258700649-0"
         2) 1) "t2"
            2) "t2"
127.0.0.1:6379> xreadgroup group mygroup_tail test_2 count 2 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1579258703269-0"
         2) 1) "t3"
            2) "t3"
      2) 1) "1579258707717-0"
         2) 1) "t4"
            2) "t4"

假如我们想象有三个消费者C1,C2,C3,以及一个包含了消息1, 2, 3, 4, 5, 6, 7的Stream,我们想要按如下图表的方式处理消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

显然,xreadgroup可以轻松实现这个需求。XREAD已经提供了一种方式可以扇形分发到N个客户端,还可以使用从节点来提供更多的读取可伸缩性。然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是从同一流向许多客户端提供不同的消息子集。这很有用的一个明显的例子是处理消息的速度很慢:能够让N个不同的客户端接收流的不同部分,通过将不同的消息路由到准备做更多工作的不同客户端来扩展消息处理工作。

tip:

  • 消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。
  • 即使使用XREADGROUP,你也可以同时从多个key中读取,但是要让其工作,你需要给每一个Stream创建一个名称相同的消费者组。
  • XREADGROUP命令是一个写命令,因为当它从Stream中读取消息时,消费者组被修改了,所以这个命令只能在master节点调用。

参考链接:www.redis.cn/topics/stre…