要实现生产者确认机制失败后自动重试重新投递,核心思路是:将发送失败的消息暂存→按策略重试→跟踪重试状态→失败兜底。以下是具体实现思路和关键步骤,结合代码示例说明。
一、核心思路框架
当生产者通过 ConfirmCallback 收到 ack=false(Broker 未确认接收)或超时未收到确认时,说明消息发送失败。此时需将消息暂存到可靠存储(避免内存丢失),并按重试策略(次数、间隔)重新投递,直至成功或超过阈值后转入死信队列。
二、关键实现步骤
1. 定义“失败消息”存储结构
需持久化存储失败消息的核心信息,确保重启后不丢失。推荐用 数据库(MySQL/PostgreSQL)或 Redis(缓存+持久化),字段包括:
字段名说明示例值msg_id消息唯一ID(CorrelationData的ID)MSG-1690000000000-0.123456exchange目标交换机名称order.exchangerouting_key目标路由键order.routingKeymessage_body消息体(JSON序列化){"id":1001,"amount":99.9}retry_count当前重试次数0(初始值)max_retry最大重试次数(如3次)3next_retry_time下次重试时间(时间戳)1690000060000(当前时间+10秒)status状态(待重试/重试中/失败)PENDING2. 实现“失败消息”存储层
以 MySQL 为例,创建表存储失败消息:- CREATE TABLE mq_failed_message (
- id BIGINT PRIMARY KEY AUTO_INCREMENT,
- msg_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一ID',
- exchange VARCHAR(128) NOT NULL COMMENT '交换机',
- routing_key VARCHAR(128) NOT NULL COMMENT '路由键',
- message_body TEXT NOT NULL COMMENT '消息体(JSON)',
- retry_count INT DEFAULT 0 COMMENT '当前重试次数',
- max_retry INT DEFAULT 3 COMMENT '最大重试次数',
- next_retry_time BIGINT NOT NULL COMMENT '下次重试时间戳(ms)',
- status VARCHAR(20) DEFAULT 'PENDING' COMMENT '状态:PENDING/RETRYING/FAILED',
- created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- INDEX idx_next_retry_time (next_retry_time) COMMENT '按下次重试时间索引'
- );
复制代码 3. 发送消息时关联“失败存储”
生产者发送消息时,生成唯一 msg_id(如 UUID 或雪花算法),并在 ConfirmCallback 中处理失败逻辑:- @Service
- public class RetryProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private FailedMessageStorage failedMessageStorage; // 失败消息存储接口
- public void sendWithRetry(Object message, String exchange, String routingKey) {
- // 1. 生成唯一消息ID
- String msgId = "MSG-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);
- CorrelationData correlationData = new CorrelationData(msgId);
- // 2. 发送消息(携带CorrelationData)
- rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
- // 3. 注册确认回调(处理成功/失败)
- rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
- String id = correlationData1 != null ? correlationData1.getId() : null;
- if (id == null) return;
- if (ack) {
- // 确认成功:删除存储中的失败记录(若有)
- failedMessageStorage.deleteById(id);
- System.out.println("消息确认成功,ID: " + id);
- } else {
- // 确认失败:存入失败消息表,等待重试
- FailedMessage failedMsg = new FailedMessage();
- failedMsg.setMsgId(id);
- failedMsg.setExchange(exchange);
- failedMsg.setRoutingKey(routingKey);
- failedMsg.setMessageBody(JSON.toJSONString(message)); // 序列化为JSON
- failedMsg.setRetryCount(0);
- failedMsg.setMaxRetry(3);
- failedMsg.setNextRetryTime(System.currentTimeMillis() + 10 * 1000); // 10秒后重试
- failedMsg.setStatus("PENDING");
- failedMessageStorage.save(failedMsg);
- System.err.println("消息确认失败,存入重试队列,ID: " + id + ",原因: " + cause);
- }
- });
- }
- }
复制代码 4. 实现重试调度器(核心)
通过 定时任务(如 Spring Scheduler)或 线程池 定期检查失败消息表,对 next_retry_time ≤ 当前时间 的消息执行重试:- @Component
- @EnableScheduling
- public class RetryScheduler {
- @Autowired
- private FailedMessageStorage failedMessageStorage;
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private DeadLetterQueueHandler deadLetterQueueHandler; // 死信队列处理器
- // 每5秒扫描一次待重试消息
- @Scheduled(fixedRate = 5000)
- public void retryFailedMessages() {
- List<FailedMessage> pendingMsgs = failedMessageStorage.queryPendingMessages(System.currentTimeMillis());
- for (FailedMessage msg : pendingMsgs) {
- try {
- // 1. 标记为“重试中”(避免并发重复处理)
- failedMessageStorage.updateStatus(msg.getMsgId(), "RETRYING");
- // 2. 反序列化消息体
- Object message = JSON.parseObject(msg.getMessageBody(), Object.class); // 根据实际类型强转
- // 3. 重新发送消息(携带原msgId)
- CorrelationData correlationData = new CorrelationData(msg.getMsgId());
- rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRoutingKey(), message, correlationData);
- // 4. 更新重试次数和下次重试时间(指数退避:10s→20s→40s)
- int newRetryCount = msg.getRetryCount() + 1;
- long nextRetryInterval = (long) (10 * Math.pow(2, newRetryCount - 1)) * 1000; // 指数退避
- long nextRetryTime = System.currentTimeMillis() + nextRetryInterval;
- if (newRetryCount >= msg.getMaxRetry()) {
- // 超过最大重试次数:转入死信队列
- deadLetterQueueHandler.sendToDlx(msg);
- failedMessageStorage.updateStatus(msg.getMsgId(), "FAILED");
- System.err.println("消息重试次数耗尽,转入死信队列,ID: " + msg.getMsgId());
- } else {
- // 更新重试状态(次数+1,下次重试时间)
- failedMessageStorage.updateRetryInfo(msg.getMsgId(), newRetryCount, nextRetryTime, "PENDING");
- System.out.println("消息重试中,ID: " + msg.getMsgId() + ",第" + newRetryCount + "次");
- }
- } catch (Exception e) {
- // 重试过程中异常:恢复状态为PENDING,等待下次扫描
- failedMessageStorage.updateStatus(msg.getMsgId(), "PENDING");
- System.err.println("重试发送失败,ID: " + msg.getMsgId() + ",错误: " + e.getMessage());
- }
- }
- }
- }
复制代码 5. 死信队列兜底(重试失败的最终处理)
当消息超过最大重试次数仍未成功,将其转入死信队列(DLX),人工介入处理:- @Component
- public class DeadLetterQueueHandler {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendToDlx(FailedMessage msg) {
- // 发送到死信交换机(需提前配置死信队列和交换机)
- rabbitTemplate.convertAndSend(
- "dlx.exchange",
- "dlx.routingKey",
- msg.getMessageBody(),
- message -> {
- MessageProperties props = message.getMessageProperties();
- props.setHeader("failed_reason", "max_retry_exceeded");
- props.setHeader("original_msg_id", msg.getMsgId());
- return message;
- }
- );
- }
- }
复制代码 三、关键技术点说明
1. 重试策略
- 指数退避:重试间隔随次数递增(如 10s→20s→40s),避免集中重试压垮 Broker;
- 固定间隔:简单场景用固定间隔(如每 30 秒重试一次);
- 随机间隔:避免多个生产者同时重试导致“惊群效应”。
2. 幂等性保障
重试可能导致消息重复投递,消费者需通过 唯一ID去重(如 Redis 记录已处理 msg_id,有效期 24 小时):- @Component
- public class IdempotentConsumer {
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
- @RabbitListener(queues = "order.queue")
- public void consume(String message, @Header("msg_id") String msgId) {
- String key = "processed_msg:" + msgId;
- if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
- // 已处理,直接确认
- channel.basicAck(deliveryTag, false);
- return;
- }
- // 业务逻辑处理...
- redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS); // 记录已处理
- channel.basicAck(deliveryTag, false);
- }
- }
复制代码 3. 存储选型对比
存储方式优点缺点适用场景MySQL持久化可靠,支持复杂查询性能较低,需维护数据库连接生产环境(消息重要性高)Redis高性能,支持过期时间数据易失(需开启RDB/AOF持久化)中小规模、对性能要求高的场景内存队列速度快重启丢失消息测试环境或临时重试四、总结
生产者确认失败后的重试重新投递,本质是“存储-调度-重试-兜底”的闭环:
- 存储:用数据库/Redis 持久化失败消息;
- 调度:定时任务扫描待重试消息;
- 重试:按指数退避策略重新发送,更新重试状态;
- 兜底:超过次数后转入死信队列,人工介入。
通过这套机制,可确保消息在网络抖动、Broker 临时不可用等场景下仍能最终投递成功,同时避免无限重试的资源浪费。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |