[toc]
redis5.0增加了一个结构 stream
是具备一些可靠性机制的消息队列实现
Q: stream中具有哪些结构?#
A:
- 消费组——Consumergroup
1个消费组可以有多个消费者,消费者在一个消费组里抢消息。 - 游标id——last_deliverer_id
记录当前这个消费组被读取到哪了。
每有一个消费者读取了一条消息, 组的这个游标就+1 - 未接收响应的消息里pending_ids
记录了已经被消费,但是还没调用XACK进行响应的消息。
是相比list消息队列的重要可靠性提升成员。 - 消息id——每个消息独一无的身份
时间戳+序号 - 消息内容
stream简单命令#
详细:
Redis Stream
- XADD 添加消息
- XDEL 直接删除
- XREAD 消费,支持阻塞,且消费后会进入pending
- XACK
- XGROUP CREATE - 创建消费者组
- XREADGROUP GROUP - 读取消费者组中的消息
Q: stream生成的消息id是依赖时间戳的,如果机器因为时间发现不同步,决定回退时钟,可能导致出现一样的消息id,怎么办?#
A:
会维护一个latest_generated_id,即上一次生成的消息id。
当她发现这次的时间戳, 比之前的消息id的时间要小, 说明发生了时间回退
这时候他仍会沿用上一次消息id的时间戳,只是在选后上去+1。
Q: 某个消费者消费了消息, 但是消费的过程中挂掉了 ,在重启中,这时候消息会丢失吗?#
A:
stream有个pending列表,存放已经被消费走,但是没收到ACK响应的消息。
pending列表中会记录这个消息被谁消费了。
如果重启的消费者上限后,来取消息时,会先从这个pending里取消息, 这样就可以保证拿到消息不丢了。
Q: 如果消费者不是重启,而是整个机器被剔除了呢?或者长时间挂掉#
A:
XPENDING 命令可以查看pending消息的未被读取时长。
如果超过一定时间, 可以用XCLAIM命令进行转移, 把超过时间的消息转移给另一个消费者的pending列表处理。
这个需要自己写定时线程检查。
Q: 如果这个消息本身就有问题, 无论谁消费都会报错,导致没有ack,怎么办?#
A:
pending消息还有个属性是被读取次数。
如果发现被读取次数超过一定上限, 说明有问题,于是调用XDEL删除问题消息。