Flink 1.12 introduced a new KafkaSource built on FLIP-27. Compared with the original FlinkKafkaConsumer, it adds a communication channel between the Tasks and the JobMaster, which makes custom development much easier. However, the official documentation has not caught up yet, so getting started can be difficult. In this article I give a brief analysis of the relevant source code, in the hope that it helps you.

Background knowledge

  • FLIP-27
  • The official documentation for the FLIP-27-based KafkaSource

Preparation

  • Download the Flink 1.12 release source code from GitHub

How KafkaSource works

The new KafkaSource differs functionally from the original FlinkKafkaConsumer in two main ways. First, it unifies batch and streaming: the same source can be configured as bounded (batch processing) or unbounded (stream processing). Second, the newly added Enumerator, which runs on the JobMaster, and the newly added Reader, which runs in each Task, can communicate with each other over RPC. The analysis below focuses on these two points.

Bounded and unbounded

Before Flink 1.12, there was no bounded Kafka source. If you wanted to process the data of a topic within a specific time range, you had to read Kafka through a DataSet source and control by hand when to stop reading and exit. The DataSet approach is also inefficient and resource-hungry, and is only suited to batch processing.

The new KafkaSource in Flink 1.12 makes this requirement easy to meet. The relevant code lives in the following package:

org.apache.flink.connector.kafka.source
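To make the requirement concrete, here is a small, dependency-free Java sketch of what a bounded read over a time range comes down to: map the start time to a starting offset and the end time to a stopping offset, then read only the offsets in between. It assumes a single partition whose record timestamps never decrease with the offset, and all class and method names are hypothetical. In the real connector Kafka answers this lookup itself (the search below mimics the "earliest offset whose timestamp is at least T" semantics of the Kafka consumer's offsetsForTimes), and Flink exposes it through a timestamp-based OffsetsInitializer.

```java
import java.util.Arrays;

/** Hypothetical model: resolve a [startTs, endTs) time window to partition offsets. */
final class TimeRangeOffsets {

    /**
     * Returns the first offset whose timestamp is >= target, or timestamps.length
     * if no record qualifies. Assumes timestamps are non-decreasing by offset.
     */
    static int firstOffsetAtOrAfter(long[] timestamps, long target) {
        int lo = 0;
        int hi = timestamps.length; // binary search over [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    /** Returns {startingOffset (inclusive), stoppingOffset (exclusive)} for the window. */
    static int[] offsetRange(long[] timestamps, long startTs, long endTs) {
        int start = firstOffsetAtOrAfter(timestamps, startTs);
        int stop = firstOffsetAtOrAfter(timestamps, endTs);
        return new int[] {start, stop};
    }

    public static void main(String[] args) {
        long[] ts = {100, 105, 110, 110, 120, 130}; // record timestamps at offsets 0..5
        int[] range = offsetRange(ts, 105, 120);
        // Offsets 1..3 fall inside [105, 120); reading stops before offset 4.
        System.out.println(Arrays.toString(range)); // [1, 4]
    }
}
```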

Instantiating a KafkaSource involves two classes, KafkaSource and KafkaSourceBuilder, both of which are briefly described in their class comments.

The comments do not fully document the Kafka consumer Properties, so let's continue with the builder() method.

builder() returns a KafkaSourceBuilder, the class used to instantiate the source. Let's open KafkaSourceBuilder and look at its methods.

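KafkaSourceBuilder switches between bounded and unbounded mode with two setters, setBounded(OffsetsInitializer) and setUnbounded(OffsetsInitializer). Their Javadoc explains how each one affects getBoundedness(); the first excerpt below is from setBounded() and the second from setUnbounded():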

	 * <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} that after
	 * setting the stopping offsets with this method, {@link KafkaSource#getBoundedness()}
	 * will return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
	 * <p>This method is different from {@link #setBounded(OffsetsInitializer)} that after
	 * setting the stopping offsets with this method, {@link KafkaSource#getBoundedness()}
	 * will still return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at
	 * the stopping offsets specified by the stopping offsets {@link OffsetsInitializer}.
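The difference described in these two comments can be modeled in a few lines. The sketch below is a hypothetical, simplified stand-in for KafkaSourceBuilder, not its real code: a single long replaces the OffsetsInitializer argument, and the default (unbounded, no stopping offset) is my assumption about the builder's initial state. Both setters record a stopping position; only setBounded changes what getBoundedness() reports.

```java
/** Hypothetical, simplified model of the two KafkaSourceBuilder setters. */
final class BoundednessModel {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    static final class Builder {
        // Assumed default: unbounded, with no stopping offset (read forever).
        private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        private Long stoppingOffset = null; // stand-in for a stopping OffsetsInitializer

        /** Like setBounded: stop at the offset AND report BOUNDED. */
        Builder setBounded(long stop) {
            this.boundedness = Boundedness.BOUNDED;
            this.stoppingOffset = stop;
            return this;
        }

        /** Like setUnbounded: stop at the offset but still report CONTINUOUS_UNBOUNDED. */
        Builder setUnbounded(long stop) {
            this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
            this.stoppingOffset = stop;
            return this;
        }

        Source build() {
            return new Source(boundedness, stoppingOffset);
        }
    }

    static final class Source {
        final Boundedness boundedness;
        final Long stoppingOffset;

        Source(Boundedness boundedness, Long stoppingOffset) {
            this.boundedness = boundedness;
            this.stoppingOffset = stoppingOffset;
        }

        /** What the runtime would consult to classify the source. */
        Boundedness getBoundedness() {
            return boundedness;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Builder().setBounded(42).build().getBoundedness());   // BOUNDED
        System.out.println(new Builder().setUnbounded(42).build().getBoundedness()); // CONTINUOUS_UNBOUNDED
        System.out.println(new Builder().build().getBoundedness());                  // CONTINUOUS_UNBOUNDED
    }
}
```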

KafkaSource.getBoundedness() is what the runtime uses at startup to decide whether the source is bounded or unbounded, and this decides which execution path is taken. You can confirm this by running an example application and inspecting the call stack.

The call stack shows that boundedness is fixed when the StreamGraph is first generated. How, then, does a bounded source shut down once it has finished reading? The answer lies in the communication mechanism between the Reader and the Enumerator in KafkaSource. We will not cover that mechanism in full here, but will briefly explain how the bounded and unbounded modes are implemented.
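One place where getBoundedness() becomes visible to users is the runtime mode. As far as I know, Flink 1.12 added the execution.runtime-mode option with the values STREAMING, BATCH and AUTOMATIC, where AUTOMATIC runs the job in batch mode only if every source is bounded, and BATCH rejects unbounded sources. The sketch below models that decision with hypothetical types; it is not the StreamGraphGenerator code from the call stack.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of how source boundedness feeds into the runtime mode. */
final class RuntimeModeModel {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    enum Mode { STREAMING, BATCH, AUTOMATIC }

    /** Resolves the effective mode from the configured mode and every source's boundedness. */
    static Mode resolve(Mode configured, List<Boundedness> sources) {
        boolean allBounded = sources.stream().allMatch(b -> b == Boundedness.BOUNDED);
        switch (configured) {
            case AUTOMATIC:
                // AUTOMATIC picks BATCH only when every source is bounded.
                return allBounded ? Mode.BATCH : Mode.STREAMING;
            case BATCH:
                if (!allBounded) {
                    throw new IllegalStateException("BATCH mode requires all sources to be bounded");
                }
                return Mode.BATCH;
            default:
                // STREAMING accepts both bounded and unbounded sources.
                return Mode.STREAMING;
        }
    }

    public static void main(String[] args) {
        List<Boundedness> boundedKafka = Arrays.asList(Boundedness.BOUNDED);
        List<Boundedness> mixed = Arrays.asList(Boundedness.BOUNDED, Boundedness.CONTINUOUS_UNBOUNDED);
        System.out.println(resolve(Mode.AUTOMATIC, boundedKafka)); // BATCH
        System.out.println(resolve(Mode.AUTOMATIC, mixed));        // STREAMING
    }
}
```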

Enumerator analysis

When setBounded() is given a stoppingOffsetsInitializer, tracing it through the code layer by layer shows that it is eventually passed into createEnumerator(), the method that instantiates KafkaSourceEnumerator. I marked it on the diagram.

Inside the enumerator, the stopping offset of each partition is stored together with its starting offset in the partitionSplits, which are returned by the discoverAndInitializePartitionSplit method.
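Conceptually, each split produced here is a (topic, partition, start, stop) tuple. The following sketch models that with hypothetical types: the two IntToLongFunction arguments stand in for the starting and stopping OffsetsInitializers, and using Long.MIN_VALUE as the "no stopping offset" marker is this sketch's own choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntToLongFunction;

/** Hypothetical model of splits that carry both a starting and a stopping offset. */
final class SplitInitModel {

    /** Marker for "no stopping offset" (unbounded); the value is this sketch's choice. */
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    static final class PartitionSplit {
        final String topic;
        final int partition;
        final long startingOffset;
        final long stoppingOffset;

        PartitionSplit(String topic, int partition, long startingOffset, long stoppingOffset) {
            this.topic = topic;
            this.partition = partition;
            this.startingOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }

        boolean isBounded() {
            return stoppingOffset != NO_STOPPING_OFFSET;
        }

        @Override
        public String toString() {
            String stop = isBounded() ? String.valueOf(stoppingOffset) : "none";
            return topic + "-" + partition + " [start=" + startingOffset + ", stop=" + stop + "]";
        }
    }

    /**
     * Creates one split per partition. The two functions stand in for the starting and
     * stopping OffsetsInitializers; pass null as the stopping function for "no stop".
     */
    static List<PartitionSplit> initSplits(String topic, int numPartitions,
                                           IntToLongFunction starting,
                                           IntToLongFunction stopping) {
        List<PartitionSplit> splits = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            long start = starting.applyAsLong(p);
            long stop = (stopping == null) ? NO_STOPPING_OFFSET : stopping.applyAsLong(p);
            splits.add(new PartitionSplit(topic, p, start, stop));
        }
        return splits;
    }

    public static void main(String[] args) {
        long[] latest = {10, 20, 30}; // pretend "latest" offsets of partitions 0..2
        for (PartitionSplit s : initSplits("orders", 3, p -> 0L, p -> latest[p])) {
            System.out.println(s); // first line: orders-0 [start=0, stop=10]
        }
    }
}
```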

This method is invoked from start() through the asynchronous context.callAsync() method: discoverAndInitializePartitionSplit is passed in as the asynchronous call, and handlePartitionSplitChanges is passed in as the handler for its result.

Behind the scenes, then, handlePartitionSplitChanges is the method that receives the partitionSplits returned by discoverAndInitializePartitionSplit. Let's take a look at what it does.
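The callAsync pattern separates slow work from state changes: the callable (here, partition discovery) runs on a worker thread, and its result or exception is handed to the handler on the coordinator thread, so the enumerator's own state is only touched from one thread. That is my understanding of how the enumerator context behaves; the class below is a hypothetical, self-contained model of the pattern built on plain java.util.concurrent executors, not Flink's implementation.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical model of the callAsync(callable, handler) pattern. */
final class CallAsyncModel implements AutoCloseable {

    // Threads for potentially blocking work, such as fetching partition metadata.
    private final ExecutorService workers = Executors.newCachedThreadPool();
    // One thread standing in for the coordinator thread that owns the enumerator state.
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    /** Runs callable on a worker, then passes (result, error) to handler on the coordinator thread. */
    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        workers.execute(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call();
            } catch (Throwable t) {
                error = t;
            }
            final T r = result;
            final Throwable e = error;
            coordinator.execute(() -> handler.accept(r, e));
        });
    }

    @Override
    public void close() throws InterruptedException {
        workers.shutdown(); // in-flight callables finish and enqueue their handlers
        workers.awaitTermination(5, TimeUnit.SECONDS);
        coordinator.shutdown(); // handlers already queued still run
        coordinator.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch handled = new CountDownLatch(1);
        try (CallAsyncModel context = new CallAsyncModel()) {
            // First argument plays discoverAndInitializePartitionSplit (runs off-thread);
            // second plays handlePartitionSplitChanges (runs on the coordinator thread).
            context.callAsync(() -> Arrays.asList("orders-0", "orders-1"),
                    (splits, error) -> {
                        System.out.println(error == null ? "new splits: " + splits : "failed: " + error);
                        handled.countDown();
                    });
            handled.await(5, TimeUnit.SECONDS);
        }
    }
}
```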

It first calls addPartitionSplitChangeToPendingAssignment.

This method stores the source parallelism in numReaders and, for each TopicPartition, computes its owner reader by hashing, which groups the splits by reader.
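A hash-based owner assignment can be sketched as follows. The formula (a start index derived from the topic hash, plus the partition number, modulo numReaders) is modeled on what I recall of the 1.12 enumerator; treat it as an assumption and check it against the source before relying on it. Its useful property is that consecutive partitions of one topic land on consecutive readers, so they spread evenly across the parallelism.

```java
/** Hypothetical sketch of hash-based split ownership. */
final class SplitOwnerModel {

    /**
     * Maps (topic, partition) to a reader index in [0, numReaders).
     * The topic hash picks a start reader; partitions then go round-robin from there.
     * Assumes non-negative partition numbers, as Kafka uses.
     */
    static int ownerOf(String topic, int partition, int numReaders) {
        if (numReaders <= 0) {
            throw new IllegalArgumentException("numReaders must be positive");
        }
        // Mask the sign bit so the start index is never negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int numReaders = 3; // stand-in for the source parallelism
        for (int p = 0; p < 5; p++) {
            System.out.println("orders-" + p + " -> reader " + ownerOf("orders", p, numReaders));
        }
    }
}
```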

The grouped partitionSplits are then stored in pendingPartitionSplitAssignment.
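The grouping step amounts to a map from reader index to the set of splits waiting for that reader. In this hypothetical sketch, splits are plain "topic-partition" strings and the owner function is passed in, so any assignment rule (such as a hash-based one) can be plugged in; the partition-modulo rule in main is only for illustration and is not the connector's rule.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.ToIntFunction;

/** Hypothetical model of grouping new splits into a per-reader pending map. */
final class PendingAssignmentModel {

    // reader index -> splits waiting to be sent to that reader
    final Map<Integer, Set<String>> pending = new TreeMap<>();

    /** Adds each split to the pending set of its owner reader. */
    void addToPending(Collection<String> newSplits, ToIntFunction<String> ownerOf) {
        for (String split : newSplits) {
            int owner = ownerOf.applyAsInt(split);
            pending.computeIfAbsent(owner, r -> new TreeSet<>()).add(split);
        }
    }

    public static void main(String[] args) {
        PendingAssignmentModel model = new PendingAssignmentModel();
        int numReaders = 2;
        // Illustrative owner rule only: partition number modulo numReaders.
        ToIntFunction<String> owner =
                s -> Integer.parseInt(s.substring(s.lastIndexOf('-') + 1)) % numReaders;
        model.addToPending(Arrays.asList("orders-0", "orders-1", "orders-2"), owner);
        System.out.println(model.pending); // {0=[orders-0, orders-2], 1=[orders-1]}
    }
}
```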

Next comes assignPendingPartitionSplits, which works further on pendingPartitionSplitAssignment:

The key call in this method is context.assignSplits(), which sends the grouped split assignments to the Readers.
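The core of this step can be modeled as: collect the pending splits whose owner reader has already registered, send them in one batch (the sentAssignments list stands in for context.assignSplits()), record them as assigned, and clear them from the pending map. The class is hypothetical, and the rule that unregistered readers are skipped and served on a later call is my reading of the 1.12 enumerator rather than something stated in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of assigning pending splits to registered readers only. */
final class AssignPendingModel {

    final Map<Integer, Set<String>> pending = new TreeMap<>();
    final Map<Integer, Set<String>> assigned = new TreeMap<>();
    final Set<Integer> registeredReaders = new TreeSet<>();
    // Each entry records one batch that would have gone to context.assignSplits(...).
    final List<Map<Integer, List<String>>> sentAssignments = new ArrayList<>();

    void assignPendingSplits() {
        Map<Integer, List<String>> incremental = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> e : pending.entrySet()) {
            // Hold splits back until their owner reader has registered.
            if (!e.getValue().isEmpty() && registeredReaders.contains(e.getKey())) {
                incremental.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        if (incremental.isEmpty()) {
            return; // nothing can be sent yet
        }
        sentAssignments.add(incremental); // stand-in for context.assignSplits(...)
        for (Map.Entry<Integer, List<String>> e : incremental.entrySet()) {
            assigned.computeIfAbsent(e.getKey(), r -> new TreeSet<>()).addAll(e.getValue());
            pending.remove(e.getKey()); // no longer pending for this reader
        }
    }

    public static void main(String[] args) {
        AssignPendingModel m = new AssignPendingModel();
        m.pending.put(0, new TreeSet<>(Arrays.asList("orders-0", "orders-2")));
        m.pending.put(1, new TreeSet<>(Arrays.asList("orders-1")));
        m.registeredReaders.add(0); // reader 1 has not registered yet
        m.assignPendingSplits();
        System.out.println(m.assigned); // {0=[orders-0, orders-2]}
        System.out.println(m.pending);  // {1=[orders-1]}
        m.registeredReaders.add(1);
        m.assignPendingSplits();
        System.out.println(m.assigned); // {0=[orders-0, orders-2], 1=[orders-1]}
    }
}
```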

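assignSplits() relies on the sendEvent method of the coordinator context, which sends data from the Enumerator to the Reader in each Task to tell it what data to read. Since each split carries both its starting and its stopping offset, the Reader knows where to begin and, when a stopping offset is set, where to stop. At this point we can understand how KafkaSource supports both bounded and unbounded reading.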
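On the Reader side, the stopping offset carried by each split is what makes a bounded read terminate. The hypothetical model below receives splits (as it would from the Enumerator's event), emits offsets round-robin, drops a split once it reaches its stopping offset (exclusive in this sketch), and reports END_OF_INPUT only after being told that no more splits will come. The InputStatus values mirror the names of Flink's org.apache.flink.core.io.InputStatus, but everything else is simplified: there are no real records, and Long.MAX_VALUE stands in for "no stopping offset".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a split reader that honors per-split stopping offsets. */
final class BoundedReaderModel {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    static final class Split {
        final String id;
        final long stoppingOffset; // exclusive; Long.MAX_VALUE models "never stop"
        long nextOffset;

        Split(String id, long startingOffset, long stoppingOffset) {
            this.id = id;
            this.nextOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }

        boolean finished() {
            return nextOffset >= stoppingOffset;
        }
    }

    private final Deque<Split> splits = new ArrayDeque<>();
    private boolean noMoreSplits = false;
    final List<String> emitted = new ArrayList<>();

    /** Receives splits, as a reader would from the Enumerator's add-splits event. */
    void addSplits(Collection<Split> newSplits) {
        splits.addAll(newSplits);
    }

    /** Records that the Enumerator will not assign any further splits. */
    void notifyNoMoreSplits() {
        noMoreSplits = true;
    }

    /** Emits one offset per call, round-robin across splits, dropping finished splits. */
    InputStatus pollNext() {
        while (!splits.isEmpty() && splits.peekFirst().finished()) {
            splits.pollFirst(); // reached its stopping offset
        }
        if (splits.isEmpty()) {
            // Only a reader that was told "no more splits" may end its input.
            return noMoreSplits ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
        }
        Split split = splits.pollFirst();
        emitted.add(split.id + "@" + split.nextOffset);
        split.nextOffset++;
        splits.addLast(split);
        return InputStatus.MORE_AVAILABLE;
    }

    public static void main(String[] args) {
        BoundedReaderModel reader = new BoundedReaderModel();
        reader.addSplits(Arrays.asList(new Split("orders-0", 0, 2), new Split("orders-1", 5, 6)));
        reader.notifyNoMoreSplits();
        InputStatus status = reader.pollNext();
        while (status == InputStatus.MORE_AVAILABLE) {
            status = reader.pollNext();
        }
        // Prints: [orders-0@0, orders-1@5, orders-0@1] END_OF_INPUT
        System.out.println(reader.emitted + " " + status);
    }
}
```

With Long.MAX_VALUE as the stopping offset, pollNext() keeps returning MORE_AVAILABLE, which corresponds to the unbounded case.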

Conclusion

This article was a brief introduction to how KafkaSource works and is meant as an easy starting point. I am preparing some more advanced topics:

  • How to customize communication between Readers and Enumerators
  • How to modify the pollNext() method of the KafkaSource reader for custom processing
  • How SourceOperator starts KafkaSource in the runtime environment

I will analyze these three topics once I have worked through them.