- 不仅仅是流计算:Apache Flink实践
- InfoQ中文站
- 1959字
- 2020-06-26 06:08:27
Apache Flink类型和序列化机制简介
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0144_0001.jpg?sign=1739700592-Kvfj5HFqfydNORn7hjVEIf5Yx8HYJrfh-0-8bf5677dab6dfc3f9ce4cdcc88c00003)
使用Apache Flink(以下简称Flink)编写处理逻辑时,新手总是容易被林林总总的概念所混淆:
为什么Flink有那么多的类型声明方式?
BasicTypeInfo.STRING TYPE INFO、Types.STRING 、Types.STRING()有何区别?
TypeInfoFactory又是什么?
TypeInformation.of和TypeHint是如何使用的呢?
接下来本文将逐步解密Flink的类型和序列化机制。
Flink的类型分类
Flink的类型系统源码位于org.apache.flink.api.common.typeinfo包,让我们对图1深入追踪,看一下类的继承关系图:
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0001.jpg?sign=1739700592-BmNWHHMF4xMBLEfuxQF3edXLVTlZlho0-0-31413fad60668eb65aeb8dd5eacdc3cc)
图1:Flink类型分类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0002.jpg?sign=1739700592-MAK5FXtHcFeEQRFtk802stRfI2Xew8ui-0-b923f52b2cd1c50b3d754539319bf780)
图2:TypeInformation类继承关系图
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0146_0001.jpg?sign=1739700592-GTj0BWDKpmt3t8P1SGfBqaWIeWCexpiw-0-aa74fcb2c42d9d91014d4fa95c35f65b)
图3:使用.returns方法声明返回类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0001.jpg?sign=1739700592-xLS0qINuoDh2EQIsGctcWlYSC1ANJTaD-0-9eec3af97270a9441c5443f9b2614264)
图4:Flink-ML注册子类类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0002.jpg?sign=1739700592-EnBcM7lM4otjdq1Ksl5TnXHJYktNrpF1-0-5f830c256305d29061dc414797bc62bd)
图5:Flink允许注册自定义类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0001.jpg?sign=1739700592-v4gWwhdtDmd1JVuZ0M1ZBEvCVuJCS8di-0-c8d526b8e252b98fb3810105a9809caa)
图6:class对象作为参数
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0002.jpg?sign=1739700592-NojeMAVshJhW6L0SFrorVkgthjf6kxdE-0-0a98a5e304f135bc85453d3b03336ce6)
图7:TypeHint作为参数,保存泛型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0003.jpg?sign=1739700592-FkvQBQhrBX3dFJQqgSmMr25ZxRiEWER3-0-5ca6b33a7e5b3179de1874f0c404ebf3)
图8:BasicTypeInfo快捷方式
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0001.jpg?sign=1739700592-oCkPWUfyfyJI2GJxymsG3YBwERjeACdG-0-9828da47c6dea8962b0a588a6c9ec350)
图9:使用BasicTypeInfo快捷方式来声明一行(Row)每个字段的类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0002.jpg?sign=1739700592-z6lRvOSQB4tRAPMEx3MVNqAS3rt7CcsI-0-fbd3ee6bc5fa96b24d7ae525ca12eebd)
图10:Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0003.jpg?sign=1739700592-MCBuTI6D9LhLZq9X0wD2mmXU5VyFxz8j-0-bdca1bfbbe697fa4713307b4f71149fc)
图11:flink-table模块的Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0150_0001.jpg?sign=1739700592-yQuU878nVAITCxBri6MaLSXhShPGK3w2-0-1342e196a026b3c08f7802c6f52918cd)
图12:为自定义类提供类型支持(图片未展示全部字段)
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0151_0001.jpg?sign=1739700592-g8t2HQqJ3yf5x6fb9KWebE4s8i4egu08-0-4809c6862b78a65fd09c131b5666876b)
图13:Flink自带的TypeSerializer子类概览
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0001.jpg?sign=1739700592-DEcOB2iCCAKZtNJ5sFLXSAG9gWaYJ8qM-0-3b31a0a4e2abbeeae0ce3f9a83a86a4c)
图14:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0002.jpg?sign=1739700592-oAu6VFiZjAk00RgS8Bruqoz78zqVfZRx-0-bbb2252331f1a25fdf075e2b4fd18a0e)
图15:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0001.jpg?sign=1739700592-rHX8IACmd7ZjJD1tjBLQlzuRIM8RQAzW-0-172e656a049bd3fa46e79f7980fdb66c)
图16:类型信息到内存块
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0002.jpg?sign=1739700592-KfK6pk8vH7ApQOKF0p9cC9cPOTUKgR4w-0-966be5ae1e4b83f73437f51067506836)
图17:StringSerializer类的serialize()方法
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0154_0001.jpg?sign=1739700592-tMUwfhtAb0scwGG29E2IJj9K3hiw3mkE-0-a805771745ba9a07f34fa09c1c0462ea)
图18:String对象的序列化过程
可以看到,图1和图2是一一对应的,TypeInformation类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随Flink的作业提交,被传递给每个执行节点。
由于Flink自己管理内存,采用了一种非常紧凑的存储格式(见官方博文),因而类型信息在整个数据处理流程中属于至关重要的元数据。
TypeExtractror类型提取
Flink内部实现了名为TypeExtractror的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(当然也可以显式声明,即本文所介绍的内容)。
然而由于Java的类型擦除,自动提取并不是总是有效。因而一些情况下(例如通过URLClassLoader动态加载的类),仍需手动处理;例如下图中对DataSet变换时,使用.returns()方法声明返回类型。
这里需要说明一下,returns()接受三种类型的参数:字符串描述的类名(例如"String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java原生Class(例如String.class)等;不过字符串形式的用法即将废弃,如果确实有必要,请使用Class.forName()等方法来解决。
下面是ExecutionEnvironment类的registerType方法,它可以向Flink注册子类信息(Flink认识父类,但不一定认识子类的一些独特特性,因而需要注册),下面是Flink-ML机器学习库代码的例子:
从下图可以看到,如果通过TypeExtractor.createTypeInfo(type)方法获取到的类型信息属于PojoTypeInfo及其子类,那么将其注册到一起;否则统一交给Kryo去处理,Flink并不过问(这种情况下性能会变差)。
声明类型信息的常见手段
通过TypeInformation.of()方法,可以简单地创建类型信息对象。
1.对于非泛型的类,直接传入Class对象即可
2.对于泛型类,需要借助TypeHint来保存泛型类型信息
TypeHint的原理是创建匿名子类,运行时TypeExtractor可以通过getGenericSuperclass(). getActualTypeArguments()方法获取保存的实际类型。
3.预定义的快捷方式
例如BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于String、Boolean、Byte、Short、Integer、Long、Float、Double、Char等基本类型的类型声明,可以直接使用。
例如下面是对Row类型各字段的类型声明,使用方法非常简明,不再需要new XxxTypeInfo<>(很多很多参数)
当然,如果觉得BasicTypeInfo还是太长,Flink还提供了完全等价的Types类(org.apache.flink.api.common.typeinfo.Types):
特别需要注意的是,flink-table模块也有一个Types类(org.apache.flink.table.api.Types),用于table模块内部的类型定义信息,用法稍有不同。使用IDE的自动import时一定要小心:
4.自定义TypeInfo和TypeInfoFactory
通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo方法。
注意需要继承TypeInformation类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是Tuple(isTupleType)、元数(对于一维的Row类型,等于字段的个数)等等,从而为TypeExtractor提供决策依据。
更多示例,请参考Flink源码的org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
TypeSerializer
Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用:
如果不能满足,那么可以继承TypeSerializer及其子类以实现自己的序列化器。
Kryo序列化
对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfo和TypeInfoFactory),默认会交给Kryo处理。
如果Kryo仍然无法处理(例如Guava、Thrift、Protobuf等第三方库的一些类),有以下两种解决方案:
1.可以强制使用Avro来替代Kryo:
env.getConfig().enableForceAvro(); // env代表ExecutionEnvironment对象,下同
2.为Kryo增加自定义的Serializer以增强Kryo的功能:
env.getConfig().addDefaultKryoSerializer(Class<? > type, Class<? extends Serializer<? >> serializerClass
以及
env.getConfig().registerTypeWithKryoSerializer(Class<? > type, T serializer)
如果希望完全禁用Kryo(100% 使用Flink的序列化机制),则可以使用以下设置,但注意一切无法处理的类都将导致异常:
env.getConfig().disableGenericTypes();
类型机制的陷阱与缺陷
金无足赤,人无完人。Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点:
1.Lambda函数的类型提取
由于Flink类型提取依赖于继承等机制,而lambda函数比较特殊,它是匿名的,也没有与之相关的类,所以其类型信息较难获取。
Eclipse的JDT编译器会把lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。
2.Kryo的JavaSerializer在Flink下存在Bug
推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非
com.esotericsoftware.kryo.serializers.JavaSerializer以防止与Flink不兼容。
类型机制与内存管理
下面以StringSerializer为例,来看下Flink是如何紧凑管理内存的:
下面是具体的序列化过程:
可以看到,Flink对于内存管理是非常细致的,层次分明,代码也容易理解。