This post was published on the Tencent Cloud+ Community by kyledong.

When writing processing logic in Flink, beginners are often confused by the myriad concepts:

Why does Flink have so many type declarations?

What is the difference between BasicTypeInfo.STRING_TYPE_INFO, Types.STRING, and Types.STRING()?

What is TypeInfoFactory?

How are TypeInformation.of and TypeHint used?

Next, this article will decipher Flink’s type and serialization mechanism step by step.

Flink’s classification of types

The source code of Flink’s type system lives in the org.apache.flink.api.common.typeinfo package. Let us dig deeper into figure 1 and look at the class hierarchy diagram:

As you can see, figure 1 and figure 2 correspond one to one. TypeInformation is the common base class that describes every type. It and all of its subclasses must be Serializable, because TypeInformation objects are shipped to every execution node when a Flink job is submitted.

Because Flink manages its own memory and uses a very compact storage format (see the official blog post), type information is crucial metadata throughout the data processing pipeline.

TypeExtractor: type extraction

Internally, Flink implements a class named TypeExtractor, which uses clues such as method signatures and subclass information to extract and recover type information automatically (types can of course also be declared explicitly, which is what this article covers).

However, because of Java type erasure, automatic extraction does not always work. Some cases (such as classes loaded dynamically through a URLClassLoader) still have to be handled manually. For example, in the following figure the .returns() method is used to declare the return type of the DataSet.

Note that returns() accepts three kinds of arguments: a class name given as a string (e.g., "String"), a TypeHint (for generic type arguments, as we will see below), and a Java Class object (e.g., String.class). The string form is on its way to being deprecated, so if you only have a class name, use something like Class.forName() to obtain the Class object instead.
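To make the erasure problem mentioned above concrete, here is a minimal plain-Java sketch (no Flink involved; the class name ErasureDemo is made up for illustration). It shows that two lists with different type arguments share the same runtime class, which is why the element type cannot be recovered from the object alone:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: demonstrates Java type erasure, independent of Flink.
class ErasureDemo {
    // Returns true when two differently parameterized lists share one runtime class.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        // After compilation both are plain ArrayList; the <String>/<Integer> part is gone.
        return strings.getClass() == numbers.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
    }
}
```

Since the runtime class carries no element type, a framework has to get that information from somewhere else, for example from an explicit declaration such as returns().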

Below is the registerType method of the ExecutionEnvironment class, which registers subclass information with Flink (Flink knows the parent class but does not necessarily know the features specific to a subclass, so the subclass has to be registered). Here is an example from the flink-ml machine learning library:

As the figure below shows, if the type information returned by TypeExtractor.createTypeInfo(type) is a PojoTypeInfo or one of its subclasses, the type is registered with Flink; otherwise it is handed over to Kryo and Flink does not manage it (in which case performance degrades).

Common means of declaring type information

With the TypeInformation.of() method, you can create TypeInformation objects easily.

1. For non-generic classes, just pass in the Class object

2. For generic classes, use TypeHint to store generic type information

TypeHint works by creating an anonymous subclass. At runtime, TypeExtractor can retrieve the actual type stored in it through getGenericSuperclass().getActualTypeArguments().
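The anonymous-subclass trick described above can be sketched in plain Java. TypeCapture below is a hypothetical stand-in written for this illustration, not Flink’s TypeHint class; it only shows how getGenericSuperclass().getActualTypeArguments() recovers a generic type that erasure would otherwise hide:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for the idea behind TypeHint; this is NOT Flink's class.
abstract class TypeCapture<T> {
    private final Type captured;

    protected TypeCapture() {
        // An anonymous subclass records "TypeCapture<actual type>" as its generic superclass,
        // and that record survives in the class file despite erasure.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type getCaptured() {
        return captured;
    }
}

class TypeCaptureDemo {
    // Creates an anonymous subclass (note the trailing {}) and reads the captured type back.
    static String describe() {
        TypeCapture<List<String>> hint = new TypeCapture<List<String>>() {};
        return hint.getCaptured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints java.util.List<java.lang.String>
    }
}
```

The trailing {} is essential: without it there is no subclass, and therefore no generic superclass record to read.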

3. Predefined shortcuts

BasicTypeInfo, for example, defines a set of shortcuts for common types that can be used directly for type declarations of basic types such as String, Boolean, Byte, Short, Integer, Long, Float, Double, Char, and so on.

For example, the following declares the type of each field of a Row type. It is very concise and does not require writing new XxxTypeInfo<>(many, many arguments):

Of course, if BasicTypeInfo still feels too verbose, Flink also provides the fully equivalent Types class (org.apache.flink.api.common.typeinfo.Types):

Pay special attention: the flink-table module also has a Types class (org.apache.flink.table.api.Types), used for type definitions inside the table module, and its usage is slightly different. Be careful with automatic imports in your IDE:

4. Customize TypeInfo and TypeInfoFactory

A custom TypeInfo lets any class use Flink’s native memory management (instead of Kryo), making storage more compact and execution more efficient.

The developer uses the @TypeInfo annotation on the custom class, then creates the corresponding TypeInfoFactory and overrides the createTypeInfo method.

Note that you need to extend the TypeInformation class, define a type for each field, and override metadata methods such as whether the type is a basic type (isBasicType), whether it is a tuple (isTupleType), and its arity (for a one-dimensional Row type, equal to the number of fields). These give TypeExtractor the basis for its decisions.

For more examples, see org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java in the Flink source.

TypeSerializer

Flink ships with a large number of TypeSerializer subclasses. In most cases, custom types are just combinations of common types, so these serializers can be reused directly:

If that is not enough, you can extend TypeSerializer or one of its subclasses to implement your own serializer.
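The idea that a serializer for a composite type can be assembled from serializers for its fields can be sketched in plain Java. Everything below (MiniSerializer, NameAndCount, and the length-prefixed byte layout) is hypothetical and simplified for illustration; it is not Flink’s TypeSerializer API, nor the binary format Flink actually uses:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for the role a type serializer plays.
interface MiniSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

// Writes a string as a 4-byte length followed by its UTF-8 bytes (an assumed layout).
class MiniStringSerializer implements MiniSerializer<String> {
    public void serialize(String value, DataOutput out) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }
    public String deserialize(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Writes an int as 4 bytes.
class MiniIntSerializer implements MiniSerializer<Integer> {
    public void serialize(Integer value, DataOutput out) throws IOException {
        out.writeInt(value);
    }
    public Integer deserialize(DataInput in) throws IOException {
        return in.readInt();
    }
}

// A made-up composite type with two fields.
class NameAndCount {
    final String name;
    final int count;
    NameAndCount(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

// The composite serializer simply delegates to the field serializers in a fixed order.
class NameAndCountSerializer implements MiniSerializer<NameAndCount> {
    private final MiniSerializer<String> nameSerializer = new MiniStringSerializer();
    private final MiniSerializer<Integer> countSerializer = new MiniIntSerializer();

    public void serialize(NameAndCount value, DataOutput out) throws IOException {
        nameSerializer.serialize(value.name, out);
        countSerializer.serialize(value.count, out);
    }
    public NameAndCount deserialize(DataInput in) throws IOException {
        String name = nameSerializer.deserialize(in);
        int count = countSerializer.deserialize(in);
        return new NameAndCount(name, count);
    }
}

class MiniSerializerDemo {
    // Serializes a value into a byte array and reads it back.
    static NameAndCount roundTrip(NameAndCount value) {
        try {
            MiniSerializer<NameAndCount> serializer = new NameAndCountSerializer();
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            serializer.serialize(value, new DataOutputStream(buffer));
            return serializer.deserialize(
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        NameAndCount copy = roundTrip(new NameAndCount("flink", 3));
        System.out.println(copy.name + " " + copy.count);
    }
}
```

Because the field order is fixed and known from the type information, no field names or class names need to be written into the bytes, which is one reason type-aware serialization can be compact.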

Kryo serialization

Types that Flink cannot serialize itself (such as user-defined types that were not registered with registerType and have no custom TypeInfo and TypeInfoFactory) are handed over to Kryo by default.

If Kryo still cannot handle a type (for example, some classes from third-party libraries such as Guava, Thrift, or Protobuf), there are two solutions:

1. Force Avro to be used in place of Kryo:

env.getConfig().enableForceAvro(); // env is the ExecutionEnvironment object, here and below

2. Add a custom Serializer to Kryo to extend what Kryo can handle:

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

or

env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)

If you want to disable Kryo completely (relying 100% on Flink’s own serialization mechanism), you can use the following setting, but note that any class Flink cannot handle will then cause an exception:

env.getConfig().disableGenericTypes();

Pitfalls of the type mechanism

Nothing is perfect. Flink’s built-in type system, while powerful and flexible, still has a few caveats:

1. Type extraction of Lambda functions

Flink’s type extraction relies on inheritance and similar mechanisms. Lambda functions are a special case: they are anonymous and have no associated class declaration, so their type information is hard to obtain.

The Eclipse JDT compiler writes information such as the generic signature of a lambda function into the compiled bytecode, but other common compilers, such as javac, do not, so Flink cannot obtain the concrete type information.
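The difference between a lambda and an anonymous class can be observed with plain Java reflection. The sketch below is illustrative only (LambdaTypeDemo is a made-up name, and the check is far simpler than what TypeExtractor does); it assumes the code is compiled with javac and run on a standard JVM:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Illustrative only: compares what reflection can see for an anonymous class and a lambda.
class LambdaTypeDemo {
    // True when the object's class records type arguments for its first interface.
    static boolean keepsTypeArguments(Object function) {
        Type iface = function.getClass().getGenericInterfaces()[0];
        return iface instanceof ParameterizedType;
    }

    // An anonymous class: its class file declares "implements Function<String, Integer>".
    static Function<String, Integer> anonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // A lambda: the class generated at runtime only implements the raw Function interface.
    static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println(keepsTypeArguments(anonymous()));
        System.out.println(keepsTypeArguments(lambda()));
    }
}
```

With javac, the anonymous class is expected to report its type arguments while the lambda is not, which is why an explicit type declaration is often needed when passing lambdas to Flink operators.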

2. Kryo’s JavaSerializer has a bug when used in Flink

It is recommended to use org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer rather than com.esotericsoftware.kryo.serializers.JavaSerializer, to avoid incompatibility with Flink.

Type mechanism and memory management

Using StringSerializer as an example, here’s how Flink manages memory:

Here is the serialization process:

As you can see, Flink’s memory management is very detailed and hierarchical, and the code is easy to understand.

Further reading

Data Types & Serialization

Flink principle and implementation: memory management

Flink data types and serialization

This article has been authorized by the author for publication on the Tencent Cloud+ Community.