0%

redis-stream消息队列详解

[toc]


redis5.0增加了一个结构 stream
是具备一些可靠性机制的消息队列实现

Q: stream中具有哪些结构?#

A:

  1. 消费组——Consumergroup
    1个消费组可以有多个消费者,消费者在一个消费组里抢消息。
  2. 游标id——last_deliverer_id
    记录当前这个消费组被读取到哪了。
    每有一个消费者读取了一条消息, 组的这个游标就+1
  3. 未接收响应的消息里pending_ids
    记录了已经被消费,但是还没调用XACK进行响应的消息。
    是相比list消息队列的重要可靠性提升成员。
  4. 消息id——每个消息独一无的身份
    时间戳+序号
  5. 消息内容
    8efce808fdd905c339f52f6afa33abba45d7a400

stream简单命令#

详细:
Redis Stream

  • XADD 添加消息
  • XDEL 直接删除
  • XREAD 消费,支持阻塞,且消费后会进入pending
  • XACK
  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息

Q: stream生成的消息id是依赖时间戳的,如果机器因为时间发现不同步,决定回退时钟,可能导致出现一样的消息id,怎么办?#

A:
会维护一个latest_generated_id,即上一次生成的消息id。
当她发现这次的时间戳, 比之前的消息id的时间要小, 说明发生了时间回退
这时候他仍会沿用上一次消息id的时间戳,只是在选后上去+1。
db5affdc81c9d08b0dc6b7fb9e6948e5afdaf5a3


Q: 某个消费者消费了消息, 但是消费的过程中挂掉了 ,在重启中,这时候消息会丢失吗?#

A:
stream有个pending列表,存放已经被消费走,但是没收到ACK响应的消息。
pending列表中会记录这个消息被谁消费了。
如果重启的消费者上限后,来取消息时,会先从这个pending里取消息, 这样就可以保证拿到消息不丢了。


Q: 如果消费者不是重启,而是整个机器被剔除了呢?或者长时间挂掉#

A:
XPENDING 命令可以查看pending消息的未被读取时长。
如果超过一定时间, 可以用XCLAIM命令进行转移, 把超过时间的消息转移给另一个消费者的pending列表处理。
这个需要自己写定时线程检查。


Q: 如果这个消息本身就有问题, 无论谁消费都会报错,导致没有ack,怎么办?#

A:
pending消息还有个属性是被读取次数。
如果发现被读取次数超过一定上限, 说明有问题,于是调用XDEL删除问题消息。