找回密码
 立即注册
首页 业界区 业界 从零开始学Flink:数据输出的终极指南

从零开始学Flink:数据输出的终极指南

锷稠 前天 11:21
在实时数据处理的完整链路中,数据输出(Sink)是最后一个关键环节,它负责将处理后的结果传递到外部系统供后续使用。Flink提供了丰富的数据输出连接器,支持将数据写入Kafka、Elasticsearch、文件系统、数据库等各种目标系统。本文将深入探讨Flink数据输出的核心概念、配置方法和最佳实践,并基于Flink 1.20.1构建一个完整的数据输出案例。
一、Flink Sink概述

1. 什么是Sink

Sink(接收器)是Flink数据处理流水线的末端,负责将计算结果输出到外部存储系统或下游处理系统。在Flink的编程模型中,Sink是DataStream API中的一个转换操作,它接收DataStream并将数据写入指定的外部系统。
2. Sink的分类

Flink的Sink连接器可以分为以下几类:

  • 内置Sink:如print()、printToErr()等用于调试的内置输出
  • 文件系统Sink:支持写入本地文件系统、HDFS等
  • 消息队列Sink:如Kafka、RabbitMQ等
  • 数据库Sink:如JDBC、Elasticsearch等
  • 自定义Sink:通过实现SinkFunction接口自定义输出逻辑
3. 输出语义保证

Flink为Sink提供了三种输出语义保证:

  • 最多一次(At-most-once):数据可能丢失,但不会重复
  • 至少一次(At-least-once):数据不会丢失,但可能重复
  • 精确一次(Exactly-once):数据既不会丢失,也不会重复
这些语义保证与Flink的检查点(Checkpoint)机制密切相关,我们将在后面详细讨论。
二、环境准备与依赖配置

1. 版本说明


  • Flink:1.20.1
  • JDK:17+
  • Gradle:8.3+
  • 外部系统:Kafka 3.4.0、Elasticsearch 7.17.0、MySQL 8.0
2. 核心依赖
  1. dependencies {
  2.     // Flink核心依赖
  3.     implementation 'org.apache.flink:flink_core:1.20.1'
  4.     implementation 'org.apache.flink:flink-streaming-java:1.20.1'
  5.     implementation 'org.apache.flink:flink-clients:1.20.1'
  6.    
  7.     // Kafka Connector
  8.     implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'
  9.    
  10.     // Elasticsearch Connector
  11.     implementation 'org.apache.flink:flink-connector-elasticsearch7:3.1.0-1.20'
  12.    
  13.     // JDBC Connector
  14.     implementation 'org.apache.flink:flink-connector-jdbc:3.3.0-1.20'
  15.     implementation 'mysql:mysql-connector-java:8.0.33'
  16.    
  17.     // FileSystem Connector
  18.     implementation 'org.apache.flink:flink-connector-files:1.20.1'
  19. }
复制代码
三、基础Sink操作

1. 内置调试Sink

Flink提供了一些内置的Sink用于开发和调试阶段:
  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class BasicSinkDemo {
  4.     public static void main(String[] args) throws Exception {
  5.         // 创建执行环境
  6.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7.         
  8.         // 创建数据源
  9.         DataStream<String> stream = env.fromElements("Hello", "Flink", "Sink");
  10.         
  11.         // 打印到标准输出
  12.         stream.print("StandardOutput");
  13.         
  14.         // 打印到标准错误输出
  15.         stream.printToErr("ErrorOutput");
  16.         
  17.         // 执行作业
  18.         env.execute("Basic Sink Demo");
  19.     }
  20. }
复制代码
2. 文件系统Sink

Flink支持将数据写入本地文件系统、HDFS等。下面是一个写入本地文件系统的示例:
  1. package com.cn.daimajiangxin.flink.sink;
  2. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
  3. import org.apache.flink.configuration.MemorySize;
  4. import org.apache.flink.connector.file.sink.FileSink;
  5. import org.apache.flink.core.fs.Path;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
  9. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
  10. import java.time.Duration;
  11. public class FileSystemSinkDemo {
  12.     public static void main(String[] args) throws Exception {
  13.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14.         DataStream<Object> stream = env.fromData("Hello", "Flink", "FileSystem", "Sink");
  15.         RollingPolicy<Object, String> rollingPolicy = DefaultRollingPolicy.<Object, String>builder()
  16.                 .withRolloverInterval(Duration.ofMinutes(15))
  17.                 .withInactivityInterval(Duration.ofMinutes(5))
  18.                 .withMaxPartSize(MemorySize.ofMebiBytes(64))
  19.                 .build();
  20.         // 创建文件系统Sink
  21.         FileSink<Object> sink = FileSink
  22.                 .forRowFormat(new Path("file:///tmp/flink-output"), new SimpleStringEncoder<>())
  23.                 .withRollingPolicy(rollingPolicy)
  24.                 .build();
  25.         // 添加Sink
  26.         stream.sinkTo(sink);
  27.         env.execute("File System Sink Demo");
  28.     }
  29. }
复制代码
四、高级Sink连接器

1. Kafka Sink

Kafka是实时数据处理中常用的消息队列,Flink提供了强大的Kafka Sink支持:
  1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  2. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  3. import org.apache.flink.connector.kafka.sink.KafkaSink;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import java.util.Properties;
  7. public class KafkaSinkDemo {
  8.     public static void main(String[] args) throws Exception {
  9.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10.         
  11.         // 开启检查点以支持Exactly-Once语义
  12.         env.enableCheckpointing(5000);
  13.         
  14.         DataStream<String> stream = env.fromElements("Hello Kafka", "Flink to Kafka", "Data Pipeline");
  15.         
  16.         // Kafka配置
  17.         Properties props = new Properties();
  18.         props.setProperty("bootstrap.servers", "localhost:9092");
  19.         
  20.         // 创建Kafka Sink
  21.         KafkaSink<String> sink = KafkaSink.<String>
  22.                 builder()
  23.                 .setKafkaProducerConfig(props)
  24.                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  25.                         .setTopic("flink-output-topic")
  26.                         .setValueSerializationSchema(new SimpleStringSchema())
  27.                         .build())
  28.                 .build();
  29.         
  30.         // 添加Sink
  31.         stream.sinkTo(sink);
  32.         
  33.         env.execute("Kafka Sink Demo");
  34.     }
  35. }
复制代码
kafka消息队列消息:
1.png

2. Elasticsearch Sink

Elasticsearch是一个实时的分布式搜索和分析引擎,非常适合存储和查询Flink处理的实时数据:
  1. package com.cn.daimajiangxin.flink.sink;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
  4. import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.http.HttpHost;
  8. import org.elasticsearch.action.index.IndexRequest;
  9. import org.elasticsearch.client.Requests;
  10. import java.util.Map;
  11. public class ElasticsearchSinkDemo {
  12.     private static final ObjectMapper objectMapper = new ObjectMapper();
  13.     public static void main(String[] args) throws Exception {
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.enableCheckpointing(5000);
  16.         DataStream<String> stream = env.fromData(
  17.                 "{"id":"1","name":"Flink","category":"framework"}",
  18.                 "{"id":"2","name":"Elasticsearch","category":"database"}");
  19.         // 配置Elasticsearch节点
  20.         HttpHost httpHost=new HttpHost("localhost", 9200, "http");
  21.         // 创建Elasticsearch Sink
  22.         ElasticsearchSink<String> sink=new Elasticsearch7SinkBuilder<String>()
  23.                 .setBulkFlushMaxActions(10)        // 批量操作数量
  24.                 .setBulkFlushInterval(5000)          // 批量刷新间隔(毫秒)
  25.                 .setHosts(httpHost)
  26.                 .setConnectionRequestTimeout(60000)  // 连接请求超时时间
  27.                 .setConnectionTimeout(60000)         // 连接超时时间
  28.                 .setSocketTimeout(60000)             // Socket 超时时间
  29.                 .setEmitter((element, context, indexer) -> {
  30.                     try {
  31.                         Map<String, Object> json = objectMapper.readValue(element, Map.class);
  32.                         IndexRequest request = Requests.indexRequest()
  33.                                 .index("flink_documents")
  34.                                 .id((String) json.get("id"))
  35.                                 .source(json);
  36.                         indexer.add(request);
  37.                     } catch (Exception e) {
  38.                         // 处理解析异常
  39.                         System.err.println("Failed to parse JSON: " + element);
  40.                     }
  41.                 })
  42.                 .build();
  43.         // 添加Sink
  44.         stream.sinkTo(sink);
  45.         env.execute("Elasticsearch Sink Demo");
  46.     }
  47. }
复制代码
使用post工具查看数据
2.png

3. JDBC Sink

使用JDBC Sink可以将数据写入各种关系型数据库:
  1. package com.cn.daimajiangxin.flink.sink;
  2. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  3. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  4. import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
  5. import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
  6. import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
  7. import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. public class JdbcSinkDemo {
  13.     public static void main(String[] args) throws Exception {
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.enableCheckpointing(5000);
  16.         List<User> userList = Arrays.asList(     new User(1, "Alice", 25,"alice"),
  17.                 new User(2, "Bob", 30,"bob"),
  18.                 new User(3, "Charlie", 35,"charlie"));
  19.         // 模拟用户数据
  20.         DataStream<User> userStream = env.fromData(userList);
  21.         JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
  22.                 .withBatchSize(1000)
  23.                 .withBatchIntervalMs(200)
  24.                 .withMaxRetries(5)
  25.                 .build();
  26.         JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  27.                 .withUrl("jdbc:mysql://localhost:3306/test")
  28.                 .withDriverName("com.mysql.cj.jdbc.Driver")
  29.                 .withUsername("username")
  30.                 .withPassword("password")
  31.                 .build();
  32.         String insertSql = "INSERT INTO user (id, name, age, user_name) VALUES (?, ?, ?, ?)";
  33.         JdbcStatementBuilder<User> statementBuilder = (statement, user) -> {
  34.             statement.setInt(1, user.getId());
  35.             statement.setString(2, user.getName());
  36.             statement.setInt(3, user.getAge());
  37.             statement.setString(4, user.getUserName());
  38.         };
  39.         // 创建JDBC Sink
  40.         JdbcSink<User> jdbcSink = new Jdbc().<User>sinkBuilder()
  41.                 .withQueryStatement( new SimpleJdbcQueryStatement<User>(insertSql,statementBuilder))
  42.                 .withExecutionOptions(jdbcExecutionOptions)
  43.                 .buildAtLeastOnce(connectionOptions);
  44.         // 添加Sink
  45.         userStream.sinkTo(jdbcSink);
  46.         env.execute("JDBC Sink Demo");
  47.     }
  48.     // 用户实体类
  49.     public static class User {
  50.         private int id;
  51.         private String name;
  52.         private String userName;
  53.         private int age;
  54.         public User(int id, String name, int age,String userName) {
  55.             this.id = id;
  56.             this.name = name;
  57.             this.age = age;
  58.             this.userName=userName;
  59.         }
  60.         public int getId() {
  61.             return id;
  62.         }
  63.         public String getName() {
  64.             return name;
  65.         }
  66.         public int getAge() {
  67.             return age;
  68.         }
  69.         public String getUserName() {
  70.             return userName;
  71.         }
  72.     }
  73. }
复制代码
登录mysql客户端查看数据
3.png

五、Sink的可靠性保证机制

1. 检查点与保存点

Flink的检查点(Checkpoint)机制是实现精确一次语义的基础。当开启检查点后,Flink会定期将作业的状态保存到持久化存储中。如果作业失败,Flink可以从最近的检查点恢复,确保数据不会丢失。
  1. // 配置检查点
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // 启用检查点,间隔5000ms
  4. env.enableCheckpointing(5000);
  5. // 配置检查点模式为EXACTLY_ONCE(默认)
  6. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // 设置检查点超时时间
  8. env.getCheckpointConfig().setCheckpointTimeout(60000);
  9. // 设置最大并行检查点数量
  10. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  11. // 开启外部化检查点,作业失败时保留检查点
  12. env.getCheckpointConfig().enableExternalizedCheckpoints(
  13.     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
复制代码
2. 事务与二阶段提交

对于支持事务的外部系统,Flink使用二阶段提交(Two-Phase Commit)协议来实现精确一次语义:

  • 第一阶段(预提交):Flink将数据写入外部系统的预提交区域,但不提交
  • 第二阶段(提交):所有算子完成预提交后,Flink通知外部系统提交数据
这种机制确保了即使在作业失败或恢复的情况下,数据也不会被重复写入或丢失。
3. 不同Sink的语义保证级别

不同的Sink连接器支持不同级别的语义保证:

  • 支持精确一次(Exactly-once):Kafka、Elasticsearch(版本支持)、文件系统(预写日志模式)
  • 支持至少一次(At-least-once):JDBC、Redis、RabbitMQ
  • 最多一次(At-most-once):简单的无状态输出
六、自定义Sink实现

当Flink内置的Sink连接器不能满足需求时,我们可以通过实现SinkFunction接口来自定义Sink:
  1. package com.cn.daimajiangxin.flink.sink;
  2. import org.apache.flink.api.common.functions.RuntimeContext;
  3. import org.apache.flink.api.connector.sink2.Sink;
  4. import org.apache.flink.api.connector.sink2.SinkWriter;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  8. import java.io.IOException;
  9. public class CustomSinkDemo {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         DataStream<String> stream = env.fromElements("Custom", "Sink", "Example");
  13.         // 使用自定义Sink
  14.         stream.sinkTo(new CustomSink());
  15.         env.execute("Custom Sink Demo");
  16.     }
  17.     // 自定义Sink实现 - 使用新API
  18.     public static class CustomSink implements Sink<String> {
  19.         @Override
  20.         public SinkWriter<String> createWriter(InitContext context) {
  21.             return new CustomSinkWriter();
  22.         }
  23.         // SinkWriter负责实际的数据写入逻辑
  24.         private static class CustomSinkWriter implements SinkWriter<String> {
  25.             // 初始化资源
  26.             public CustomSinkWriter() {
  27.                 // 初始化连接、客户端等资源
  28.                 System.out.println("CustomSink initialized");
  29.             }
  30.             // 处理每个元素
  31.             @Override
  32.             public void write(String value, Context context)  throws IOException, InterruptedException {
  33.                 // 实际的写入逻辑
  34.                 System.out.println("Writing to custom sink: " + value);
  35.             }
  36.             // 刷新缓冲区
  37.             @Override
  38.             public void flush(boolean endOfInput) {
  39.                 // 刷新逻辑(如果需要)
  40.             }
  41.             // 清理资源
  42.             @Override
  43.             public void close() throws Exception {
  44.                 // 关闭连接、客户端等资源
  45.                 System.out.println("CustomSink closed");
  46.             }
  47.         }
  48.     }
  49. }
复制代码
4.png

七、实战案例:实时数据处理流水线

下面我们将构建一个完整的实时数据处理流水线,从Kafka读取数据,进行转换处理,然后输出到多个目标系统:
1. 系统架构
  1. Kafka Source -> Flink Processing -> Multiple Sinks
  2.                                |-> Kafka Sink
  3.                                |-> Elasticsearch Sink
  4.                                |-> JDBC Sink
复制代码
2. 数据模型

我们将使用日志数据模型,定义一个LogEntry类来表示日志条目:
  1. package com.cn.daimajiangxin.flink.sink;
  2. public class LogEntry {
  3.     private String timestamp;
  4.     private String logLevel;
  5.     private String source;
  6.     private String message;
  7.     public String getTimestamp() {
  8.         return timestamp;
  9.     }
  10.     public void setTimestamp(String timestamp) {
  11.         this.timestamp = timestamp;
  12.     }
  13.     public String getLogLevel() {
  14.         return logLevel;
  15.     }
  16.     public void setLogLevel(String logLevel) {
  17.         this.logLevel = logLevel;
  18.     }
  19.     public String getSource() {
  20.         return source;
  21.     }
  22.     public void setSource(String source) {
  23.         this.source = source;
  24.     }
  25.     public String getMessage() {
  26.         return message;
  27.     }
  28.     public void setMessage(String message) {
  29.         this.message = message;
  30.     }
  31.     @Override
  32.     public String toString() {
  33.         return String.format("LogEntry{timestamp='%s', logLevel='%s', source='%s', message='%s'}",
  34.                 timestamp, logLevel, source, message);
  35.     }
  36. }
复制代码
定义一个日志统计实体类LogStats,用于表示每个源的日志统计信息:
  1. package com.cn.daimajiangxin.flink.sink;
  2. public class LogStats {
  3.     private String source;
  4.     private long count;
  5.     public LogStats() {
  6.     }
  7.     public LogStats(String source, long count) {
  8.         this.source = source;
  9.         this.count = count;
  10.     }
  11.     public String getSource() {
  12.         return source;
  13.     }
  14.     public void setSource(String source) {
  15.         this.source = source;
  16.     }
  17.     public long getCount() {
  18.         return count;
  19.     }
  20.     public void setCount(long count) {
  21.         this.count = count;
  22.     }
  23.     @Override
  24.     public String toString() {
  25.         return String.format("LogStats{source='%s', count=%d}", source, count);
  26.     }
  27. }
复制代码
3. 完整实现代码
  1. package com.cn.daimajiangxin.flink.sink;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  5. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  6. import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
  7. import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
  8. import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
  9. import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
  10. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  11. import org.apache.flink.connector.kafka.sink.KafkaSink;
  12. import org.apache.flink.connector.kafka.source.KafkaSource;
  13. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  14. import org.apache.flink.streaming.api.datastream.DataStream;
  15. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  16. import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
  17. import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
  18. import org.apache.http.HttpHost;
  19. import org.elasticsearch.action.index.IndexRequest;
  20. import org.elasticsearch.client.Requests;
  21. import java.sql.PreparedStatement;
  22. import java.time.LocalDateTime;
  23. import java.util.ArrayList;
  24. import java.util.HashMap;
  25. import java.util.List;
  26. import java.util.Map;
  27. import java.util.Properties;
  28. public class MultiSinkPipeline {
  29.     public static void main(String[] args) throws Exception {
  30.         // 1. 创建执行环境并配置检查点
  31.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  32.         env.enableCheckpointing(5000);
  33.         // 2. 创建Kafka Source
  34.         KafkaSource<String> source = KafkaSource.<String>
  35.                         builder()
  36.                 .setBootstrapServers("localhost:9092")
  37.                 .setTopics("logs-input-topic")
  38.                 .setGroupId("flink-group")
  39.                 .setStartingOffsets(OffsetsInitializer.earliest())
  40.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  41.                 .build();
  42.         // 3. 读取数据并解析
  43.         DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  44.         // 解析日志数据
  45.         DataStream<LogEntry> logStream = kafkaStream
  46.                 .map(line -> {
  47.                     String[] parts = line.split("\\|");
  48.                     return new LogEntry(parts[0], parts[1], parts[2], parts[3]);
  49.                 })
  50.                 .name("Log Parser");
  51.         // 4. 过滤错误日志
  52.         DataStream<LogEntry> errorLogStream = logStream
  53.                 .filter(log -> "ERROR".equals(log.getLogLevel()))
  54.                 .name("Error Log Filter");
  55.         // 5. 配置并添加Kafka Sink - 输出错误日志
  56.         // Kafka配置
  57.         Properties props = new Properties();
  58.         props.setProperty("bootstrap.servers", "localhost:9092");
  59.         // 创建Kafka Sink
  60.         KafkaSink<LogEntry> kafkaSink = KafkaSink.<LogEntry>builder()
  61.                 .setKafkaProducerConfig(props)
  62.                 .setRecordSerializer(KafkaRecordSerializationSchema.<LogEntry>builder()
  63.                         .setTopic("error-logs-topic")
  64.                         .setValueSerializationSchema(element -> element.toString().getBytes())
  65.                         .build())
  66.                 .build();
  67.         errorLogStream.sinkTo(kafkaSink).name("Error Logs Kafka Sink");
  68.         // 6. 配置并添加Elasticsearch Sink - 存储所有日志
  69.         // 配置Elasticsearch节点
  70.         HttpHost httpHost=new HttpHost("localhost", 9200, "http");
  71.         ElasticsearchSink<LogEntry> esSink = new Elasticsearch7SinkBuilder<LogEntry>()
  72.                 .setBulkFlushMaxActions(10)        // 批量操作数量
  73.                 .setBulkFlushInterval(5000)          // 批量刷新间隔(毫秒)
  74.                 .setHosts(httpHost)
  75.                 .setConnectionRequestTimeout(60000)  // 连接请求超时时间
  76.                 .setConnectionTimeout(60000)         // 连接超时时间
  77.                 .setSocketTimeout(60000)             // Socket 超时时间
  78.                 .setEmitter((element, context, indexer) -> {
  79.                     Map<String, Object> json = new HashMap<>();
  80.                     json.put("timestamp", element.getTimestamp());
  81.                     json.put("logLevel", element.getLogLevel());
  82.                     json.put("source", element.getSource());
  83.                     json.put("message", element.getMessage());
  84.                     IndexRequest request = Requests.indexRequest()
  85.                             .index("logs_index")
  86.                             .source(json);
  87.                     indexer.add(request);
  88.                 })
  89.                 .build();
  90.         logStream.sinkTo(esSink).name("Elasticsearch Sink");
  91.         // 7. 配置并添加JDBC Sink - 存储错误日志统计
  92.         // 先进行统计
  93.         DataStream<LogStats> statsStream = errorLogStream
  94.                 .map(log -> new LogStats(log.getSource(), 1))
  95.                 .keyBy(LogStats::getSource)
  96.                 .sum("count")
  97.                 .name("Error Log Stats");
  98.         JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
  99.                 .withBatchSize(1000)
  100.                 .withBatchIntervalMs(200)
  101.                 .withMaxRetries(5)
  102.                 .build();
  103.         JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  104.                 .withUrl("jdbc:mysql://localhost:3306/test")
  105.                 .withDriverName("com.mysql.cj.jdbc.Driver")
  106.                 .withUsername("mysql用户名")
  107.                 .withPassword("mysql密码")
  108.                 .build();
  109.         String insertSql = "INSERT INTO error_log_stats (source, count, last_updated) VALUES (?, ?, ?) " +
  110.                "ON DUPLICATE KEY UPDATE count = count + VALUES(count), last_updated = VALUES(last_updated)";
  111.         JdbcStatementBuilder<LogStats> statementBuilder = (statement, stats) -> {
  112.             statement.setString(1, stats.getSource());
  113.             statement.setLong(2, stats.getCount());
  114.             statement.setTimestamp(3,  java.sql.Timestamp.valueOf(LocalDateTime.now()));
  115.         };
  116.         // 创建JDBC Sink
  117.         JdbcSink<LogStats> jdbcSink = new Jdbc().<LogStats>sinkBuilder()
  118.                 .withQueryStatement( new SimpleJdbcQueryStatement<LogStats>(insertSql,statementBuilder))
  119.                 .withExecutionOptions(jdbcExecutionOptions)
  120.                 .buildAtLeastOnce(connectionOptions);
  121.         statsStream.sinkTo(jdbcSink).name("JDBC Sink");
  122.         // 8. 执行作业
  123.         env.execute("Multi-Sink Data Pipeline");
  124.     }
  125. }
复制代码
4. 测试与验证

要测试这个完整的流水线,我们需要:

  • 启动Kafka并创建必要的主题:
    1. # 创建输入主题
    2. kafka-topics.sh --create --topic logs-input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    3. # 创建错误日志输出主题
    4. kafka-topics.sh --create --topic error-logs-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    复制代码
  • 启动Elasticsearch并确保服务正常运行
  • 在MySQL中创建必要的表:
    1. CREATE DATABASE test;
    2. USE test;
    3. CREATE TABLE error_log_stats (
    4.   source VARCHAR(100) PRIMARY KEY,
    5.   count BIGINT NOT NULL,
    6.   last_updated TIMESTAMP NOT NULL
    7. );
    复制代码
  • 向Kafka发送测试数据:
    1. kafka-console-producer.sh --topic logs-input-topic --bootstrap-server localhost:9092
    2. # 输入以下测试数据
    3. 2025-09-29 12:00:00|INFO|application-service|Application started successfully
    4. 2025-09-29 12:01:30|ERROR|database-service|Failed to connect to database
    5. 2025-09-29 12:02:15|WARN|cache-service|Cache eviction threshold reached
    6. 2025-09-29 12:03:00|ERROR|authentication-service|Invalid credentials detected
    复制代码
  • 运行Flink作业并观察数据流向各个目标系统
查看Kafka Sink中的数据:
5.png

查看MySQL中的数据:
6.png

查看Elasticsearch中的数据:
7.png

八、性能优化与最佳实践

1. 并行度配置

合理设置Sink的并行度可以显著提高吞吐量:
  1. // 为特定Sink设置并行度
  2. stream.addSink(sink).setParallelism(4);
  3. // 或为整个作业设置默认并行度
  4. env.setParallelism(4);
复制代码
2. 批处理配置

对于支持批处理的Sink,合理配置批处理参数可以减少网络开销:
  1. // JDBC批处理示例
  2. JdbcExecutionOptions.builder()
  3.     .withBatchSize(1000)  // 每批次处理的记录数
  4.     .withBatchIntervalMs(200)  // 批处理间隔
  5.     .withMaxRetries(3)  // 最大重试次数
  6.     .build();
复制代码
3. 背压处理

当Sink无法处理上游数据时,会产生背压。Flink提供了背压监控和处理机制:

  • 使用Flink Web UI监控背压情况
  • 考虑使用缓冲机制或调整并行度
  • 对于关键路径,实现自定义的背压处理逻辑
4. 资源管理

合理管理连接和资源是保证Sink稳定运行的关键:

  • 使用连接池管理数据库连接
  • 在RichSinkFunction的open()方法中初始化资源
  • 在close()方法中正确释放资源
5. 错误处理策略

为Sink配置适当的错误处理策略:
  1. // 重试策略配置
  2. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3.     3,  // 最大重试次数
  4.     Time.of(10, TimeUnit.SECONDS)  // 重试间隔
  5. ));
复制代码
九、总结与展望

本文深入探讨了Flink数据输出(Sink)的核心概念、各种连接器的使用方法以及可靠性保证机制。我们学习了如何配置和使用内置Sink、文件系统Sink、Kafka Sink、Elasticsearch Sink和JDBC Sink,并通过自定义Sink扩展了Flink的输出能力。最后,我们构建了一个完整的实时数据处理流水线,将处理后的数据输出到多个目标系统。
在Flink的数据处理生态中,Sink是连接计算结果与外部世界的桥梁。通过选择合适的Sink连接器并配置正确的参数,我们可以构建高效、可靠的数据处理系统。
源文来自:http://blog.daimajiangxin.com.cn
源码地址:https://gitee.com/daimajiangxin/flink-learning

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册