找回密码
 立即注册
首页 业界区 业界 RocketMQ+Spring Boot的简单实现及其深入分析 ...

RocketMQ+Spring Boot的简单实现及其深入分析

愿隙 昨天 00:40
Producer搭建


  • 导入RocketMQ依赖和配置RocketMQ地址及producer的group:name
  1.         <dependency>
  2.             <groupId>org.apache.rocketmq</groupId>
  3.             rocketmq-spring-boot-starter</artifactId>
  4.             <version>2.3.4</version>
  5.         </dependency>
复制代码
1.png


  • 创建消费接口
2.png
  1. 1. 调用接口进行测试
复制代码
3.png
  1.     ## 发送消息模式的类型扩展
  2.    
  3.     > `RocketTemplate`中有许多发送方法,其可应对大多数的场景
  4.     >
复制代码
4.png
  1.     ### syncSend()
  2.    
  3.     > 同步发送,仅当发送过程完成时返回此方法.需严格保证顺序性,其会阻塞调用线程至到Broker获取响应
  4.     >
  5.     - 参数列表
  6.         - `destination`目标主题,格式为`topicName:tags`tags可选
  7.         - `payload`消息体,可以是任意对象,自动序列化
  8.         - `message`Spring Message对象,可自定义headers
  9.         - `timeout`发送超时时间毫秒,默认3000ms
  10.     - 返回对象
  11.         - SendResult:包含消息ID,发送状态,队列偏移值等等
  12.     - 用于大部分对发送结果严格的场景:如电商,金融等等
  13.    
  14.     ### asyncSend()
  15.    
  16.     > 异步发送,没有返回对象.异步传输一般用于响应时间敏感的业务场景.在发送完成会立即调用其参数列表中的sendCallBack方法
  17.     >
  18.     - 参数列表
  19.         - `String destination`
  20.         - `Message<?> message`
  21.         - `SendCallback sendCallback`:发送结果调用方法
  22.             - `onSuccess(SendResult result)`:发送成功回调
  23.             - `onException(Throwable e)`:发送失败回调
  24.     - 适用于:高吞吐,但对结果要求不高的场景如日志采集等等
复制代码
5.png
  1.     ### syncSendOrderly()
  2.    
  3.     > 顺序发送
  4.     >
  5.     - 参数列表
  6.         - `SendResult syncSendOrderly(String destination, Message<?> message, String hashKey);`
  7.         - `SendResult syncSendOrderly(String destination, Object payload, String hashKey);`
  8.         - `hasyKey`:分片见,相同的hashKey会被路由到同一个队列
  9.             - 基本原理:`int queueId = Math.abs(hashKey.hashCode()) % queueCount;`
  10.    
  11.     # SendMesssageInTransaction()
  12.    
  13.     > 发送MQ分布式事务消息,其采用2PC(两端式协议)+补偿机制(事务回查)的分布式事务功能
  14.     >
  15.     - 半事务消息:暂不能投递的消息,消息生产者已经成功将消息发送到RocketMQ服务器中,但暂时为收到生产者对消息的二次确认.此时的消息会被标记为”暂不能投递”的状态.处于这种”暂不能投递”状态的消息被称为半事务消息
  16.     - 消息回查:由于一些网络问题,生产者自身的问题等等,导致某条事务消息二次丢失,RocketMQ通过扫描某条消息长期处于”半事务消息”时,其会向生产者组询查该消息的最终状态(commit或Rollback),这就是消息回查
  17.     - 在RocketMQ中发送食物消息需要三个核心组件
  18.         1. 事务消息发送:使用sendMessageInTransaction()方法
复制代码
6.png
  1.         2. 事务监听器:实现RocketMQLocalTransactionListener接口
  2.         3. 事务监听注册:通过@RocketMQTransactionListener注解注册
复制代码
7.png
  1.             - 返回对象:`TransactionSendResult`:含事务状态`LocalTransactionState`
  2.         - 采用这一套事务消息发送逻辑,本地的Service只需关心发送消息的逻辑,其余的事务逻辑交由给事务监听器处理
复制代码
8.png
  1.     ### 事务基本执行流程
  2.    
  3.     - **第一阶段:发送半事务**
  4.         1. 生产者发送半事务消息:生产者将业务数据封装成数据,并将其发送给RocketMQ,此时消息被标记为”半事务消息”
  5.         2. RocketMQ确认接收消息:RocketMQ接收到消息并将其持久化到存储系统中,此时会向生产者发送一个确认消息(Ack)表示该消息已经被接收
  6.         3. 生产者执行本地事务逻辑:生产者接收到服务端的确认后,则开始本地业务逻辑执行.如更新数据库,修改订单等等
  7.     - **第二阶段:提交或回滚事务**
  8.         1. 生产者提交二次确认结果:根据本地事务执行结果,生产者向RocketMQ提交二次确认结果
  9.             1. 若本地事务执行成功:生产者提交`Commit`操作,服务器端将半事务标记为:”可投递状态”,并将其投递给消费者
  10.             2. 如果本地事务执行失败:生产者提交`Rollback`操作,RocketMQ则会回滚,不会将消息投递给消费者
  11.         2. 但0由于网络问题生产者自身应用问题导致重启,RocketMQ迟迟未收到生产者的二次确认,或收到的消息结果为`Unknown`未知状态.RocketMQ会发起事务回查.
  12.             1. RocketMQ会向生产者发送回查请求,要求查询其本地事务状态
  13.             2. 生产者根据本地事务状态再次提交二次确认结果
  14.     - **第三阶段:消费者进行消费**
  15.         1. 当RocketMQ中的消息被标记为”可投递”之后,消息会被投递到消费者.消费者按其消费逻辑进行消费操作.最后向RocketMQ发送消费结果(成功/失败)
  16.         2. 消息被消费后,RocketMQ会标记其消息为”已消费”,RocketMQ会默认保留所有消息.支持消费者回溯历史消息
复制代码
9.png
  1.     ### 幂等问题
  2.    
  3.     > 幂等性,值对同一操作多次执行,结果与仅执行一次效果相同
  4.     >
  5.     - 出现幂等的原因:
  6.         1. **生产者重复发送**:生产者客户端有可能因为某些网络问题导致发送失败,届时生产者会尝试发送相同的消息从而会导致RocketMQ重复消费
  7.         2. **重试机制**:RocketMQ提供了消息重试机制,在消息发送中出现异常时.消费者会重新拉取相同的消息进行重试.若消费者方没有处理幂等性,则消息会被重复消费
  8.         3. **集群下的消息重复消费**:在RocketMQ下的集群,如果多个消费者订阅相同的主题,且每个消费者都独立消费消息,那么同一个消息就会被不同的消费者组重复消费
  9.    
  10.     ### 使用Redssion实现幂等性
  11.    
  12.     ```java
  13.      consumer.registerMessageListener(new MessageListenerConcurrently() {
  14.                 @Override
  15.                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  16.                     for (MessageExt msg : msgs) {
  17.                         String msgId = msg.getMsgId();
  18.                         String lockKey = "rocketmq:msg:" + msgId;
  19.                         RLock lock = redissonClient.getLock(lockKey);
  20.                         boolean acquired = false;
  21.    
  22.                         try {
  23.                             acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);
  24.                             if (acquired) {
  25.                                 System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);
  26.                                 Thread.sleep(100); // 模拟业务处理
  27.                                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  28.                             } else {
  29.                                 System.out.println("Duplicate message skipped: " + msgId);
  30.                                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  31.                             }
  32.                         } catch (Exception e) {
  33.                             System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());
  34.                             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  35.                         } finally {
  36.                             if (acquired) {
  37.                                 lock.unlock();
  38.                             }
  39.                         }
  40.                     }
  41.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  42.                 }
  43.             });
  44.     ```
  45.    
  46.     ```java
  47.     consumer.registerMessageListener(new MessageListenerOrderly() {
  48.         @Override
  49.         public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  50.             for (MessageExt msg : msgs) {
  51.                 String msgId = msg.getMsgId();
  52.                 String lockKey = "rocketmq:msg:" + msgId;
  53.                 RLock lock = redissonClient.getLock(lockKey);
  54.                 boolean acquired = false;
  55.    
  56.                 try {
  57.                     acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);
  58.                     if (acquired) {
  59.                         System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);
  60.                         Thread.sleep(100);
  61.                         return ConsumeOrderlyStatus.SUCCESS;
  62.                     } else {
  63.                         System.out.println("Duplicate message skipped: " + msgId);
  64.                         return ConsumeOrderlyStatus.SUCCESS;
  65.                     }
  66.                 } catch (Exception e) {
  67.                     System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());
  68.                     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  69.                 } finally {
  70.                     if (acquired) {
  71.                         lock.unlock();
  72.                     }
  73.                 }
  74.             }
  75.             return ConsumeOrderlyStatus.SUCCESS;
  76.         }
  77.     });
  78.     ```
  79.    
  80.     ### sendAndReceive()
  81.    
  82.     - 用于实现请求-响应模式的核心方法,其允许在分布式系统中实现类似RCP同步通信的能力
复制代码
10.png
  1.     - 核心特性
  2.         - 同步通信:阻塞调用线程直到收到响应
  3.         - 双向交互:实现生产者与消费者的双向通信
  4.         - 解耦设计:保持MQ解耦特性同时实现同步交互
  5.     - 参数列表
  6.         
  7.         Message<?> sendAndReceive(`String destination,Message<?> requestMessage,long timeout`) throws MessagingException
  8.         
  9.     - 业务场景:实时查询库存信息
复制代码
Consumer搭建


  • 引入依赖配置consumer的group:name
  1.         <dependency>
  2.             <groupId>org.apache.rocketmq</groupId>
  3.             rocketmq-spring-boot-starter</artifactId>
  4.             <version>2.3.4</version>
  5.         </dependency>
复制代码
11.png


  • 创建消息监听器
实现RocketMQListener接口,重写其onMessage()方法完成消费逻辑


  • 使用@RocketMQMessageListener(consumerGroup=””,topic=””)注解:来指定消费者组,及目标topic
12.png


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册