找回密码
 立即注册
首页 业界区 业界 Flink学习笔记:时间与Watermark

Flink学习笔记:时间与Watermark

厂潺 昨天 23:20
在前文中,我学习 Flink 的整体架构,接下来的几篇文章,我将重点学习一下 Flink 的几个核心概念。包括时间属性、Watermark、窗口、状态以及容错机制。今天就来学习时间属性和 Watermark。
时间属性

首先来学习 Flink 的时间属性,作为流处理引擎,时间是实时数据处理的重要依赖,特别是在做时序分析或者特定时间段数据处理时,时间的概念更显得尤为重要。
Flink 中支持三种时间属性,分别是:

  • EventTime:事件时间,即为事件产生的时间。
  • ProcessTime:处理时间,Flink 算子处理事件的时间。
  • IngestionTime:摄入时间,Flink 读取事件的时间。
这样描述可能比较抽象,我们通过一张图来看一下。
1.png

从上图中可以看出,在时间产生/存储时,记录一个设备时间,就是 Event Time。当 Flink 的 DataSource 读取到事件时,这时再记录一个时间,这就是 Ingestion Time。在 Flink 程序中,每个算子处理事件时,又会记录一个时间,这个时间就是 Process Time。
Watermark

介绍完了时间概念,再来看下 Watermark 的概念。它是 Flink 处理迟到事件的妙招。
Watermark 本身也属于一种特殊的事件,它由 Source 生成,同时携带由 Timestamp,并且会跟随正常的事件一起在 Flink 算子之间流转。Watermark 的作用是定义何时停止等待较早的事件。这么介绍可能比较抽象,下面我们通过一些具体的例子来进行更进一步的说明。
2.png

上图代表的是一段乱序的事件数据流。假设我们定义 maxOutOfOrderness 为4,也就是容忍最大迟到时间为4(这里不带具体时间单位,可能是4秒也可能是4分钟)。当我们收到时间戳为7的事件时,就会生成一个时间为3的 Watermark。这代表着3之前的数据都已就绪。如果此时再有小于3的数据,我们认为它是迟到数据。
而对于迟到的数据,通常有三种处理方法:

  • 重新开启已经关闭的窗口,重新计算并修正结果
  • 将迟到事件使用旁路输出收集起来单独处理
  • 将迟到事件视为错误消息丢弃
在 Flink 中 Watermark 本身是没有意义的,它的主要作用是作为窗口的触发条件。窗口可以认为是一个时间段,它有开始时间和结束时间。在窗口内可以计算一批事件的统计结果。关于窗口,我们后面再做详细介绍。
那么 Watermark 是如何触发窗口的呢?答案是必须要满足以下两个条件:

  • Watermark 的时间戳 >= 窗口的 end_time
  • 窗口中有数据
从概念上看还是比较抽象,我们还用上面的数据流作为例子,Watermark 设置为最大时间减 4,假设我们设置10秒一个窗口。关键代码如下:
[code]***SingleOutputStreamOperator withTimestampsAndWatermarks = source                .assignTimestampsAndWatermarks(                        WatermarkStrategy.forGenerator(ctx -> new CustomWatermarkGenerator())                                .withTimestampAssigner(((event, l) -> event.timestamp))                );OutputTag lateTag = new OutputTag("late-tag") {};SingleOutputStreamOperator windowResult = withTimestampsAndWatermarks        .keyBy(event -> event.num)        .window(TumblingEventTimeWindows.of(Time.seconds(10)))        .sideOutputLateData(lateTag)        .process(new ProcessWindowFunction() {         @Override         public void process(Long key, Context context, Iterable elements, Collector out) {             // 一些逻辑处理             out.collect(result);         }});// 处理迟到数据DataStream lateStream = windowResult.getSideOutput(lateTag);lateStream.process(new ProcessFunction() {    @Override    public void processElement(Event event, Context ctx, Collector out) {        out.collect("迟到事件: " + event);    }}).print();******@Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {    long eventTime = event.timestamp;    // 使用CAS确保线程安全    while (true) {        long current = currentMaxTime.get();        if (eventTime

相关推荐

您需要登录后才可以回帖 登录 | 立即注册