找回密码
 立即注册
首页 业界区 业界 RabbitMQ投递回调机制以及策略业务补偿

RabbitMQ投递回调机制以及策略业务补偿

少琼 昨天 16:35
————以点赞消息案例为例
一、关于RabbitMQ回调机制知识点补充: https://www.cnblogs.com/Mr-Keep/p/19140274
在 RabbitMQ 中,生产者发送消息后,有可能遇到以下几种情况:

  • 消息成功投递到交换机(Exchange)
  • 消息未能成功投递到交换机(Exchange)
  • 消息成功进入交换机但无法路由到队列(Queue)
如果生产者端没有回调确认机制,就可能出现严重的数据不一致:
举例: Redis 已经增加点赞数,但消息并未真正进入 MQ,数据库后续也无法更新,就出现了 “缓存超前、数据库缺失” 的问题。
为了解决这种问题,Spring AMQP 提供了:

  • RabbitTemplate.setConfirmCallback()
  • RabbitTemplate.setReturnsCallback()
来捕获和处理消息投递的成功与失败。
但是在复杂系统中,不同的业务消息(例如“下单”、“扣库存”、“发积分”)在投递失败时,需要采取不同的补偿逻辑
弊端:如果你只写一份大而全的回调逻辑,代码就会充满大量的 if else 判断,非常难维护。
二、策略模式思想引入
策略模式的核心思想是:定义一系列算法(或行为),让它们可以相互替换,且算法的变化不会影响使用算法的客户。


  • “算法” ≈ “不同的消息回调处理逻辑”
  • “客户” ≈ “RabbitTemplate 的 ConfirmCallback 回调”
操作:通过(根据业务抽象)接口 + Map 注入,在运行时动态选择。
代码实现
1、定义统一的回调处理接口
  1. public interface ConfirmCallbackService {
  2.     /**
  3.      * 投递失败后的回调处理
  4.      * @param message 投递的消息对象
  5.      */
  6.     void confirmCallback(Message message);
  7. }
复制代码
例:定义点赞案例的实现类(可选):
  1. public class LikeConfirmCallback implements ConfirmCallbackService{
  2.     /**
  3.      * 注入RedisTemplate
  4.      */
  5.     private final RedisTemplate<String,Integer> redisTemplate;
  6.     /**
  7.      * 执行失败后的反向操作
  8.      * @param message 投递的消息对象
  9.      */
  10.     @Override
  11.     public void confirmCallback(Message message) {
  12.         byte[] bytes = message.getBody();
  13.         //反向序列化为LikeDTO对象
  14.         try {
  15.             LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);
  16.             if(dto.getLikeStatus()){
  17.                 redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(), dto.getUid());
  18.             }else{
  19.                 redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(),dto.getUid());
  20.             }
  21.         } catch (IOException e) {
  22.             throw new RuntimeException(e);
  23.         }
  24.     }
  25. }
复制代码
小技巧:

  • 可选不单独定义类,而是让业务层本身实现ConfirmCallbackService接口,简化书写操作
  • 分离成策略类则更利于模块化、解耦和扩展。
2、回调上下文: 策略分发器
  1. @Component
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class ConfirmCallbackContext {
  5.     /**
  6.      * 注入RabbitTemplate
  7.      */
  8.     private final RabbitTemplate rabbitTemplate;
  9.     /**
  10.      * 注入所有ConfirmCallbackService的实现类
  11.      * 在不同的业务场景调用不同的实现来处理投递失败的业务逻辑
  12.      */
  13.     private final Map<String,ConfirmCallbackService> confirmCallbackServiceMap;
  14.     /**
  15.      * 统一调用回调处理
  16.      * 在容器初始化就执行这个方法
  17.      */
  18.     @PostConstruct
  19.     public void confirmCallback(){
  20.         rabbitTemplate.setConfirmCallback((cdata,ack,cause)->{
  21.             ReturnedMessage returnedMessage = cdata.getReturned();
  22.             if(ack){
  23.                 log.info("The message was delivered to the{}",returnedMessage);
  24.             }else{
  25.                 //获取业务实现的bean的id
  26.                 String beanName = returnedMessage.getReplyText();
  27.                 //根据bean的名称从map中获取相应的实现类
  28.                 ConfirmCallbackService callbackService = confirmCallbackServiceMap.get(beanName);
  29.                 callbackService.confirmCallback(returnedMessage.getMessage());
  30.             }
  31.         });
  32.     }
  33. }
复制代码
核心原理:

  • Spring Boot 会自动扫描所有实现 ConfirmCallbackService 的 Bean
  • Bean 名称作为 key,Bean 实例作为 value 注入到 Map
  • ConfirmCallbackContext 根据 replyText 动态找到对应的策略实现类
3.消息发送端封装
  1. @Component
  2. @RequiredArgsConstructor
  3. public class RabbitManager<T> {
  4.     private final RabbitTemplate rabbitTemplate;
  5.     public void send(String exchange,String routingKey,
  6.                      String callbackBeanName,T data){
  7.         try {
  8.             //创建cdata对象并设置一个id
  9.             CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  10.             //将投递的数据转换为byte[]
  11.             byte[] bytes = new ObjectMapper().writeValueAsBytes(data);
  12.             //将bytes封装为Message对象
  13.             Message message = new Message(bytes);
  14.             //创建一个投递失败时返回的消息对象
  15.             ReturnedMessage returnedMessage = new ReturnedMessage(message, 0,
  16.                     callbackBeanName, exchange,routingKey);
  17.             //将ReturnedMesssage保存到cdata中
  18.             correlationData.setReturned(returnedMessage);
  19.             //发送
  20.             rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData);
  21.         } catch (Exception e) {
  22.             throw new RuntimeException(e);
  23.         }
  24.     }
  25. }
复制代码
** 关键点:**
callbackBeanName 会被放进 replyText 中,作为“回调策略的指针”。
4.点赞业务逻辑方法
4.1简化写法
  1. @Override
  2.     public LikeDTO likeEssay(Integer uid, Integer eid) {
  3.         boolean likeStatus = false;
  4.         //如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞
  5.         if(isLike(eid, uid)) {
  6.             //将用户ID从set集合中移除
  7.             redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);
  8.         } else {
  9.             likeStatus = true;
  10.             //将用户ID添加到set集合中
  11.             redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);
  12.         }
  13.         //获取当前帖子在redis中的点赞总数
  14.         Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);
  15.         //创建LikeDTO封装修改的数据并发布到消息队列
  16.         LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);
  17.         //发送到mq异步更新到数据库
  18.         rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY,
  19.                 "likeServiceImpl", likeDTO);
  20.         return likeDTO;
  21.     }
  22.      /**
  23.      * 消息投递失败后的处理
  24.      * @param message 失败后返回的消息
  25.      */
  26.     @Override
  27.     public void confirmCallback(Message message) {
  28.         byte[] bytes = message.getBody();
  29.         try {
  30.             //反序列化为LikeDTO对象
  31.             LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);
  32.             //执行反向操作
  33.             if(dto.getLikeStatus()) {
  34.                 redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());
  35.             } else {
  36.                 redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());
  37.             }
  38.         } catch (IOException e) {
  39.             throw new RuntimeException(e);
  40.         }
  41.     }
复制代码
4.2 有业务实现类时
````
public LikeDTO likeEssay(Integer uid, Integer eid) {
boolean likeStatus = false;
  1.     //如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞
  2.     if(isLike(uid,eid)){
  3.         //取消点赞
  4.         redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());
  5.         likeMapper.deleteLike(eid,uid);
  6.     }else{
  7.         likeStatus = true;
  8.         //将用户ID添加到set集合中
  9.         redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());
  10.     }
  11.     //获取当前帖子在redis中的点赞总数
  12.     Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);
  13.     //创建LikeDTO封装修改的数据并发布到消息队列
  14.     LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);
  15.     //发送到mq异步更新到数据库
  16.     rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME,RabbitmqConfig.ROUTING_KEY,
  17.             "likeConfirmCallbackService",likeDTO);
  18.     return likeDTO;
  19. }
复制代码
  1. 最终目标:当点赞消息从生产者发送到 RabbitMQ 时,一旦投递失败,系统能自动执行反向补偿逻辑,确保 Redis 与数据库的一致性。
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册