Sorted by: MAO He

This article is compiled according to the Live broadcast of Apache Flink series and shared by Apache Flink Contributor, Senior engineer of 360 data development, Mr. Ma Qingxiang. This article is mainly shared from four parts: how to customize the serialization framework for Flink, the best practice of Flink serialization, the serialization of Flink communication layer and the question and answer session.

Serialization framework tailored for Flink

Why tailor a serialization framework for Flink?

Most of the technology components run on the JVM, Flink also runs on the JVM, and jVM-based data analysis engines need to store a large amount of data in memory, which has some problems with the JVM, such as the low density of Java object storage. The most common approach to these problems is to implement an explicit memory management, which means that a custom memory pool is used to allocate and recycle memory, and then serialized objects are stored in memory blocks.

There are already many serialization frameworks in the Java ecosystem, such as Java Serialization, Kryo, Apache Avro, and so on. But Flink still chose its own custom serialization framework, so what’s the point? If Flink chooses its own customized serialization framework, the more it knows about the type information, the better it can complete the type check in the early stage, select a better serialization method, carry out data layout, save data storage space, and directly operate binary data.

Flink data type





Flink has built its own type system internally. The type classification supported by Flink at the present stage is shown in the figure. As you can see from the figure, Flink types can be divided into Basic, Arrays, Composite, Auxiliary, Generic, and other types. Flink supports any Java or Scala type. Don’t need like Hadoop to implement a specific interface (. Org. Apache Hadoop. IO. Writable), Flink can automatically identify data types.





How are so many data types represented inside Flink? In the Person class shown here, a Pojo of the composite type is represented in Flink as PojoTypeInfo, which inherits from TypeInformation, That is, In Flink, TypeInformation is used as a type descriptor to represent each data type to be represented.

TypeInformation





The mind map of TypeInformation is shown in the figure. It can be seen from the figure that each specific type in Flink corresponds to a specific TypeInformation implementation class. For example the IntegerTypeInformation BasicTypeInformation and FractionalTypeInformation concrete corresponds to a TypeInformation. And then there are BasicArrayTypeInformation, CompositeType and some other types, are corresponding to a specific TypeInformation.

TypeInformation is the core class of the Flink type system. For user-defined functions, Flink requires a type information to be the input and output type of the Function, namely TypeInfomation. The type information class is used as a tool to generate TypeSerializer for the corresponding type and to perform semantic checks, such as checking if a field exists in a type when it is the key of Joing or grouping.

How do I use TypeInformation? This will be explained in the following practice.

Flink’s serialization process





In the Flink serialization process, a serializer is required for the serialization operation, so where does the serializer come from? Each concrete data type corresponds to a concrete implementation of TypeInformation, and each TypeInformation provides its own serializer for the corresponding concrete data type. As you can see from Flink’s serialization process diagram, TypeInformation provides a createSerialize() method that gets TypeSerializer for that type’s data serialization and de-ordering objects.

Flink can automatically generate the corresponding serializer for most data types, which is very efficient for serializing and deserializing data sets, such as BasicTypeInfo, WritableTypeIno, etc., but for GenericTypeInfo, Flink can automatically generate the corresponding serializer for most data types, which is very efficient for serializing and deserializing data sets. Flink will use Kyro for serialization and deserialization. The Tuple, Pojo, and CaseClass types are compound types that may nest one or more data types. In this case, their serializers are also composite. They delegate the serialization of the embedded type to the serializer of the corresponding type.

A brief introduction to the type rules of Pojo, that is, in the case of meeting some conditions, the Pojo serialization will be selected for the corresponding serialization and deserialization of an operation. That is, the class must be Public and have a Public no-argument constructor, All non-static no-static, non-transient no-transient fields in this class (and all superclasses) are public (and non-final) or have public getters and setters, This method follows the Java bean naming conventions of getters and setters. When a user-defined data type cannot be recognized as a POJO type, it must be treated as GenericType and serialized using Kryo.

Flink comes with a number of TypeSerializer subclasses. In most cases, the custom types are combinations of common types, so you can reuse them directly. If the built-in data types and serialization methods don’t meet your needs, Flink’s type information system also supports extension. If you have special requirements, you can implement TypeInformation, TypeSerializer, and TypeComparator to customize the serialization and size comparison methods of your own types to improve the performance of data types in serialization and comparison.





Serialization is the process of converting a data structure or object into a binary string, which in Java is simply understood as a byte array. Deserialization, on the other hand, is the process of converting binary strings generated during serialization into data structures or objects. Take the nested Tuple 3 object as an example to briefly describe its serialization. Tuple 3 contains three levels: an int, a double, and a Person. Person contains two fields, an ID of type int and a name of type String, which will delegate the corresponding serialization operation to the corresponding serializer. Tuple 3 serializes an int through IntSerializer, which requires only four bytes. Since int takes up four bytes, this can reflect one of the advantages of Flink’s serializable process, that is, it can better serialize and deserialize the corresponding operation under the premise of knowing the data type. On the contrary, Java serialization can store more property information, but the storage space occupied at one time will suffer a certain loss.

The Person class is treated as a Pojo object, and the PojoSerializer serializer stores property information in a single byte. Similarly, the corresponding serializer is used to serialize the corresponding fields. In the result of serialization, we can see that all data is supported by MemorySegment. What does MemorySegment do?

MemorySegment in Flink serializes objects to pre-allocated memory blocks that represent one fixed-length memory block with a default size of 32 KB. A MemorySegment represents one of the smallest memory allocation units in Flink, which is equivalent to a Java byte array. Each record is stored in one or more memorySegments in serialized form.

Best practices for Flink serialization

The most common scenario

There are four common application scenarios of Flink, namely, registering subtypes, registering custom serializers, adding type hints, and manually creating TypeInformation, which are described as follows:

  • Register subtypes: If function signatures only describe supertypes, but they actually use subtypes of the supertype during execution, making Flink aware of these subtypes can greatly improve performance. Can call in or StreamExecutionEnvironment ExecutionEnvironment. Registertype subtypes (clazz) registration information.
  • Register custom serialization: Flink uses Kryo to serialize data types that do not fit into its serialization framework. Not all types work seamlessly with Kryo, as described below.
  • Add type hints: sometimes, when Flink fails to deduce generic information by all means, the user needs to pass in a TypeHint, which is usually only required in the Java API.
  • Manually create a TypeInformation: This may be necessary in some API calls because Java’s generic type erasure prevents Flink from infering data types.
In most cases, you don’t have to worry about serialization frameworks and registration types, because Flink already provides a large number of serialization operations. You don’t need to define your own serializers, but in some special cases, you need to do something about them.

Practice – Type declarations

In what way is a type declaration used to create an object with type information? An object of TypeInformation is typically created using the typeinformation.of () method, as follows:

  • For non-generic classes, just pass in the class object. PojoTypeInfo typeInfo = (PojoTypeInfo ) TypeInformation.of(Person.class);
  • For generic classes, you need to store generic type information via TypeHint. final TypeInfomation

    > resultType = TypeInformation.of(new TypeHint

    >(){});


  • Predefined constants.
BasicTypeInfo, for example, defines a set of shortcuts for common types, and can be used directly for type declarations of basic types such as String, Boolean, Byte, Short, Integer, Long, Float, Double, Char, and so on. And Flink also provides completely equivalent Types of classes (org.apache.flink.api.com mon. Typeinfo. Types). Especially is important to note that the flink – table module has a class Types (org. Apache. Flink. Table. API. Types), used for table module within the type definition information, usage is slightly different. Be careful when using IDE automatic imports.

  • Customize TypeInfo and TypeInfoFactory.




Custom TypeInfo provides Flink native memory management (instead of Kryo) for any class, making storage more compact and runtime more efficient. Note that you use the @TypeInfo annotation on your custom class, then create the corresponding TypeInfoFactory and override the createTypeInfo() method.

Practice – Register subtypes

Flink knows the parent class, but not necessarily some of the unique features of the subclass, so it needs to register the subtype separately.

StreamExecutionEnvironment registerType ExecutionEnvironment provided to the Flink () method is used to register a subclass information.

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Env. registerType(typeClass);Copy the code




Inside the registerType() method, TypeExtractor is used to extract type information, as shown in the figure above. The type information obtained belongs to PojoTypeInfo and its subclasses, so it needs to be registered together, otherwise it will be handed over to Kryo. Flink doesn’t care (performance deteriorates in this case).

Practice – Kryo serialization

Types that Flink cannot serialize (such as user-defined types that do not have registerTypes or custom TypeInfo and TypeInfoFactory) are handed over to Kryo by default, If Kryo is still unable to handle (for example, some classes from third-party libraries such as Guava, Thrift, Protobuf, etc.), there are two solutions:

  • Force use of Avro instead of Kryo. env.getConfig().enableForceAvro();
  • Add custom Serializer for Kryo to enhance the functionality of Kryo. env.getConfig().addDefaultKryoSerializer(clazz, serializer);
Note: If you want to disable Kryo completely (100% using Flink’s serialization mechanism), you can do this by using kryo-env.getConfig ().disableGenericTypes(), but note that any unhandled classes will cause an exception, which is very useful for debugging.

Serialization of Flink communication layer

If Flink tasks need to transfer data records across the network, the data needs to be serialized and written to NetworkBufferPool, and then the lower-level tasks read and deserialized, and finally the logical processing.

In order for records and events to be written to Buffer and then read out of Buffer when consumed, Flink provides RecordSerializer, RecordDeserializer and EventSerializer.

The data sent by Function is encapsulated as SerializationDelegate, which exposes any element as IOReadableWritable for serialization, passing in the data to be serialized through setInstance().

In the serialization of Flink communication layer, there are several issues worth paying attention to as follows:

  • When are the input and output types of Function determined?




The input and output types of Function are determined using the TypeExtractor tool when building StreamTransformation. TypeExtractor class can automatically extract or recover type information according to method signature, subclass information and other clues.

  • When is the Function serializer/deserializer determined?
To construct a StreamGraph, use TypeInfomation createSerializer() to obtain TypeSerializer for the corresponding type of serializer. And perform setSerializers() in addOperator(), Set the TYPE_SERIALIZER_IN_1, TYPE_SERIALIZER_IN_2, TYPE_SERIALIZER_OUT_1 properties of StreamConfig.

  • When does the actual serialization/deserialization take place? How does this process relate to TypeSerializer?




You should be familiar with Tsk and StreamTask. Tasks are managed and scheduled directly by TaskManager, and tasks call StreamTask, which encapsulates the operator’s processing logic. In the run() method, the deserialized data is first encapsulated as StreamRecord and handed to the operator for processing; The result is then sent downstream through the Collector (SerializtionDelegate was identified when the Collector was built) and the serialized result is written to the DataOutput through the RecordWriter writer writer. The serialization is handled by SerializerDelegate, which is actually done through the serialize() method of TypeSerializer.



The original link

This article is the original content of the cloud habitat community, shall not be reproduced without permission.