找回密码
 立即注册
首页 业界区 安全 Structured Streaming消费rocketMQ

Structured Streaming消费rocketMQ

拍棹 2025-9-30 11:46:40
一、背景

最近有一个需求是:要求有一个类对象为Order,它有string类型的字段orderNo和Long类型的字段cost,生产者写到rocketMQ的value是Order对象序列化后的字节数组、key值是orderNo字段,要求spark以自动提交的方式消费rocketMQ,并将数据依次写入到hive表中,并且spark中有一个5分钟滑动窗口,滑动步长为1分钟,统计5分钟内的cost总值并输出。
 
二、实战演练

1. Order 类
  1. 1 package com.example;
  2. 2
  3. 3 import java.io.Serializable;
  4. 4
  5. 5 public class Order implements Serializable {
  6. 6     private static final long serialVersionUID = 1L;
  7. 7
  8. 8     private String orderNo;
  9. 9     private Long cost;
  10. 10
  11. 11     public Order() {}
  12. 12
  13. 13     public Order(String orderNo, Long cost) {
  14. 14         this.orderNo = orderNo;
  15. 15         this.cost = cost;
  16. 16     }
  17. 17
  18. 18     public String getOrderNo() {
  19. 19         return orderNo;
  20. 20     }
  21. 21
  22. 22     public void setOrderNo(String orderNo) {
  23. 23         this.orderNo = orderNo;
  24. 24     }
  25. 25
  26. 26     public Long getCost() {
  27. 27         return cost;
  28. 28     }
  29. 29
  30. 30     public void setCost(Long cost) {
  31. 31         this.cost = cost;
  32. 32     }
  33. 33
  34. 34     @Override
  35. 35     public String toString() {
  36. 36         return "Order{" +
  37. 37                 "orderNo='" + orderNo + '\'' +
  38. 38                 ", cost=" + cost +
  39. 39                 '}';
  40. 40     }
  41. 41 }
复制代码
2. RocketMQ Producer 示例
  1. 1 package com.example;
  2. 2
  3. 3 import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. 4 import org.apache.rocketmq.client.producer.SendResult;
  5. 5 import org.apache.rocketmq.common.message.Message;
  6. 6
  7. 7 import java.io.ByteArrayOutputStream;
  8. 8 import java.io.ObjectOutputStream;
  9. 9 import java.util.UUID;
  10. 10
  11. 11 public class RocketMQOrderProducer {
  12. 12     public static void main(String[] args) throws Exception {
  13. 13         DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
  14. 14         producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer 地址
  15. 15         producer.start();
  16. 16
  17. 17         for (int i = 0; i < 20; i++) {
  18. 18             String orderNo = "ORD-" + UUID.randomUUID().toString().substring(0, 8);
  19. 19             long cost = (long) (Math.random() * 1000);
  20. 20             Order order = new Order(orderNo, cost);
  21. 21
  22. 22             byte[] body;
  23. 23             try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
  24. 24                  ObjectOutputStream oos = new ObjectOutputStream(bos)) {
  25. 25                 oos.writeObject(order);
  26. 26                 body = bos.toByteArray();
  27. 27             }
  28. 28
  29. 29             Message msg = new Message("OrderTopic", orderNo, body);
  30. 30             SendResult result = producer.send(msg);
  31. 31             System.out.printf("Sent: %s, result=%s%n", order, result);
  32. 32         }
  33. 33
  34. 34         producer.shutdown();
  35. 35     }
  36. 36 }
复制代码
3. Hive 建表 SQL
  1. -- 原始订单表
  2. DROP TABLE IF EXISTS demo_db.orders;
  3. CREATE TABLE demo_db.orders (
  4.   orderNo STRING,
  5.   cost BIGINT
  6. )
  7. STORED AS PARQUET;
  8. -- 聚合结果表
  9. DROP TABLE IF EXISTS demo_db.order_cost_window_agg;
  10. CREATE TABLE demo_db.order_cost_window_agg (
  11.   window_start TIMESTAMP,
  12.   window_end TIMESTAMP,
  13.   total_cost BIGINT
  14. )
  15. STORED AS PARQUET;
复制代码
4. Spark Structured Streaming 消费 RocketMQ & 写入 Hive
这里用 rocketmq-spark connector,Structured Streaming 模式下,offset 会自动 checkpoint。
  1. 1 package com.example;
  2. 2
  3. 3 import org.apache.spark.sql.*;
  4. 4 import org.apache.spark.sql.streaming.StreamingQuery;
  5. 5 import org.apache.spark.sql.streaming.Trigger;
  6. 6 import org.apache.spark.sql.types.DataTypes;
  7. 7
  8. 8 import java.io.ByteArrayInputStream;
  9. 9 import java.io.ObjectInputStream;
  10. 10
  11. 11 public class StructuredStreamingRocketMQToHive {
  12. 12     public static void main(String[] args) throws Exception {
  13. 13         SparkSession spark = SparkSession.builder()
  14. 14                 .appName("StructuredStreamingRocketMQToHive")
  15. 15                 .enableHiveSupport()
  16. 16                 .getOrCreate();
  17. 17
  18. 18         // 注册反序列化 UDF
  19. 19         spark.udf().register("deserializeOrderNo", (byte[] bytes) -> {
  20. 20             try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
  21. 21                 Order o = (Order) ois.readObject();
  22. 22                 return o.getOrderNo();
  23. 23             } catch (Exception e) {
  24. 24                 return null;
  25. 25             }
  26. 26         }, DataTypes.StringType);
  27. 27
  28. 28         spark.udf().register("deserializeCost", (byte[] bytes) -> {
  29. 29             try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
  30. 30                 Order o = (Order) ois.readObject();
  31. 31                 return o.getCost();
  32. 32             } catch (Exception e) {
  33. 33                 return null;
  34. 34             }
  35. 35         }, DataTypes.LongType);
  36. 36
  37. 37         // 读取 RocketMQ 流
  38. 38         Dataset<Row> df = spark.readStream()
  39. 39                 .format("org.apache.rocketmq.spark")
  40. 40                 .option("namesrvAddr", "localhost:9876")
  41. 41                 .option("consumerGroup", "order_consumer_group")
  42. 42                 .option("topics", "OrderTopic")
  43. 43                 .load();
  44. 44
  45. 45         // RocketMQ Connector 输出的 DataFrame 包含 key、body、topic、offset 等字段
  46. 46         Dataset<Row> orderDF = df.withColumn("orderNo", functions.callUDF("deserializeOrderNo", df.col("body")))
  47. 47                 .withColumn("cost", functions.callUDF("deserializeCost", df.col("body")))
  48. 48                 .select("orderNo", "cost")
  49. 49                 .filter("orderNo IS NOT NULL AND cost IS NOT NULL");
  50. 50
  51. 51         // 写入 Hive 原始订单表
  52. 52         StreamingQuery orderQuery = orderDF.writeStream()
  53. 53                 .format("hive")
  54. 54                 .outputMode("append")
  55. 55                 .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders")
  56. 56                 .toTable("demo_db.orders");
  57. 57
  58. 58         // 窗口统计(处理时间窗口)
  59. 59         Dataset<Row> aggDF = orderDF
  60. 60                 .withColumn("event_time", functions.current_timestamp())
  61. 61                 .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
  62. 62                 .agg(functions.sum("cost").alias("total_cost"))
  63. 63                 .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");
  64. 64
  65. 65         // 写入 Hive 聚合表
  66. 66         StreamingQuery aggQuery = aggDF.writeStream()
  67. 67                 .format("hive")
  68. 68                 .outputMode("append")
  69. 69                 .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders_agg")
  70. 70                 .toTable("demo_db.order_cost_window_agg");
  71. 71
  72. 72         spark.streams().awaitAnyTermination();
  73. 73     }
  74. 74 }
复制代码
5. 关键点

  • Structured Streaming 自动管理 offset,checkpoint 存在 HDFS/S3,恢复时会从 checkpoint 继续。
  • outputMode("append") + Hive sink,需要 Spark 开启 enableHiveSupport()。
  • 这里为了简单,用的是 处理时间窗口(current_timestamp()),如果你想改成 事件时间窗口,需要在 Order 类里加 ts 字段并解析出来,再配合 withWatermark() 使用。
 转发请注明出处:https://www.cnblogs.com/fnlingnzb-learner/p/19073626

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册