I. Background
A recent requirement: there is a class Order with a String field orderNo and a Long field cost. A producer writes messages to RocketMQ whose value is the JDK-serialized byte array of an Order object and whose key is the orderNo field. Spark must consume RocketMQ with offsets committed automatically, append each record to a Hive table, and additionally maintain a 5-minute sliding window with a 1-minute slide that outputs the total cost over each window.
II. Hands-On Walkthrough
1. The Order class

package com.example;

import java.io.Serializable;

public class Order implements Serializable {
    private static final long serialVersionUID = 1L;

    private String orderNo;
    private Long cost;

    public Order() {}

    public Order(String orderNo, Long cost) {
        this.orderNo = orderNo;
        this.cost = cost;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public Long getCost() {
        return cost;
    }

    public void setCost(Long cost) {
        this.cost = cost;
    }

    @Override
    public String toString() {
        return "Order{" +
                "orderNo='" + orderNo + '\'' +
                ", cost=" + cost +
                '}';
    }
}
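The producer below and the Spark UDFs in section 4 both apply the same JDK serialization to this class. As a side note, that shared logic could live in one small helper; a minimal sketch (the class name OrderSerde is mine and does not appear in the original code):

package com.example;

import java.io.*;

// Hypothetical helper that centralizes the JDK serialization used in this post.
public final class OrderSerde {
    private OrderSerde() {}

    public static byte[] serialize(Order order) throws IOException {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(order);
            oos.flush(); // push buffered object data into the byte array before reading it
            return bos.toByteArray();
        }
    }

    public static Order deserialize(byte[] bytes) {
        // Mirrors the UDFs in section 4: a bad payload becomes null and gets filtered out.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Order) ois.readObject();
        } catch (Exception e) {
            return null;
        }
    }
}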
2. RocketMQ producer example

package com.example;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;

public class RocketMQOrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer address
        producer.start();

        for (int i = 0; i < 20; i++) {
            String orderNo = "ORD-" + UUID.randomUUID().toString().substring(0, 8);
            long cost = (long) (Math.random() * 1000);
            Order order = new Order(orderNo, cost);

            // JDK-serialize the Order into the message body
            byte[] body;
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(order);
                oos.flush(); // flush buffered object data before reading the byte array
                body = bos.toByteArray();
            }

            // Note: new Message(topic, orderNo, body) would set orderNo as the *tag*,
            // not the key; setKeys() is what puts orderNo into the message key.
            Message msg = new Message("OrderTopic", body);
            msg.setKeys(orderNo);
            SendResult result = producer.send(msg);
            System.out.printf("Sent: %s, result=%s%n", order, result);
        }

        producer.shutdown();
    }
}
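Before wiring Spark in, it can help to confirm messages are actually landing on the topic with a plain push consumer. A minimal sketch, reusing the hypothetical OrderSerde helper from section 1 (the group name verify_consumer_group is mine):

package com.example;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class OrderTopicSmokeTest {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("verify_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OrderTopic", "*"); // all tags

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            // Print the message key (orderNo) next to the deserialized Order
            msgs.forEach(m -> System.out.printf("key=%s, order=%s%n",
                    m.getKeys(), OrderSerde.deserialize(m.getBody())));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("Smoke-test consumer started; Ctrl+C to stop.");
    }
}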
3. Hive table DDL

-- raw orders table
DROP TABLE IF EXISTS demo_db.orders;
CREATE TABLE demo_db.orders (
    orderNo STRING,
    cost BIGINT
)
STORED AS PARQUET;

-- windowed aggregation table
DROP TABLE IF EXISTS demo_db.order_cost_window_agg;
CREATE TABLE demo_db.order_cost_window_agg (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    total_cost BIGINT
)
STORED AS PARQUET;
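Once the streaming job in the next section has run for a trigger or two, a quick check from any Hive-enabled SparkSession confirms rows are landing in both tables. A sketch (the class name HiveSanityCheck is mine):

package com.example;

import org.apache.spark.sql.SparkSession;

public class HiveSanityCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("HiveSanityCheck")
                .enableHiveSupport()
                .getOrCreate();
        // row count of the raw table, then the most recent windows
        spark.sql("SELECT COUNT(*) AS cnt FROM demo_db.orders").show();
        spark.sql("SELECT * FROM demo_db.order_cost_window_agg ORDER BY window_start DESC LIMIT 10").show();
        spark.stop();
    }
}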
4. Spark Structured Streaming: consuming RocketMQ and writing to Hive

This uses the rocketmq-spark connector; in Structured Streaming mode, offsets are checkpointed automatically.

package com.example;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

public class StructuredStreamingRocketMQToHive {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingRocketMQToHive")
                .enableHiveSupport()
                .getOrCreate();

        // Register deserialization UDFs. The explicit UDF1 cast lets the Java
        // compiler resolve the right register() overload.
        spark.udf().register("deserializeOrderNo", (UDF1<byte[], String>) bytes -> {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Order o = (Order) ois.readObject();
                return o.getOrderNo();
            } catch (Exception e) {
                return null;
            }
        }, DataTypes.StringType);

        spark.udf().register("deserializeCost", (UDF1<byte[], Long>) bytes -> {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Order o = (Order) ois.readObject();
                return o.getCost();
            } catch (Exception e) {
                return null;
            }
        }, DataTypes.LongType);

        // Read the RocketMQ stream
        Dataset<Row> df = spark.readStream()
                .format("org.apache.rocketmq.spark")
                .option("namesrvAddr", "localhost:9876")
                .option("consumerGroup", "order_consumer_group")
                .option("topics", "OrderTopic")
                .load();

        // The RocketMQ connector's DataFrame exposes key, body, topic, offset, etc.
        Dataset<Row> orderDF = df.withColumn("orderNo", functions.callUDF("deserializeOrderNo", df.col("body")))
                .withColumn("cost", functions.callUDF("deserializeCost", df.col("body")))
                .select("orderNo", "cost")
                .filter("orderNo IS NOT NULL AND cost IS NOT NULL");

        // Write raw orders to Hive
        StreamingQuery orderQuery = orderDF.writeStream()
                .format("hive")
                .outputMode("append")
                .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders")
                .toTable("demo_db.orders");

        // Windowed aggregation (processing-time window)
        Dataset<Row> aggDF = orderDF
                .withColumn("event_time", functions.current_timestamp())
                // append mode requires a watermark on streaming aggregations
                .withWatermark("event_time", "1 minute")
                .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
                .agg(functions.sum("cost").alias("total_cost"))
                .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");

        // Write the aggregates to Hive
        StreamingQuery aggQuery = aggDF.writeStream()
                .format("hive")
                .outputMode("append")
                .option("checkpointLocation", "/user/hive/warehouse/checkpoints/orders_agg")
                .toTable("demo_db.order_cost_window_agg");

        spark.streams().awaitAnyTermination();
    }
}
5. Key points

- Structured Streaming manages offsets automatically; checkpoints live on HDFS/S3, and a restarted query resumes from its checkpoint.
- outputMode("append") with the Hive sink requires enableHiveSupport() on the SparkSession.
- For simplicity, the job uses a processing-time window (current_timestamp()). To switch to an event-time window, add a ts field to Order, parse it out in a UDF, and combine it with withWatermark(); see the sketch after this list.
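For reference, a hedged sketch of that event-time variant, assuming Order gains a long ts field (epoch millis), a deserializeTs UDF is registered like the other two, and orderDF already carries the resulting ts column; none of this exists in the code above:

// Hypothetical event-time version of the aggregation (assumes a "ts" column, epoch millis)
Dataset<Row> eventTimeAgg = orderDF
        .withColumn("event_time", functions.col("ts").divide(1000).cast("timestamp"))
        .withWatermark("event_time", "10 minutes") // tolerate up to 10 minutes of late data
        .groupBy(functions.window(functions.col("event_time"), "5 minutes", "1 minute"))
        .agg(functions.sum("cost").alias("total_cost"))
        .selectExpr("window.start as window_start", "window.end as window_end", "total_cost");

With append mode, a window's row is emitted only after the watermark passes the window's end, so results appear with a delay of up to the watermark delay plus the slide interval.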
When reposting, please credit the source: https://www.cnblogs.com/fnlingnzb-learner/p/19073626