找回密码
 立即注册
首页 业界区 业界 我工作中用MQ的10种场景

我工作中用MQ的10种场景

空娅芬 昨天 11:10
前言

最近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?
记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?
直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的价值。
今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。
一、为什么需要消息队列(MQ)?

在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?
系统间的直接调用:
1.png

引入消息队列后:
2.png

接下来我们将通过10个具体场景,带大家来深入理解MQ的价值。
场景一:系统解耦

背景描述

在我早期参与的一个电商项目中,订单创建后需要通知多个系统:
  1. // 早期的紧耦合设计
  2. public class OrderService {
  3.     private InventoryService inventoryService;
  4.     private PointsService pointsService;
  5.     private EmailService emailService;
  6.     private AnalyticsService analyticsService;
  7.    
  8.     public void createOrder(Order order) {
  9.         // 1. 保存订单
  10.         orderDao.save(order);
  11.         
  12.         // 2. 调用库存服务
  13.         inventoryService.updateInventory(order);
  14.         
  15.         // 3. 调用积分服务
  16.         pointsService.addPoints(order.getUserId(), order.getAmount());
  17.         
  18.         // 4. 发送邮件通知
  19.         emailService.sendOrderConfirmation(order);
  20.         
  21.         // 5. 记录分析数据
  22.         analyticsService.trackOrderCreated(order);
  23.         
  24.         // 更多服务...
  25.     }
  26. }
复制代码
这种架构存在严重问题:

  • 紧耦合:订单服务需要知道所有下游服务
  • 单点故障:任何一个下游服务挂掉都会导致订单创建失败
  • 性能瓶颈:同步调用导致响应时间慢
MQ解决方案

引入MQ后,架构变为:
3.png

代码实现
  1. // 订单服务 - 生产者
  2. @Service
  3. public class OrderService {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.    
  7.     public void createOrder(Order order) {
  8.         // 1. 保存订单
  9.         orderDao.save(order);
  10.         
  11.         // 2. 发送消息到MQ
  12.         rabbitTemplate.convertAndSend(
  13.             "order.exchange",
  14.             "order.created",
  15.             new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
  16.         );
  17.     }
  18. }
  19. // 库存服务 - 消费者
  20. @Component
  21. @RabbitListener(queues = "inventory.queue")
  22. public class InventoryConsumer {
  23.     @Autowired
  24.     private InventoryService inventoryService;
  25.    
  26.     @RabbitHandler
  27.     public void handleOrderCreated(OrderCreatedEvent event) {
  28.         inventoryService.updateInventory(event.getOrderId());
  29.     }
  30. }
复制代码
技术要点


  • 消息协议选择:根据业务需求选择RabbitMQ、Kafka或RocketMQ
  • 消息格式:使用JSON或Protobuf等跨语言格式
  • 错误处理:实现重试机制和死信队列
场景二:异步处理

背景描述

用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作,如果同步处理,用户需要等待很长时间。
MQ解决方案
  1. // 视频服务 - 生产者
  2. @Service
  3. public class VideoService {
  4.     @Autowired
  5.     private KafkaTemplate<String, Object> kafkaTemplate;
  6.    
  7.     public UploadResponse uploadVideo(MultipartFile file, String userId) {
  8.         // 1. 保存原始视频
  9.         String videoId = saveOriginalVideo(file);
  10.         
  11.         // 2. 发送处理消息
  12.         kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
  13.         
  14.         // 3. 立即返回响应
  15.         return new UploadResponse(videoId, "upload_success");
  16.     }
  17. }
  18. // 视频处理服务 - 消费者
  19. @Service
  20. public class VideoProcessingConsumer {
  21.     @KafkaListener(topics = "video-processing")
  22.     public void processVideo(VideoProcessingEvent event) {
  23.         // 异步执行耗时操作
  24.         videoProcessor.transcode(event.getVideoId());
  25.         videoProcessor.generateThumbnails(event.getVideoId());
  26.         contentModerationService.checkContent(event.getVideoId());
  27.         
  28.         // 发送处理完成通知
  29.         notificationService.notifyUser(event.getUserId(), event.getVideoId());
  30.     }
  31. }
复制代码
架构优势


  • 快速响应:用户上传后立即得到响应
  • 弹性扩展:可以根据处理压力动态调整消费者数量
  • 故障隔离:处理服务故障不会影响上传功能
场景三:流量削峰

背景描述

电商秒杀活动时,瞬时流量可能是平时的百倍以上,直接冲击数据库和服务。
MQ解决方案

4.png

代码实现
  1. // 秒杀服务
  2. @Service
  3. public class SecKillService {
  4.     @Autowired
  5.     private RedisTemplate<String, Object> redisTemplate;
  6.    
  7.     @Autowired
  8.     private RabbitTemplate rabbitTemplate;
  9.    
  10.     public SecKillResponse secKill(SecKillRequest request) {
  11.         // 1. 校验用户资格
  12.         if (!checkUserQualification(request.getUserId())) {
  13.             return SecKillResponse.failed("用户无资格");
  14.         }
  15.         
  16.         // 2. 预减库存(Redis原子操作)
  17.         Long remaining = redisTemplate.opsForValue().decrement(
  18.             "sec_kill_stock:" + request.getItemId());
  19.         
  20.         if (remaining == null || remaining < 0) {
  21.             // 库存不足,恢复库存
  22.             redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
  23.             return SecKillResponse.failed("库存不足");
  24.         }
  25.         
  26.         // 3. 发送秒杀成功消息到MQ
  27.         rabbitTemplate.convertAndSend(
  28.             "sec_kill.exchange",
  29.             "sec_kill.success",
  30.             new SecKillSuccessEvent(request.getUserId(), request.getItemId())
  31.         );
  32.         
  33.         return SecKillResponse.success("秒杀成功");
  34.     }
  35. }
  36. // 订单处理消费者
  37. @Component
  38. @RabbitListener(queues = "sec_kill.order.queue")
  39. public class SecKillOrderConsumer {
  40.     @RabbitHandler
  41.     public void handleSecKillSuccess(SecKillSuccessEvent event) {
  42.         // 异步创建订单
  43.         orderService.createSecKillOrder(event.getUserId(), event.getItemId());
  44.     }
  45. }
复制代码
技术要点


  • 库存预扣:使用Redis原子操作避免超卖
  • 队列缓冲:MQ缓冲请求,避免直接冲击数据库
  • 限流控制:在网关层进行限流,拒绝过多请求
场景四:数据同步

背景描述

在微服务架构中,不同服务有自己的数据库,需要保证数据一致性。
MQ解决方案
  1. // 用户服务 - 数据变更时发送消息
  2. @Service
  3. public class UserService {
  4.     @Transactional
  5.     public User updateUser(User user) {
  6.         // 1. 更新数据库
  7.         userDao.update(user);
  8.         
  9.         // 2. 发送消息(在事务内)
  10.         rocketMQTemplate.sendMessageInTransaction(
  11.             "user-update-topic",
  12.             MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
  13.                 .build(),
  14.             null
  15.         );
  16.         
  17.         return user;
  18.     }
  19. }
  20. // 其他服务 - 消费用户更新消息
  21. @Service
  22. @RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
  23. public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
  24.     @Override
  25.     public void onMessage(UserUpdateEvent event) {
  26.         // 更新本地用户信息缓存
  27.         orderService.updateUserCache(event.getUserId(), event.getStatus());
  28.     }
  29. }
复制代码
一致性保证


  • 本地事务表:将消息和业务数据放在同一个数据库事务中
  • 事务消息:使用RocketMQ的事务消息机制
  • 幂等消费:消费者实现幂等性,避免重复处理
场景五:日志收集

背景描述

分布式系统中,日志分散在各个节点,需要集中收集和分析。
MQ解决方案

5.png

代码实现
  1. // 日志收集组件
  2. @Component
  3. public class LogCollector {
  4.     @Autowired
  5.     private KafkaTemplate<String, String> kafkaTemplate;
  6.    
  7.     public void collectLog(String appId, String level, String message, Map<String, Object> context) {
  8.         LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
  9.         
  10.         // 发送到Kafka
  11.         kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
  12.     }
  13. }
  14. // 日志消费者
  15. @Service
  16. public class LogConsumer {
  17.     @KafkaListener(topics = "app-logs", groupId = "log-es")
  18.     public void consumeLog(String message) {
  19.         LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
  20.         
  21.         // 存储到Elasticsearch
  22.         elasticsearchService.indexLog(logEvent);
  23.         
  24.         // 实时监控检查
  25.         if ("ERROR".equals(logEvent.getLevel())) {
  26.             alertService.checkAndAlert(logEvent);
  27.         }
  28.     }
  29. }
复制代码
技术优势


  • 解耦:应用节点无需关心日志如何处理
  • 缓冲:应对日志产生速率波动
  • 多消费:同一份日志可以被多个消费者处理
场景六:消息广播

背景描述

系统配置更新后,需要通知所有服务节点更新本地配置。
MQ解决方案
  1. // 配置服务 - 广播配置更新
  2. @Service
  3. public class ConfigService {
  4.     @Autowired
  5.     private RedisTemplate<String, Object> redisTemplate;
  6.    
  7.     public void updateConfig(String configKey, String configValue) {
  8.         // 1. 更新配置存储
  9.         configDao.updateConfig(configKey, configValue);
  10.         
  11.         // 2. 广播配置更新消息
  12.         redisTemplate.convertAndSend("config-update-channel",
  13.             new ConfigUpdateEvent(configKey, configValue));
  14.     }
  15. }
  16. // 服务节点 - 订阅配置更新
  17. @Component
  18. public class ConfigUpdateListener {
  19.     @Autowired
  20.     private LocalConfigCache localConfigCache;
  21.    
  22.     @RedisListener(channel = "config-update-channel")
  23.     public void handleConfigUpdate(ConfigUpdateEvent event) {
  24.         // 更新本地配置缓存
  25.         localConfigCache.updateConfig(event.getKey(), event.getValue());
  26.     }
  27. }
复制代码
应用场景


  • 功能开关:动态开启或关闭功能
  • 参数调整:调整超时时间、限流阈值等
  • 黑白名单:更新黑白名单配置
场景七:顺序消息

背景描述

在某些业务场景中,消息的处理顺序很重要,如订单状态变更。
MQ解决方案
  1. // 订单状态变更服务
  2. @Service
  3. public class OrderStateService {
  4.     @Autowired
  5.     private RocketMQTemplate rocketMQTemplate;
  6.    
  7.     public void changeOrderState(String orderId, String oldState, String newState) {
  8.         OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
  9.         
  10.         // 发送顺序消息,使用orderId作为sharding key
  11.         rocketMQTemplate.syncSendOrderly(
  12.             "order-state-topic",
  13.             event,
  14.             orderId  // 保证同一订单的消息按顺序处理
  15.         );
  16.     }
  17. }
  18. // 订单状态消费者
  19. @Service
  20. @RocketMQMessageListener(
  21.     topic = "order-state-topic",
  22.     consumerGroup = "order-state-group",
  23.     consumeMode = ConsumeMode.ORDERLY  // 顺序消费
  24. )
  25. public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
  26.     @Override
  27.     public void onMessage(OrderStateEvent event) {
  28.         // 按顺序处理订单状态变更
  29.         orderService.processStateChange(event);
  30.     }
  31. }
复制代码
顺序保证机制


  • 分区顺序:同一分区内的消息保证顺序
  • 顺序投递:MQ保证消息按发送顺序投递
  • 顺序处理:消费者顺序处理消息
场景八:延迟消息

背景描述

需要实现定时任务,如订单超时未支付自动取消。
MQ解决方案
  1. // 订单服务 - 发送延迟消息
  2. @Service
  3. public class OrderService {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.    
  7.     public void createOrder(Order order) {
  8.         // 保存订单
  9.         orderDao.save(order);
  10.         
  11.         // 发送延迟消息,30分钟后检查支付状态
  12.         rabbitTemplate.convertAndSend(
  13.             "order.delay.exchange",
  14.             "order.create",
  15.             new OrderCreateEvent(order.getId()),
  16.             message -> {
  17.                 message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟
  18.                 return message;
  19.             }
  20.         );
  21.     }
  22. }
  23. // 订单超时检查消费者
  24. @Component
  25. @RabbitListener(queues = "order.delay.queue")
  26. public class OrderTimeoutConsumer {
  27.     @RabbitHandler
  28.     public void checkOrderPayment(OrderCreateEvent event) {
  29.         Order order = orderDao.findById(event.getOrderId());
  30.         if ("UNPAID".equals(order.getStatus())) {
  31.             // 超时未支付,取消订单
  32.             orderService.cancelOrder(order.getId(), "超时未支付");
  33.         }
  34.     }
  35. }
复制代码
替代方案对比

方案优点缺点数据库轮询实现简单实时性差,数据库压力大延时队列实时性好实现复杂,消息堆积问题定时任务可控性强分布式协调复杂场景九:消息重试

背景描述

处理消息时可能遇到临时故障,需要重试机制保证最终处理成功。
MQ解决方案
  1. // 消息消费者 with 重试机制
  2. @Service
  3. @Slf4j
  4. public class RetryableConsumer {
  5.     @Autowired
  6.     private RabbitTemplate rabbitTemplate;
  7.    
  8.     @RabbitListener(queues = "business.queue")
  9.     public void processMessage(Message message, Channel channel) {
  10.         try {
  11.             // 业务处理
  12.             businessService.process(message);
  13.             
  14.             // 确认消息
  15.             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  16.             
  17.         } catch (TemporaryException e) {
  18.             // 临时异常,重试
  19.             log.warn("处理失败,准备重试", e);
  20.             
  21.             // 拒绝消息,requeue=true
  22.             channel.basicNack(
  23.                 message.getMessageProperties().getDeliveryTag(),
  24.                 false,
  25.                 true  // 重新入队
  26.             );
  27.             
  28.         } catch (PermanentException e) {
  29.             // 永久异常,进入死信队列
  30.             log.error("处理失败,进入死信队列", e);
  31.             
  32.             channel.basicNack(
  33.                 message.getMessageProperties().getDeliveryTag(),
  34.                 false,
  35.                 false  // 不重新入队
  36.             );
  37.         }
  38.     }
  39. }
复制代码
重试策略


  • 立即重试:临时故障立即重试
  • 延迟重试:逐步增加重试间隔
  • 死信队列:最终无法处理的消息进入死信队列
场景十:事务消息

背景描述

分布式系统中,需要保证多个服务的数据一致性。
MQ解决方案
  1. // 事务消息生产者
  2. @Service
  3. public class TransactionalMessageService {
  4.     @Autowired
  5.     private RocketMQTemplate rocketMQTemplate;
  6.    
  7.     @Transactional
  8.     public void createOrderWithTransaction(Order order) {
  9.         // 1. 保存订单(数据库事务)
  10.         orderDao.save(order);
  11.         
  12.         // 2. 发送事务消息
  13.         TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
  14.             "order-tx-topic",
  15.             MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
  16.                 .build(),
  17.             order  // 事务参数
  18.         );
  19.         
  20.         if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
  21.             throw new RuntimeException("事务消息发送失败");
  22.         }
  23.     }
  24. }
  25. // 事务消息监听器
  26. @Component
  27. @RocketMQTransactionListener
  28. public class OrderTransactionListener implements RocketMQLocalTransactionListener {
  29.     @Autowired
  30.     private OrderDao orderDao;
  31.    
  32.     @Override
  33.     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  34.         try {
  35.             // 检查本地事务状态
  36.             Order order = (Order) arg;
  37.             Order existOrder = orderDao.findById(order.getId());
  38.             
  39.             if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
  40.                 return RocketMQLocalTransactionState.COMMIT_MESSAGE;
  41.             } else {
  42.                 return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
  43.             }
  44.         } catch (Exception e) {
  45.             return RocketMQLocalTransactionState.UNKNOWN;
  46.         }
  47.     }
  48.    
  49.     @Override
  50.     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  51.         // 回查本地事务状态
  52.         String orderId = (String) msg.getHeaders().get("order_id");
  53.         Order order = orderDao.findById(orderId);
  54.         
  55.         if (order != null && "CREATED".equals(order.getStatus())) {
  56.             return RocketMQLocalTransactionState.COMMIT_MESSAGE;
  57.         } else {
  58.             return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
  59.         }
  60.     }
  61. }
复制代码
事务消息流程

6.png

总结

通过以上10个场景,我们可以总结出MQ使用的核心原则:
适用场景


  • 异步处理:提升系统响应速度
  • 系统解耦:降低系统间依赖
  • 流量削峰:应对突发流量
  • 数据同步:保证最终一致性
  • 分布式事务:解决数据一致性问题
技术选型建议

场景推荐MQ原因高吞吐Kafka高吞吐量,持久化存储事务消息RocketMQ完整的事务消息机制复杂路由RabbitMQ灵活的路由配置延迟消息RabbitMQ原生支持延迟队列最佳实践


  • 消息幂等性:消费者必须实现幂等处理
  • 死信队列:处理失败的消息要有兜底方案
  • 监控告警:完善的消息堆积监控和告警
  • 性能优化:根据业务特点调整MQ参数
最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。
本文收录于我的技术网站:http://www.susan.net.cn

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册