This article was originally published on the Tencent Cloud+ Community by kyledong.
When writing processing logic in Flink, beginners are often confused by the many related concepts:
Why does Flink have so many kinds of type declarations?
What is the difference between BasicTypeInfo.STRING_TYPE_INFO, Types.STRING, and Types.STRING()?
What is TypeInfoFactory?
How are TypeInformation.of and TypeHint used?
This article demystifies Flink's type and serialization mechanism step by step.
Flink’s classification of types
The source code of Flink's type system lives in the org.apache.flink.api.common.typeinfo package. Let us trace it in depth, as in Figure 1, and look at the class hierarchy shown in the figure:
As you can see, Figure 1 and Figure 2 correspond one to one. TypeInformation is the common base class that describes all types. It and all of its subclasses must be Serializable, because type information is shipped to every execution node as part of the submitted Flink job.
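To see these categories in practice, here is a minimal sketch, assuming a Flink 1.x release with flink-core on the classpath; Person and Opaque are hypothetical example classes. It asks TypeInformation.of which TypeInformation subclass Flink picks for a few classes. Person follows Flink's POJO rules, while Opaque deliberately breaks them and therefore falls into the generic category.

```java
import java.io.Serializable;
import org.apache.flink.api.common.typeinfo.TypeInformation;

class TypeCategoryDemo {
    // Follows Flink's POJO rules: public, static, public no-arg constructor, public fields
    public static class Person {
        public String name;
        public int age;
        public Person() {}
    }

    // Breaks the POJO rules: no no-arg constructor, private field without getter/setter
    public static class Opaque {
        private final int value;
        public Opaque(int value) { this.value = value; }
    }

    // Simple name of the TypeInformation subclass Flink chooses for a class
    static String categoryOf(Class<?> clazz) {
        return TypeInformation.of(clazz).getClass().getSimpleName();
    }

    // Every TypeInformation is shipped with the job, so it must be Serializable
    static boolean isShippable(Class<?> clazz) {
        return TypeInformation.of(clazz) instanceof Serializable;
    }

    public static void main(String[] args) {
        for (Class<?> c : new Class<?>[] {String.class, int[].class, Person.class, Opaque.class}) {
            System.out.println(c.getSimpleName() + " -> " + categoryOf(c));
        }
    }
}
```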
Because Flink manages its own memory and uses a very compact storage format (see the official blog post), type information is crucial metadata throughout the entire data processing pipeline.
TypeExtractor: type extraction
Flink's internal TypeExtractor class uses clues such as method signatures and subclass information to automatically extract and recover type information (type information can of course also be declared explicitly, which is what this article covers).
However, because of Java type erasure, automatic extraction does not always work, so some cases (such as classes loaded dynamically through a URLClassLoader) still have to be handled manually. In the following figure, for example, the .returns() method is used to declare the return type of the DataSet.
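Type erasure is easy to observe in plain Java. The sketch below uses only the standard library (the class name is hypothetical) and shows that a List<String> and a List<Integer> share a single runtime class, so nothing in the object itself tells an extractor what the element type was.

```java
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {
    // Both lists have the same runtime Class: <String> and <Integer> exist only at compile time
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // A generic method cannot see T either; it only gets the erased class of the argument
    static <T> String runtimeNameOf(T value) {
        return value.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(runtimeNameOf(new ArrayList<String>()));
    }
}
```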
Note that returns() accepts several kinds of arguments: a class name given as a string (e.g., "String"), a TypeHint (for generic types, covered below), a Java Class object (e.g., String.class), and so on. The string form is being deprecated, however, so if necessary use something like Class.forName() instead.
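A minimal sketch of the TypeHint and Class forms, assuming the Flink 1.x DataSet API (flink-java) used throughout this article; the input words are made up. The first lambda produces a Tuple2 whose type arguments javac erases, so a TypeHint supplies them; the second produces a plain Integer, for which a Class object is enough. Calling getType() right after returns() shows the declared type without executing the job.

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

class ReturnsDemo {
    // Generic result: the lambda loses Tuple2<String, Integer>, so declare it with a TypeHint
    static DataSet<Tuple2<String, Integer>> wordLengths(ExecutionEnvironment env) {
        return env.fromElements("flink", "type", "hint")
                .map(w -> Tuple2.of(w, w.length()))
                .returns(new TypeHint<Tuple2<String, Integer>>() {});
    }

    // Non-generic result: a plain Class object is enough
    static DataSet<Integer> lengths(ExecutionEnvironment env) {
        return env.fromElements("a", "bb")
                .map(w -> w.length())
                .returns(Integer.class);
    }

    // getType() reports the declared type; nothing is executed here
    static TypeInformation<Tuple2<String, Integer>> pairType() {
        return wordLengths(ExecutionEnvironment.getExecutionEnvironment()).getType();
    }

    static TypeInformation<Integer> lengthType() {
        return lengths(ExecutionEnvironment.getExecutionEnvironment()).getType();
    }
}
```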
Below is the registerType method of the ExecutionEnvironment class, which registers subclass information with Flink (Flink knows the parent class but does not necessarily know the specific features of a subclass, so the subclass has to be registered). Here is an example from the flink-ML machine learning library code:
As the image below shows, if the type information obtained from TypeExtractor.createTypeInfo(type) is a PojoTypeInfo or one of its subclasses, the class is registered as a POJO type; otherwise it is handed over to Kryo and Flink does not analyze it further (in that case, performance degrades).
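This branch can be observed through the ExecutionConfig. The sketch assumes the Flink 1.x DataSet API; Point, LabeledPoint and NativeHandle are hypothetical classes standing in for the flink-ML types. After both registrations, the POJO subclass is expected among the registered POJO types and the non-POJO class among the registered Kryo types.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;

class RegisterTypeDemo {
    // Hypothetical parent type, as it might appear in a declared function signature
    public static class Point {
        public double x;
        public double y;
        public Point() {}
    }

    // Hypothetical subclass with an extra field Flink cannot see from the parent alone
    public static class LabeledPoint extends Point {
        public String label;
        public LabeledPoint() {}
    }

    // Hypothetical non-POJO type: no no-arg constructor and a private field without accessors
    public static class NativeHandle {
        private final long address;
        public NativeHandle(long address) { this.address = address; }
    }

    static ExecutionConfig registerAll() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.registerType(LabeledPoint.class); // extracted as PojoTypeInfo: registered as a POJO type
        env.registerType(NativeHandle.class); // not a POJO: registered with Kryo instead
        return env.getConfig();
    }
}
```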
Common means of declaring type information
With the TypeInformation.of() method, you can easily create TypeInformation objects.
1. For non-generic classes, just pass in the Class object
2. For generic classes, use TypeHint to store generic type information
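Both forms in a minimal sketch, assuming flink-core from a Flink 1.x release on the classpath. The last method shows why item 2 is needed: the raw Tuple2.class gives the extractor no field types, and this sketch expects Flink to throw an exception rather than guess.

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

class TypeOfDemo {
    // 1. Non-generic class: the Class object carries everything Flink needs
    static TypeInformation<String> forString() {
        return TypeInformation.of(String.class);
    }

    // 2. Generic class: the anonymous TypeHint subclass keeps <String, Long> at runtime
    static TypeInformation<Tuple2<String, Long>> forPair() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
    }

    // The raw class has no field types, so extraction fails with a RuntimeException
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawTupleRejected() {
        try {
            TypeInformation.of(Tuple2.class);
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }
}
```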
TypeHint works by creating an anonymous subclass; at runtime, TypeExtractor recovers the saved actual type via getGenericSuperclass().getActualTypeArguments().
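The anonymous-subclass trick itself needs nothing from Flink. Below is a stripped-down, hypothetical MiniTypeHint (not Flink's TypeHint class) that uses the same getGenericSuperclass().getActualTypeArguments() call to recover a nested generic type at runtime, using only the standard library.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's TypeHint, reduced to the reflection trick it relies on
abstract class MiniTypeHint<T> {
    private final Type captured;

    protected MiniTypeHint() {
        // getClass() is the anonymous subclass; its superclass signature MiniTypeHint<...>
        // is written into the bytecode, including the concrete type argument
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return captured;
    }
}

class MiniTypeHintDemo {
    static String capture() {
        return new MiniTypeHint<Map<String, List<Integer>>>() {}.getType().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(capture());
    }
}
```

The trailing {} is essential: it creates the subclass whose generic superclass javac records, which is also why Flink's TypeHint is always written with trailing braces.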
3. Predefined shortcuts
BasicTypeInfo, for example, defines a set of shortcuts for common types that can be used directly to declare basic types such as String, Boolean, Byte, Short, Integer, Long, Float, Double, and Character.
For example, the following declares the type of each field of a Row type. It is very concise and does not require new XxxTypeInfo<>(many, many arguments):
Of course, if BasicTypeInfo still feels too long, Flink also provides the completely equivalent Types class (org.apache.flink.api.common.typeinfo.Types):
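A sketch of that equivalence, assuming flink-core from a Flink 1.x release (the three field types are arbitrary). The same Row type is declared once with the BasicTypeInfo constants and once with the Types aliases, and the two results are expected to compare equal; the last method checks that the Types basic constants are the very same BasicTypeInfo instances.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

class RowTypeDemo {
    // Row fields declared with the BasicTypeInfo constants
    static RowTypeInfo verbose() {
        return new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO);
    }

    // The same row declared with the shorter Types aliases
    static TypeInformation<Row> concise() {
        return Types.ROW(Types.STRING, Types.INT, Types.DOUBLE);
    }

    // The Types constants reference the same BasicTypeInfo objects
    static boolean sameInstances() {
        return Types.STRING == BasicTypeInfo.STRING_TYPE_INFO
                && Types.INT == BasicTypeInfo.INT_TYPE_INFO
                && Types.DOUBLE == BasicTypeInfo.DOUBLE_TYPE_INFO;
    }
}
```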
Note in particular that the flink-table module has its own Types class (org.apache.flink.table.api.Types), used for type definitions inside the table module, and its usage is slightly different. Be careful when letting the IDE import classes automatically:
4. Custom TypeInfo and TypeInfoFactory
A custom TypeInfo gives any class Flink's native memory management (instead of Kryo), making storage more compact and execution more efficient.
The developer adds the @TypeInfo annotation to the custom class, then creates the corresponding TypeInfoFactory and overrides its createTypeInfo method.
Note that the custom type information must extend the TypeInformation class, define a type for each field, and override metadata methods such as whether it is a basic type (isBasicType), whether it is a tuple type (isTupleType), and its arity (for a one-dimensional Row type, equal to the number of fields). These give TypeExtractor the basis for its decisions.
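A complete TypeInformation subclass needs more code than fits here, so the sketch below takes a shortcut: instead of a hand-written TypeInformation, its factory returns Flink's existing PojoTypeInfo, built with Types.POJO and explicitly declared field types. It assumes flink-core from a Flink 1.x release that provides Types.POJO(Class, Map) and Types.LIST; Event and EventTypeInfoFactory are hypothetical. Without the annotation, the List<String> field would be extracted as a generic (Kryo-serialized) type.

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;

class TypeInfoFactoryDemo {
    // The annotation routes type extraction for Event to the factory below
    @TypeInfo(EventTypeInfoFactory.class)
    public static class Event {
        public String id;
        public List<String> tags;
        public Event() {}
    }

    // Flink instantiates the factory reflectively: public, static, no-arg constructor
    public static class EventTypeInfoFactory extends TypeInfoFactory<Event> {
        @Override
        public TypeInformation<Event> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("id", Types.STRING);
            fields.put("tags", Types.LIST(Types.STRING)); // Flink's list type instead of Kryo
            return Types.POJO(Event.class, fields);
        }
    }

    static TypeInformation<Event> extracted() {
        return TypeInformation.of(Event.class);
    }

    // Simple name of the type Flink assigned to the "tags" field
    @SuppressWarnings("unchecked")
    static String tagsTypeName() {
        return ((CompositeType<Event>) extracted()).getTypeAt("tags").getClass().getSimpleName();
    }
}
```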
For more examples, see TypeInfoFactoryTest.java in the Flink source (org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java).
TypeSerializer
Flink ships with many TypeSerializer subclasses. In most cases, custom types are combinations of common types, so the existing serializers can be reused directly:
If they are not enough, you can extend TypeSerializer or one of its subclasses to implement your own serializer.
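To illustrate the reuse, here is a sketch assuming flink-core from a Flink 1.x release: the serializer Flink creates for Tuple2<String, Integer> is a TupleSerializer that delegates each field to the existing String and Integer serializers. The round trip writes into a DataOutputSerializer (an in-memory DataOutputView) and reads the bytes back.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class ComposedSerializerDemo {
    // The tuple serializer is assembled from the existing per-field serializers
    static TypeSerializer<Tuple2<String, Integer>> serializer() {
        TypeInformation<Tuple2<String, Integer>> info =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        return info.createSerializer(new ExecutionConfig());
    }

    // Serialize to bytes, then deserialize a fresh copy
    static Tuple2<String, Integer> roundTrip(Tuple2<String, Integer> value) {
        try {
            TypeSerializer<Tuple2<String, Integer>> ser = serializer();
            DataOutputSerializer out = new DataOutputSerializer(64);
            ser.serialize(value, out);
            byte[] bytes = Arrays.copyOf(out.getByteArray(), out.length());
            return ser.deserialize(new DataInputDeserializer(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```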
Kryo serialization
Types that Flink cannot serialize itself (such as user-defined types without registerType or a custom TypeInfo and TypeInfoFactory) are handed over to Kryo by default.
If Kryo still cannot handle them (for example, some classes from third-party libraries such as Guava, Thrift, and Protobuf), there are two solutions:
1. Force the use of Avro (note that enableForceAvro() applies the Avro serializer to POJO types):
env.getConfig().enableForceAvro(); // env is the ExecutionEnvironment object, here and below
2. Register a custom Serializer with Kryo to extend what Kryo can handle:
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
or
env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)
If you want to disable Kryo completely (so that only Flink's own serialization is used), use the following setting; note that any class Flink cannot handle itself will then cause an exception:
env.getConfig().disableGenericTypes();
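A small sketch of what disableGenericTypes() changes, assuming flink-core from a Flink 1.x release and using a fresh ExecutionConfig in place of env.getConfig(); LegacyHandle is a hypothetical non-POJO class. Normally Flink quietly falls back to a KryoSerializer for it; with generic types disabled, creating the serializer is expected to fail with an UnsupportedOperationException.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

class GenericTypesDemo {
    // Not a POJO (no no-arg constructor), so type extraction yields a GenericTypeInfo
    public static class LegacyHandle {
        private final long id;
        public LegacyHandle(long id) { this.id = id; }
    }

    // Returns the serializer's simple name, or "rejected" if generic types are disabled
    static String serializerFor(boolean disableGenericTypes) {
        ExecutionConfig config = new ExecutionConfig();
        if (disableGenericTypes) {
            config.disableGenericTypes();
        }
        TypeInformation<LegacyHandle> info = TypeInformation.of(LegacyHandle.class);
        try {
            return info.createSerializer(config).getClass().getSimpleName();
        } catch (UnsupportedOperationException e) {
            return "rejected";
        }
    }
}
```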
Pitfalls of the type mechanism
Nothing is perfect. Flink's built-in type system is powerful and flexible, but it still has a few caveats:
1. Type extraction of Lambda functions
Flink's type extraction relies on mechanisms such as inheritance, so lambda functions are a special case: they are anonymous and have no associated class, which makes their type information hard to obtain.
The Eclipse JDT compiler writes information such as the generic signature of a lambda function into the compiled bytecode, but other common compilers, such as javac, do not, so Flink cannot obtain the specific type information.
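The missing information can be seen with plain Java reflection (standard library only; the class name is hypothetical). An anonymous class implementing Function<String, Integer> keeps its type arguments in its generic interface signature, while the class the JVM generates at runtime for an equivalent lambda does not. The sketch does not reproduce the Eclipse JDT behavior; for javac-compiled lambdas, the practical fix is the explicit .returns() declaration described earlier.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class LambdaErasureDemo {
    // Anonymous class: javac records Function<String, Integer> as its generic interface
    static final Function<String, Integer> ANONYMOUS = new Function<String, Integer>() {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    };

    // Lambda: the implementing class is generated at runtime without a generic signature
    static final Function<String, Integer> LAMBDA = s -> s.length();

    // True if the object's class still exposes the interface's type arguments
    static boolean keepsTypeArguments(Object function) {
        Type iface = function.getClass().getGenericInterfaces()[0];
        return iface instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        System.out.println(keepsTypeArguments(ANONYMOUS));
        System.out.println(keepsTypeArguments(LAMBDA));
    }
}
```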
2. Kryo's JavaSerializer has a bug in Flink
It is recommended to use org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer rather than com.esotericsoftware.kryo.serializers.JavaSerializer, to avoid incompatibilities with Flink.
Type mechanism and memory management
Taking StringSerializer as an example, here is how Flink manages memory:
Here is the serialization process:
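A minimal sketch of that process, assuming flink-core from a Flink 1.x release: StringSerializer writes a string into a DataOutputSerializer (an in-memory DataOutputView), the byte count is compared with java.io.DataOutputStream.writeUTF, which always uses a two-byte length prefix, and the string is read back. For short ASCII strings, Flink's variable-length encoding is expected to come out smaller.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class StringSerializerDemo {
    // Bytes written by Flink's StringSerializer for one string
    static int flinkSize(String s) {
        try {
            DataOutputSerializer out = new DataOutputSerializer(32);
            StringSerializer.INSTANCE.serialize(s, out);
            return out.length();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes written by DataOutputStream.writeUTF, for comparison
    static int javaUtfSize(String s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(s);
            }
            return bytes.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serialize, then read the string back from the written bytes
    static String roundTrip(String s) {
        try {
            DataOutputSerializer out = new DataOutputSerializer(32);
            StringSerializer.INSTANCE.serialize(s, out);
            DataInputDeserializer in = new DataInputDeserializer(out.getByteArray(), 0, out.length());
            return StringSerializer.INSTANCE.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```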
As you can see, Flink's memory management is detailed and well layered, and the code is easy to follow.
Further reading
Data Types & Serialization
Flink principle and implementation: memory management
Flink data types and serialization
This article is published on the Tencent Cloud+ Community with the author's authorization.