找回密码
 立即注册
首页 业界区 安全 [Flink/序列化/泛型] Flink DataStream 的类型系统(`Typ ...

[Flink/序列化/泛型] Flink DataStream 的类型系统(`TypeInformation`)

列蜜瘘 昨天 23:27
0 序 :缘起——最近遇到的类型系统转换问题


  • 最近,在将一个业务上较为重要的、原来由 Flink Sql 编写的 Flink Job 重写,改为基于 Flink Api 实现。
但改造过程中遇到不少 Flink 类型系统转换方面的问题,索性总结一二。


  • InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
Tuple arity : 元组元数
  1. public class DeviceRemoteConfigVersionMapFunction extends RichMapFunction< Tuple3<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode>, Tuple4<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode, DeviceRemoteConfigVersion>> implements ResultTypeQueryable {
  2.     @Override
  3.     public TypeInformation getProducedType() {
  4.         return Types.TUPLE(
  5.             TypeInformation.of( CdcPrimaryKeys.class ), //如缺少1个 class,比如此行,则报错:InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
  6.             TypeInformation.of( DeviceAlarmRecord.class ),
  7.             TypeInformation.of( DimFaultCode.class ),
  8.             TypeInformation.of( DeviceRemoteConfigVersion.class )
  9.         );
  10.     }
  11.        
  12.         //...
  13. }
  14. Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
  15.         at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1436) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  16.         at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  17.         at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:151) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  18.         at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:581) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  19.         at com.xxx.bd.dataintegration.flinkapi.entry.xxx.DwdDeviceAlarmRecordRi.main(DwdDeviceAlarmRecordRi.java:106) ~[?:?]
  20.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_422]
  21.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_422]
  22.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_422]
  23.         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_422]
  24.         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:381) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  25.         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:232) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  26.         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  27.         at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:343) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
  28.         ... 12 more
复制代码
  1. public class DimFaultCodeMapFunction extends RichMapFunction< Tuple2<CdcPrimaryKeys, DeviceAlarmRecord>, Tuple3<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode>> implements ResultTypeQueryable {
  2.     @Override
  3.     public TypeInformation getProducedType() {
  4.         return Types.TUPLE(
  5.             TypeInformation.of( CdcPrimaryKeys.class ),//如果不填这个,将报错: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.apache.flink.api.java.tuple.Tuple3
  6.             TypeInformation.of( DeviceAlarmRecord.class ),
  7.             TypeInformation.of( DimFaultCode.class )
  8.         );
  9.     }
  10.        
  11.         //...
  12. }
复制代码

  • Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2 pass a 'new TypeHint(){}'.
  1. public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
  2.     @Override
  3.     public TypeInformation getProducedType() {
  4.         return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );
  5.         //return TypeInformation.of( Tuple2.class );//[错误示范] 将报错: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
  6.         //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
  7.     }
  8.        
  9.         //...
  10. }
复制代码

  • TypeHint 的 T : 不支持泛型
  1. public class MysqlCdcDeserializationSchema implements DebeziumDeserializationSchema<CdcRecord> {
  2.     /**
  3.      * 获取返回类型
  4.      * @reference-doc
  5.      *  [1] {@link com.ververica.cdc.connectors.mysql.source.MySqlSource # deserializer(DebeziumDeserializationSchema<T> deserializer) 要求 }
  6.      * @return
  7.      */
  8.     @Override
  9.     public TypeInformation<CdcRecord> getProducedType() {
  10.         //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
  11.         //return Types.GENERIC(CdcRecord.class);
  12.         return Types.POJO( CdcRecord.class );
  13.         //return TypeInformation.of( CdcRecord.class );
  14.         //return BasicTypeInfo.of( CdcRecord.class );
  15.         //return BasicTypeInfo.STRING_TYPE_INFO;
  16.     }
  17.        
  18.         //...
  19. }
复制代码
1 概述:Flink DataStream 的类型系统

Flink 类型系统的由来


  • Flink DataStream 应用程序所处理的【事件】以数据对象的形式存在。


  • 函数调用时会传入数据对象,同时也可以输出数据对象
因此,Flink 在内部需要能够处理这些不同数据结构的对象


  • 当通过【网络传输】或者读写【状态后端】、【检查点】以及【保存点】时,需要对它们进行【序列化】【反序列化】
为了能够更高效地做到这一点,Flink 框架需要详细了解应用程序处理的数据类型


  • Flink 使用【类型信息】(org.apache.flink.api.common.typeinfo.TypeInformation)的概念来表示数据类型,并为每种数据类型生成特定的序列化器、反序列化器及比较器
  • 此外,Flink 还有一个类型提取系统,可以分析函数的输入和返回类型来自动获取类型信息,进而获得序列化器反序列化器
但是,在某些情况下,例如使用了 Lambda 函数或者泛型类型,必须显式提供【类型信息】才能使应用程序正常工作或者提高其性能。


  • 在本文中,我们会讨论:


  • Flink 支持的数据类型
  • 如何为数据类型创建类型信息
  • 如何在 Flink 的类型系统无法自动推断函数的返回类型时提供提示
最后简单说明一下显示指定类型信息的两个场景。
Flink 类型系统的常见应用场景: 注册子类型、注册自定义序列化器、添加类型提示、手动创建 TypeInformation


  • 注册子类型:如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让 Flink 了解这些子类型会大大提高性能。可以在 StreamExecutionEnvironment 或 ExecutionEnvironment 中调用 .registertype (clazz) 注册子类型信息。
  • 注册自定义序列化:对于不适用于自己的序列化框架的数据类型,Flink 会使用 Kryo 来进行序列化,并不是所有的类型都与 Kryo 无缝连接,具体注册方法在下文介绍。
  • 添加类型提示:有时,当 Flink 用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示 TypeHint,这个通常只在 Java API 中需要。
  • 手动创建一个 TypeInformation:在某些 API 调用中,这可能是必需的,因为 Java 的泛型类型擦除导致 Flink 无法推断数据类型。
例如:
  1. //样例: OutDto 为数据类型不定的泛型
  2. public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
  3.         //...
  4.     @Override
  5.     public TypeInformation getProducedType() {
  6.         return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
  7.         //return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[错误示范] TypeHint<T> 的 T : 不支持泛型,将报错: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
  8.         //return TypeInformation.of( Tuple2.class );//[错误示范] 将报错: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
  9.         //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
  10.     }
  11. }
复制代码
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为 Flink 已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。
Flink DataStream 类型系统的核心API


  • org.apache.flink.api.common.typeinfo.TypeInformation
  • org.apache.flink.api.java.typeutils.ResultTypeQueryable
  • Types
  • TypeHint
  • ...
2 数据类型


  • Flink 支持 Java 和 Scala 所有常见的数据类型,也不需要像 Hadoop 一样去实现一个特定的接口(org.apache.hadoop.io.Writable),能够自动识别数据类型
使用最多的可以分为如下几类,如下图所示:
1.png

从图中可以看到 Flink 类型可以分为基本类型、数组类型、复合类型、辅助类型以及泛型。
2.1 基本类型


  • Flink 能够支持所有 Java 和 Scala 原生基本类型(包装类型)以及 Void、String、Date、BigDecimal、BigInteger 等类型。
例如: 通过从给定的元素集中创建 DataStream 数据集:
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // 创建 Integer 类型的数据集
  3. DataStream<Integer> integerElements = env.fromElements(1, 2, 3);
  4. // 创建 String 类型的数据集
  5. DataStream<String> stringElements = env.fromElements("1", "2", "3");
复制代码
2.2 数组类型


  • 数组类型包含两种类型:


  • 基本类型数组:基本类型的 Java 数组,支持 boolean、byte、short、int、long、float 等
  • 对象数组:Object 类型的 Java 数组,支持 String 以及其他对象
例如: 通过从给定的元素集中创建 DataStream 数据集:
  1. int[] a = {1, 2};
  2. int[] b = {3, 4};
  3. DataStream<int[]> arrayElements = env.fromElements(a, b);
复制代码
2.3 复合数据类型

2.3.1 Java Tuples 类型


  • Flink 在 Java 接口中定义了元组类(Tuple)供用户使用。
元组是由固定数量的强类型字段组成的复合数据类型。
如下代码所示,创建 Tuple 数据类型数据集:
  1. DataStream<Tuple2> tupleElements = env.fromElements(new Tuple2(1, "a"), new Tuple2(2, "b"));
复制代码
Flink 提供了 Java 元组的高效实现,最多包含 25 个字段,每个字段长度都对应一个单独的实现,即 Tuple0 到 Tuple25。如果字段数量超过上限,可以通过继承 Tuple 类的方式进行拓展。
2.3.2 Scala Case Class 与 Tuple 类型


  • Flink 支持任意的 Scala Case Class 以及 Scala tuples 类型,支持的字段数量上限为 22,支持通过字段名称和位置索引获取指标,不支持存储空值。
例如:代码实例所示,定义 WordCount Case Class 数据类型,然后通过 fromElements 方法创建 input 数据集,调用 keyBy() 方法对数据集根据 word 字段重新分区。
  1. // 定义WordCount Case Class数据结构
  2. case class WordCount(word: String, count: Int)
  3. // 通过fromElements方法创建数据集
  4. val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
  5. val keyStream1 = input.keyBy("word") // 根据word字段为分区字段,
  6. val keyStream2 = input.keyBy(0) //也可以通过指定position分区
复制代码

  • 通过使用 Scala Tuple 创建 DataStream 数据集,其他的使用方式和 Case Class 相似。
需要注意的是,如果根据名称获取字段,可以使用 Tuple 中的默认字段名称
  1. // 通过 scala Tuple 创建具有两个元素的数据集
  2. val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),("c", 2))
  3. // 使用默认字段名称获取字段,其中 _1 表示 tuple 的第一个字段
  4. tupleStream.keyBy("_1")
复制代码
2.3.3 ROW 类型


  • Row 是一种固定长度、可识别空值的复合类型,以确定的字段顺序存储多个值。
每个字段的类型都可以不一样并且每个字段都可以为空。
由于无法自动推断行字段的类型,因此在生成 Row 时都需要提供类型信息。
如下代码所示,创建 Row 数据类型数据集:
  1. DataStream<Row> rowElements = env.fromElements(Row.of(0, "a", 3.14));
复制代码
2.3.4 POJO 类型


  • Flink 会分析那些不属于任何一类的数据类型,尝试将它们作为 POJO 类型进行处理。如果一个类型满足如下条件,Flink 就会将它们作为 POJO 数据类型:


  • POJOs 类必须是一个公有类,Public 修饰且独立定义,不能是内部类
  • POJOs 类中必须包含一个 Public 修饰的无参构造器;
  • POJOs 类中所有的字段必须是 Public 或者具有 Public 修饰的 getter 和 setter 方法;
  • POJOs 类中的字段类型必须是 Flink 支持的。
例如,如下 Java 类就会被 Flink 识别为 POJO:
  1. // (1) 必须是 Public 修饰且必须独立定义,不能是内部类
  2. public class Person {
  3.     // (4) 字段类型必须是 Flink 支持的
  4.     private String name;
  5.     private int age;
  6.     // (2) 必须包含一个 Public 修饰的无参构造器
  7.     public Person() {
  8.        
  9.     }
  10.    
  11.         public Person(String name, int age) {
  12.         this.name = name;
  13.         this.age = age;
  14.     }
  15.    
  16.         // (3) 所有的字段必须是 Public 或者具有 Public 修饰的 getter 和 setter 方法
  17.     public String getName() {
  18.         return name;
  19.     }
  20.    
  21.         public void setName(String name) {
  22.         this.name = name;
  23.     }
  24.    
  25.         public int getAge() {
  26.         return age;
  27.     }
  28.    
  29.         public void setAge(int age) {
  30.         this.age = age;
  31.     }
  32. }
复制代码

  • 定义好 POJOs Class 后,就可以在 Flink 环境中使用了,如下代码所示,使用 fromElements 接口构建 Person 类的数据集:
  1. env.fromElements(new Person("Lucy", 18), new Person("Tom", 12))
复制代码
2.4 辅助类型


  • 在 Flink 中也支持一些比较特殊的数据数据类型,例如 Scala 中的 List、Map、Either、Option、Try 数据类型,以及 Java 中 Either 数据类型,还有 Hadoop 的 Writable 数据类型。
如下代码所示,创建 List 类型数据集:
  1. DataStream> listElements = env.fromElements(
  2.     Lists.newArrayList(1, 2), Lists.newArrayList(3, 4)
  3. );
复制代码
这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像 POJOs 类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助 Types Hint 帮助 Flink 推断数据类型信息。
2.5 泛型类型


  • 那些无法特别处理的类型会被当做泛型类型处理并交给 Kryo 序列化框架进行序列化。
如果可能的话,尽可能的避免使用 Kryo
Kryo 作为一个通用的序列化框架,通常效率不高
3 TypeInformation


  • 那这么多的数据类型,在 Flink 内部又是如何表示的呢?
在 Flink 中每一个具体的类型都对应了一个具体的 TypeInformation 实现类。
例如,BasicTypeInformation 中的 IntegerTypeInformation 对应了 Integer 数据类型。


  • 数据类型的描述信息都是由 TypeInformation 定义。
比较常用的 TypeInformation 有 BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo 类等,如下图所示:
2.png


  • TypeInformation 为系统提供生成序列化器和比较器提供必要的信息。


  • 当应用程序提交执行时,Flink 的类型系统会尝试为处理的每种数据类型自动推断 TypeInformation。
  • 类型提取器会分析函数的泛型类型以及返回类型,来获取相应的 TypeInformation 对象。


  • 但是,有时类型提取器会失灵,或者你可能想定义自己的类型并告诉 Flink 如何有效地处理它们。在这种情况下,你需要为特定数据类型生成 TypeInformation
  • 除了对类型的描述之外,TypeInformation 还提供了序列化的支撑。每一个 TypeInformation 都会为对应的具体数据类型提供一个专属的序列化器
TypeInformation 会提供一个 createSerialize() 方法,通过这个方法就可以得到该类型进行数据序列化操作与反序列化操作的序列化器 TypeSerializer:
  1. public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
  2.     return this.serializer;
  3. }
复制代码

  • 对于大多数的数据类型, Flink 可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化。
比如,BasicTypeInfo、WritableTypeIno 等


  • 但针对 GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。
其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。
在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
4 显式指定 TypeInformation


  • 大多数情况下,Flink 可以自动推断类型生成正确的 TypeInformation,并选择合适的序列化器比较器
  • Flink 的类型提取器利用反射机制来分析函数签名以及子类信息,生成函数的正确输出类型。
  • 但是有时无法提取必要的信息,例如定义函数时如果使用到了泛型,JVM 就会出现类型擦除的问题,使得 Flink 并不能很容易地获取到数据集中的数据类型信息。这时候可能会抛出如下类似的异常:
  1. Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ReturnsExample.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
  2.         at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
  3.         at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1236)
  4.         at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
  5. ...
  6. Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
  7. ...
复制代码

  • 此外,在某些情况下,Flink 选择的 TypeInformation 可能无法生成【最有效的序列化器】和【反序列化器】。
因此,你可能需要为你使用的数据类型显式地提供 TypeInformation。


  • 我们首先看一下如何创建 TypeInformation,然后再看一下如何为函数指定 TypeInformation。
4.1 创建 TypeInformation

4.1.1 of 方法


  • 对于非泛型的类型,可以使用 TypeInformation 的 of(Class typeClass) 函数直接传入 Class 就可以创建 TypeInformation:
  1. // 示例1 非泛型类型 直接传入 Class 对象
  2. DataStream<WordCount> result1 = env.fromElements("a b a")
  3.         .flatMap((String value, Collector<WordCount> out) -> {
  4.             for(String word : value.split("\\s")) {
  5.                 out.collect(new WordCount(word, 1));
  6.             }
  7.         })
  8.         .returns(TypeInformation.of(WordCount.class));
  9. result1.print("R1");
复制代码
上述方法仅适用于非泛型类型。


  • 如果是泛型类型,可以借助 TypeHint 为泛型类型创建 TypeInformation:
  1. // 示例2 泛型类型(如: <String, Integer>) 需要借助 TypeHint
  2. DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
  3.         .map(value -> Tuple2.of(value, 1))
  4.         .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  5. result2.print("R2");
复制代码
4.1.2 TypeHint


  • 对于泛型类型,上面是通过 TypeInformation.of + TypeHint 来创建 TypeInformation,也可以单独使用 TypeHint 来创建 TypeInformation:
  1. DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
  2.         .map(value -> Tuple2.of(value, 1))
  3.         .returns(new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo());
  4. result2.print("R2");
复制代码

  • TypeHint 的原理是在内部创建匿名子类,捕获泛型信息并会将其保留到运行时。运行时 TypeExtractor 可以获取保存的实际类型。
完整示例: https://github.com/sjf0115/data-example/blob/master/flink-example/src/main/java/com/flink/example/stream/base/typeInformation/hints/TypeHintExample.java
4.1.3 预定义的快捷方式


  • 例如 BasicTypeInfo 类定义了一系列常用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的类型声明,可以直接使用:
3.png


  • 当然,如果觉得 BasicTypeInfo 还是太长,Flink 还提供了完全等价的 Types 类(org.apache.flink.api.common.typeinfo.Types):
4.png


  • Types 为常见数据类型提供 TypeInformation,使用起来非常方便,如下示例:
  1. // 示例1 Types.TUPLE
  2. DataStream<Tuple2<String, Integer>> result1 = env.fromElements("a", "b", "a")
  3.         .map(value -> Tuple2.of(value, 1))
  4.         .returns(Types.TUPLE(Types.STRING, Types.INT));
  5. result1.print("R1");
  6. // 示例2 Types.POJO
  7. DataStream<WordCount> result2 = env.fromElements("a b a")
  8.         .flatMap((String value, Collector<WordCount> out) -> {
  9.             for(String word : value.split("\\s")) {
  10.                 out.collect(new WordCount(word, 1));
  11.             }
  12.         })
  13.         .returns(Types.POJO(WordCount.class));
  14. result2.print("R2");
复制代码
完整示例: https://github.com/sjf0115/data-example/blob/master/flink-example/src/main/java/com/flink/example/stream/base/typeInformation/hints/TypesExample.java
4.2 显式提供类型信息


  • 当 Flink 无法自动推断函数的生成类型是什么的时候,就需要我们显示提供类型信息提示。从上面示例中我们知道可以通过 returns 显示提供类型信息,除此之外还可以实现 ResultTypeQueryable 接口显示提供。


  • org.apache.flink.api.java.typeutils.ResultTypeQueryable
  1. //样例:
  2. public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
  3.         //...
  4.     @Override
  5.     public TypeInformation getProducedType() {
  6.         return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
  7.         //return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[错误示范] TypeHint<T> 的 T : 不支持泛型,将报错: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
  8.         //return TypeInformation.of( Tuple2.class );//[错误示范] 将报错: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
  9.         //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
  10.     }
  11. }
复制代码
4.2.1 returns

第一种方法是使用 returns 为算子添加返回类型的类型信息提示。对于非泛型类型,可以直接传入 Class 即可;对于泛型类型需要借助 TypeHint 提供类型信息提示,如下所示:
  1. // 示例1 非泛型类型 直接传入 Class
  2. DataStream<WordCount> result1 = env.fromElements("a b a")
  3.         .flatMap((String value, Collector<WordCount> out) -> {
  4.             for(String word : value.split("\\s")) {
  5.                 out.collect(new WordCount(word, 1));
  6.             }
  7.         })
  8.         .returns(WordCount.class);
  9. result1.print("R1");
  10. // 示例2 泛型类型 优先推荐借助 TypeHint
  11. DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
  12.         .map(value -> Tuple2.of(value, 1))
  13.         .returns(new TypeHint<Tuple2<String, Integer>>() {});
  14. result2.print("R2");
  15. // 示例3 TypeInformation.of + TypeHint
  16. DataStream<Tuple2<String, Integer>> result3 = env.fromElements("a", "b", "a")
  17.         .map(value -> Tuple2.of(value, 1))
  18.         .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  19. result3.print("R3");
  20. // 示例4 Types 快捷方式
  21. DataStream<Tuple2<String, Integer>> result4 = env.fromElements("a", "b", "a")
  22.         .map(value -> Tuple2.of(value, 1))
  23.         .returns(Types.TUPLE(Types.STRING, Types.INT));
  24. result4.print("R4");
复制代码
4.2.2 ResultTypeQueryable


  • 第二种方法是通过实现 ResultTypeQueryable 接口来扩展函数以显式提供返回类型的 TypeInformation。
如下示例是一个显式提供返回类型的 MapFunction:
  1. public static class ResultTypeMapFunction implements MapFunction<String, Stu>, ResultTypeQueryable {
  2.     @Override
  3.     public Stu map(String value) throws Exception {
  4.         String[] params = value.split(",");
  5.         String name = params[0];
  6.         int age = Integer.parseInt(params[1]);
  7.         return new Stu(name, age);
  8.     }
  9.     @Override
  10.     public TypeInformation getProducedType() {
  11.         return Types.POJO(Stu.class);
  12.     }
  13. }
复制代码
5 使用场景

5.1 Table 转 DataStream


  • Table 转 DataStream 的时候,Table 并清楚 DataStream 的数据结构。
因此,需要给当前转换出来的 DataStream 显性的指定数据类型:
  1. // 转化为 Pojo 类型
  2. DataStream<WordCount> stream1 = tEnv.toAppendStream(table, Types.POJO(WordCount.class));
  3. // 转换为 Row 类型
  4. DataStream<Row> stream2 = tEnv.toAppendStream(table, Types.ROW(Types.STRING, Types.LONG));
复制代码
5.2 Lambda 表达式与泛型


  • 由于 Java 泛型会出现类型擦除问题,因此 Flink 通过 Java 反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。
对于函数的返回类型取决于输入类型的情况时,会包含一些简单的类型推断。
但如果无法重构所有的泛型类型信息时,需要借助于类型提示来告诉系统函数中传入的参数类型信息和输出参数信息。
如下所示使用 returns 语句指定生成的类型:
  1. env.fromElements(1, 2, 3)
  2.   .map(i -> Tuple2.of(i, i*i))
  3.   // 如果不指定 returns 返回的 TypeInformation 会抛出异常
  4.   .returns(Types.TUPLE(Types.INT, Types.INT))
  5.   .print();
复制代码
Y 推荐文献


  • Apache Flink 进阶(五):数据类型和序列化 - Wechat/Apache Flink 2019.10.22
  • Flink 类型和序列化机制简介 - 腾讯云
X 参考文献


  • Flink DataStream 类型系统 TypeInformation - 腾讯云 2022.4.23 【推荐】
  • flink 遇到 Tuple2 泛型导致的问题:could not be determined automatically, due to type erasure. - jianshu
遇到【泛型】的类型擦除问题
    本文作者:        千千寰宇   
    本文链接:         https://www.cnblogs.com/johnnyzen   
    关于博文:评论和私信会在第一时间回复,或直接私信我。   
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA     许可协议。转载请注明出处!
    日常交流:大数据与软件开发-QQ交流群: 774386015        【入群二维码】参见左下角。您的支持、鼓励是博主技术写作的重要动力!   

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册