在前文中,我学习 Flink 的整体架构,接下来的几篇文章,我将重点学习一下 Flink 的几个核心概念。包括时间属性、Watermark、窗口、状态以及容错机制。今天就来学习时间属性和 Watermark。
时间属性
首先来学习 Flink 的时间属性,作为流处理引擎,时间是实时数据处理的重要依赖,特别是在做时序分析或者特定时间段数据处理时,时间的概念更显得尤为重要。
Flink 中支持三种时间属性,分别是:
- EventTime:事件时间,即为事件产生的时间。
- ProcessTime:处理时间,Flink 算子处理事件的时间。
- IngestionTime:摄入时间,Flink 读取事件的时间。
这样描述可能比较抽象,我们通过一张图来看一下。
从上图中可以看出,在时间产生/存储时,记录一个设备时间,就是 Event Time。当 Flink 的 DataSource 读取到事件时,这时再记录一个时间,这就是 Ingestion Time。在 Flink 程序中,每个算子处理事件时,又会记录一个时间,这个时间就是 Process Time。
Watermark
介绍完了时间概念,再来看下 Watermark 的概念。它是 Flink 处理迟到事件的妙招。
Watermark 本身也属于一种特殊的事件,它由 Source 生成,同时携带由 Timestamp,并且会跟随正常的事件一起在 Flink 算子之间流转。Watermark 的作用是定义何时停止等待较早的事件。这么介绍可能比较抽象,下面我们通过一些具体的例子来进行更进一步的说明。
上图代表的是一段乱序的事件数据流。假设我们定义 maxOutOfOrderness 为4,也就是容忍最大迟到时间为4(这里不带具体时间单位,可能是4秒也可能是4分钟)。当我们收到时间戳为7的事件时,就会生成一个时间为3的 Watermark。这代表着3之前的数据都已就绪。如果此时再有小于3的数据,我们认为它是迟到数据。
而对于迟到的数据,通常有三种处理方法:
- 重新开启已经关闭的窗口,重新计算并修正结果
- 将迟到事件使用旁路输出收集起来单独处理
- 将迟到事件视为错误消息丢弃
在 Flink 中 Watermark 本身是没有意义的,它的主要作用是作为窗口的触发条件。窗口可以认为是一个时间段,它有开始时间和结束时间。在窗口内可以计算一批事件的统计结果。关于窗口,我们后面再做详细介绍。
那么 Watermark 是如何触发窗口的呢?答案是必须要满足以下两个条件:
- Watermark 的时间戳 >= 窗口的 end_time
- 窗口中有数据
从概念上看还是比较抽象,我们还用上面的数据流作为例子,Watermark 设置为最大时间减 4,假设我们设置10秒一个窗口。关键代码如下:
[code]***SingleOutputStreamOperator withTimestampsAndWatermarks = source .assignTimestampsAndWatermarks( WatermarkStrategy.forGenerator(ctx -> new CustomWatermarkGenerator()) .withTimestampAssigner(((event, l) -> event.timestamp)) );OutputTag lateTag = new OutputTag("late-tag") {};SingleOutputStreamOperator windowResult = withTimestampsAndWatermarks .keyBy(event -> event.num) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .sideOutputLateData(lateTag) .process(new ProcessWindowFunction() { @Override public void process(Long key, Context context, Iterable elements, Collector out) { // 一些逻辑处理 out.collect(result); }});// 处理迟到数据DataStream lateStream = windowResult.getSideOutput(lateTag);lateStream.process(new ProcessFunction() { @Override public void processElement(Event event, Context ctx, Collector out) { out.collect("迟到事件: " + event); }}).print();******@Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) { long eventTime = event.timestamp; // 使用CAS确保线程安全 while (true) { long current = currentMaxTime.get(); if (eventTime |