找回密码
 立即注册
首页 业界区 业界 使用redis的stream数据类型做消息队列

使用redis的stream数据类型做消息队列

恃液 2025-6-25 21:45:06
在redis5.0之前,如果想使用它作为简单的消息队列,最好的选择就是自身提供的pub/sub模式.它支持简单的发布/订阅模式,发布一个channel绑定一条消息,然后可以有多个消费者监听这个channel,每个消费者都能收到相同的消息。不支持持久化,不支持查询,不支持分组,不支持分片消费,也没有提供很好的监控手段(有简单的pubsub容器命令,可以看有哪些channel,订阅者数量等)。但是5.0之后,倘若我们人仍选择redis作为简单消息队列,就可以使用新的数据类型STREAM
STREAM数据类型介绍

数据类型基础说明


  • 可以理解为一个有时间序列的一组数据集合,每一条新增的数据都是追加到数据集末尾,每一条数据都有自己的唯一id
  • 底层数据结构是基数树
  • 一个Stream可以有多个消费者分组group,每一个group也可以有多个消费者consumer,支持分片读取,全部读取,按照ID分段读取
  • 随机访问时间复杂度是O(1),向流中添加一个条目的时间为O(1)。 访问任意一项的时间为O(n),其中n是ID的长度.
常用命令及详解


  • XADD  向指定的 Stream 添加一条新消息。
    XADD key [MAXLEN [~] count] * field1 value1 [field2 value2 ...]
    参数说明:
    key:Stream 的名称。
    MAXLEN [~] count:可选,限制 Stream 最大长度,超出自动裁剪最老消息。~ 表示近似修剪,性能更优。【实际上使用要注意,超过最大值直接丢弃,也就是“消失了“】
    *:让 Redis 自动生成消息ID,也可自定义ID。
    field value:消息体的键值对。
    用法举例:XADD mystream * name Alice age 20
  • XRANGE 按ID范围读取 Stream 中的消息
    XRANGE key start end [COUNT count]
    参数说明:
    start、end:起止ID,- 表示最小ID,+ 表示最大ID。
    COUNT:可选,限制返回条数。
    用法举例:XRANGE mystream - +         # 读取所有消息
  • XREAD 从一个或多个 Stream 读取新消息,可阻塞等待
    XREAD [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    参数说明:
    BLOCK:可选,阻塞等待新消息的毫秒数。
    STREAMS:后面跟 Stream 名称和起始ID。
    用法举例:XREAD BLOCK 5000 STREAMS mystream $ $ 表示只读新消息
  • XGROUP  创建、删除、管理 Stream 的消费者组。
    XGROUP CREATE mystream mygroup 0-0 MKSTREAM
    常用子命令:

    • 创建组:XGROUP CREATE mystream mygroup 0-0 MKSTREAM
      0-0:从头消费;$:只消费新消息。
      MKSTREAM:Stream 不存在时自动创建
    • 删除组:XGROUP DESTROY mystream mygroup
    • 创建消费者、删除消费者。一般不需要,会自动创建
      XGROUP CREATECONSUMER mystream mygroup consumer-1
      XGROUP CREATECONSUMER mystream mygroup consumer-1

  • XREADGROUP 以消费者组身份读取消息,实现分布式并发消费
    XREADGROUP GROUP group consumer [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    参数说明:
    GROUP group consumer:指定组名和消费者名。
    id:> 表示只读未分配的新消息,其他ID(如0)可用于补偿pending。
    举例:XREADGROUP GROUP mygroup consumer-1 BLOCK 5000 STREAMS mystream >
  • XPENDING 查看某个组下所有未ack的消息(即已分配但未确认)注意这里不是消息的快照,它只是存储消息的ID列表,并不会复制一份消息内容
    XPENDING key group [start end count [consumer]]
    举例:XPENDING mystream mygroup - + 10
    XPENDING mystream mygroup - + 10 consumer-1
  • XACK 用于确认消息已被消费,也就是从pending状态PEL中移除
    举例:XACK mystream mygroup 1680000000000-0
  • XCLAIM/XAUTOCLAIM 将长时间未ack的pending消息转移到其他消费者/实现自动补偿。
    举例:XCLAIM mystream mygroup consumer-2 60000 1680000000000-0
    XAUTOCLAIM mystream mygroup consumer-2 60000 0-0 COUNT 10
  • XTRIM 限制流的最大长度,自动删除最老的消息。无论是否被ack的消息,都会被裁减。
    语法:XTRIM key MAXLEN [~] count
    举例:XTRIM mystream MAXLEN ~ 1000
  • XDEL 从Stream中删除指定ID的消息,可以一次删除多个,用空格隔开即可
    XDEL mystream 1680000000000-0
实际使用场景

可用作消息队列


  • 当需要一个轻量级的、安全性要求比较低、可靠性不要求那么高的一个消息队列时,使用stream就很合适,性能也非常不错,单机能支持每秒几十万的写入
  • 典型场景:订单异步处理、短信/邮件通知、日志收集、任务分发等
可以作为事件总线


  • 作为事件总线,支撑微服务间的事件发布与订阅,作为事件源(例如,跟踪用户操作、点击等)。
  • 例如:用户注册事件、支付完成事件等,多个服务可并发消费
延迟队列/死信队列


  • 利用 Stream 的 pending/ack/xclaim 机制实现可靠的延迟消息、死信消息补偿。
实时数据流处理


  • IoT、监控、风控等场景下,设备/传感器数据实时写入 Stream,后端实时消费分析。
  • 支持高并发写入和多消费者并发处理
重要说明

关于持久化和消息删除


  • 消息是默认就持久化的,并且并不提供设置过期时间,那么如果在消息量大且请求量大的情况下,会占用很多内存
  • 如果在新增消息的时候使用maxlen选项限定了stream的长度,那么一定要考虑使用多个consumer,而且要提供一定的处理机制在某些consumer不可用的时候,将消息XCLAIM到可用的消费者。避免超过限定长度后,丢失消息。
  • 不推荐每次消费完成后使用Xdel去删除,而是采用Xtrim收缩,结合Xinfo、Xlen等命令定期检测stream的长度,然后根据实际情况设置合理的收缩长度,定期的清理不再使用的消息。因为即使使用Xdel取删除消息,在当前的实现中,直到宏节点完全为空时才真正回收内存
读取的阻塞和非阻塞


  • XRANGE 、XREAD 或 XREADGROUP ,没有BLOCK选项时,像任何其他Redis命令一样同步调用,此时他们就是同步命令;如果加上BLOCK选项就时非阻塞的,等待指定的毫秒直到有可以消费的消息并立即返回
插入的性能


  • XADD 非常快,如果使用流水线,在普通机器中每秒可以轻松插入50万到100万项
  • 以下是官网提供的延迟测试结果:【在这里,我们每次迭代最多处理10k条消息,这意味着 XREADGROUP 的 COUNT 参数被设置为10000。这增加了大量的延迟,但为了让缓慢的消费者能够跟上消息流,这是必需的。因此,你可以预期真实世界的延迟要小得多】
[code]                Results obtained: 结果:                Processed between 0 and 1 ms -> 74.11%                Processed between 1 and 2 ms -> 25.80%                Processed between 2 and 3 ms -> 0.06%                Processed between 3 and 4 ms -> 0.01%                Processed between 4 and 5 ms -> 0.02%                因此,99.9%的请求的延迟 ID,表示消费者只想接收从未发送给其他消费者的信息。它的意思是,给我新邮件。任何其他 ID,即 0 或任何其他有效 ID 或不完整 ID(仅毫秒时间部分),都将导致返回发送命令的用户的待处理条目,且 ID 大于所提供的 ID。因此,基本上如果 ID 不大于,那么命令将只允许客户访问其待处理条目:已向其发送但尚未确认的信息。请注意,在这种情况下,BLOCK 和 NOACK 都会被忽略。
</ol></ul>属于PEL中的消息可以删除吗

pending状态的消息是可以被删除的,redis并没有设计未确认的消息不允许删除。如果采用xdel删除消息后,pending列表将仍然保留待消费消息的ID,但是消息内容没有了。因此,在读取此类PEL条目时,Redis会返回一个空值。
一个stream的一个group多个consumer时如何消费的

1. 分区/竞争消费(Work Queue 模式)


  • 每条消息只会被 group 下的一个消费者消费,不会被所有消费者都消费。
  • Redis 会将新消息分配给 group 内“空闲”的消费者,实现消息的负载均衡(轮询或空闲优先,具体是由实现的客户端决定)。
  • 多个消费者并发时,消息会被“分摊”到各个消费者,每个消息只会被其中一个消费。
  • 消息被转XCLAIM到另一个消费者时会增加投递次数,并发时投递次数、时间戳都会变化,因此也只有一个消费者成功获取。XPENDING命令就可以看到每个消息被投递的次数
2. pending 机制


  • 消费者用 XREADGROUP 拉取消息后,消息会进入该消费者的 pending(未确认)列表,直到被 XACK。
  • 如果某个消费者挂掉,pending 里的消息可以被其他消费者用 XCLAIM/XAUTOCLAIM 方式“抢救”回来,保证消息最终被消费。
3. 分布式环境下的存储


  • stream的增加数据和其他数据类型一样,都是需要一个唯一的key,然后给key绑定指定数据类型的一个或者多个值
  • 那也就是说,即使在分布式存储环境下,它和其他的key一样,相同的key的数据一定存在同一个分片上(因为redis的分片机制就是按照Key来实现的)
  • 实际使用时key的设置就要相对分散,否则数据会倾斜到某些节点上
x. 如果要“广播”效果(每个消费者都收到同一条消息),需要每个消费者用不同的 group。或者都广播了,就使用PUB/SUB吧,,~~

观测流


  • Redis流和消费者组有不同的方式来观察正在发生的事情,比如前面说的XPENDING ,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和交付数量
  • XINFO:这个命令使用子命令来显示流及其消费者组状态的不同信息。例如,XINFO流报告有关流本身的信息。可以用于Stream、Group、CONSUMERS
  • 实际项目中可结合其他命令,直观的展示流的各种信息,比如有多少个分组、有哪些分组、有哪些消费者、消费者状态、消费进度、总条目数据等。有了这些信息就可以对消息的可靠性进行分析,还能及时发现资源占用情况,结合定时任务等作出具体性能调整。
更加详细stream的细节介绍,可以参考官网:https://redis.io/docs/latest/develop/data-types/streams
稍后我将具体介绍如何在代码中使用stream来作为消息队列。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册