阅读 93

redis5新增特性:Streams 篇一 (增删改查)

StreamRedis 5.0引入的一种新数据类型,允许消费者等待生产者发送的新数据,

Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。

每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。

每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。

每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到。

同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者者有一个组内唯一名称。

消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL,也就是Pending Entries List,它用来确保客户端至少消费了消息一次

增删改查

1,xadd 追加消息

2,xdel 删除消息

3,xrange 获取消息列表,会自动过滤已经删除的消息

4,xlen 消息长度

5,del 删除Stream

# xadd key ID field string [field string ...]   *号表示服务器自动生成ID
127.0.0.1:6379> xadd mystream * name zfh age 25
"1578845024424-0"
127.0.0.1:6379> xadd mystream * name tom age 10
"1578845052908-0"
127.0.0.1:6379> xadd mystream * name jerry age 9
"1578845085696-0"
127.0.0.1:6379> 
复制代码
# xlen key 消息长度
127.0.0.1:6379> xlen mystream
(integer) 3

# 消息列表 -表示最小值, +表示最大值
# xrange key start end [COUNT count]
127.0.0.1:6379> xrange mystream - +
1) 1) "1578845024424-0"
   2) 1) "name"
      2) "zfh"
      3) "age"
      4) "25"
2) 1) "1578845052908-0"
   2) 1) "name"
      2) "tom"
      3) "age"
      4) "10"
3) 1) "1578845085696-0"
   2) 1) "name"
      2) "jerry"
      3) "age"
      4) "9"
127.0.0.1:6379> xrange mystream 1578845052908-0 +  # 指定最小消息ID的列表
1) 1) "1578845052908-0"
   2) 1) "name"
      2) "tom"
      3) "age"
      4) "10"
2) 1) "1578845085696-0"
   2) 1) "name"
      2) "jerry"
      3) "age"
      4) "9"
127.0.0.1:6379> xrange mystream - 1578845052908-0 # 指定最大消息ID的列表
1) 1) "1578845024424-0"
   2) 1) "name"
      2) "zfh"
      3) "age"
      4) "25"
2) 1) "1578845052908-0"
   2) 1) "name"
      2) "tom"
      3) "age"
      4) "10"
复制代码
# xdel key ID [ID ...]
127.0.0.1:6379> xdel mystream 1578845052908-0
(integer) 1
127.0.0.1:6379> xlen mystream
(integer) 2
127.0.0.1:6379> xrange mystream - +
1) 1) "1578845024424-0"
   2) 1) "name"
      2) "zfh"
      3) "age"
      4) "25"
2) 1) "1578845085696-0"
   2) 1) "name"
      2) "jerry"
      3) "age"
      4) "9"
      
127.0.0.1:6379> del mystream # 删除整个Stream
(integer) 1
127.0.0.1:6379> xrange mystream - +
(empty list or set)
复制代码

消息id

消息ID的形式是timestampInMillis-sequence,例如1578845052908-0,它表示当前的消息在毫米时间戳1578845052908时产生,并且是该毫秒内产生的第0条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。

未完待续~下一篇学习消息的用法

redis5新增特性:Streams 篇二 (消费者) juejin.im/post/5e214f…

关注下面的标签,发现更多相似文章
评论