Preface

I previously wrote an article on how to use Kafka producers elegantly, based on the source code. Where there are producers, there are naturally consumers.

Those who are new to Kafka are advised to check that article out first.

In my experience, most Kafka use cases are on the consumer side, downstream of the data. I’ve used Kafka to consume over 100 million messages a day (I have to hand it to Kafka’s design). This article draws on that experience to talk about how to consume data efficiently.

Single thread consumption

Take the producer code from the previous article as an example, with a Topic named data-push and three partitions prepared in advance.

First, a total of 100 messages are sent to the three partitions. With no custom routing policy, the messages are distributed evenly across all three.
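
For reference, a minimal producer sketch might look like the following. The data-push topic comes from the article; the broker address, string serializers, and message payloads are assumptions for illustration.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DataPushProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // No key and no custom partitioner, so the client spreads
                // messages evenly across the topic's three partitions.
                producer.send(new ProducerRecord<>("data-push", "message-" + i));
            }
        } // try-with-resources flushes and closes the producer
    }
}
```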

Let’s start with the simplest single-threaded consumption, as shown below:

Since the data is scattered across three different partitions, a single thread needs to traverse all three to pull the data down.

Sample code for single-thread consumption:
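
The original snippet is not reproduced here, so below is a minimal sketch of the pattern the article describes: a single thread polls, buffers records in memory, and flushes them to the database. The broker address, group id, batch size, and the insertIntoDb helper are all assumptions for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SingleThreadConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "data-push-group");         // assumed group id
        props.put("enable.auto.commit", "false");         // commit manually after the DB write
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("data-push"));
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            final int minBatchSize = 20; // arbitrary flush threshold
            while (true) {
                // A single thread polls all three partitions in turn.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    insertIntoDb(buffer);  // hypothetical database write
                    consumer.commitSync(); // offset strategy is not the focus here
                    buffer.clear();
                }
            }
        }
    }

    // Stand-in for the real database write.
    private static void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        buffer.forEach(r -> System.out.printf("partition=%d, offset=%d, value=%s%n",
                r.partition(), r.offset(), r.value()));
    }
}
```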

A similar example also appears on the official Kafka site: fetch data into an in-memory buffer, then write it to the database.

We will not discuss offset-commit strategies here.

The consumption log shows that the 100 fetched messages were indeed pulled by traversing all three partitions.

Single thread consumption is simple, but it has the following problems:

  • It is inefficient. With dozens or hundreds of partitions, a single thread cannot fetch data fast enough.
  • Availability is low. If the consuming thread blocks, or the process dies outright, the entire consuming application fails.

Multithreaded consumption

Since there are so many problems with single threads, can we use multiple threads to improve efficiency?

Before going multithreaded, we need to divide consumption into two modes: consumer groups and standalone (independent) consumers.

These two modes call for very different handling, so they need to be discussed separately.

Independent consumer model

Let’s start with the independent consumer model, which is relatively niche compared to the consumer group.

Look at a simple example to see how it can be used:
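
Since the article’s example is not shown, here is a minimal sketch under the same assumptions as before (local broker, string messages). The key differences from a group consumer are that assign() is used instead of subscribe(), and no group.id is set; offset commits are disabled because there is no group to commit against.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StandaloneConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("enable.auto.commit", "false");         // no group.id, so no offset commits
        props.put("auto.offset.reset", "earliest");       // read from the beginning for the demo
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Note: group.id is deliberately not set.

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() picks the partitions manually instead of joining a group.
            consumer.assign(Arrays.asList(
                    new TopicPartition("data-push", 0),
                    new TopicPartition("data-push", 1),
                    new TopicPartition("data-push", 2)));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```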

It is worth noting that a standalone consumer can omit the group.id property entirely.

Send 100 messages again; the consumption results are as follows:

As you can see from the API, you can manually specify which partitions to consume.

For example, the data-push Topic has three partitions; I can manually consume only one or two of them, and pick up the third as needed.

It also supports multithreading, with each thread consuming a specified partition.
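
A sketch of what that might look like: three threads named C1, C2, and C3, each with its own KafkaConsumer assigned to one partition. (KafkaConsumer is not thread-safe, so instances are never shared across threads.) Broker address and settings are assumed as before.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PerPartitionThreads {
    public static void main(String[] args) {
        for (int p = 0; p < 3; p++) {
            final int partition = p;
            new Thread(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
                props.put("enable.auto.commit", "false");         // standalone mode, no commits
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    // Each thread consumes exactly one assigned partition.
                    consumer.assign(Collections.singletonList(new TopicPartition("data-push", partition)));
                    while (true) {
                        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                            System.out.printf("%s -> partition=%d, value=%s%n",
                                    Thread.currentThread().getName(), record.partition(), record.value());
                        }
                    }
                }
            }, "C" + (p + 1)).start();
        }
    }
}
```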

To keep the output readable, only 10 messages were sent this time.

The consumption results show that thread C1 only takes data from partition 0, C2 only from partition 1, and C3 only from partition 2.

We can even deploy consumers in multiple processes as follows:

Suppose Topic data-push had four partitions; we could then create two processes as shown in the figure.

There are two threads per process, and each thread consumes the corresponding partition.

This way, when performance falls short and new partitions are added to the Topic, the consumer side only needs to scale out horizontally in the same fashion, which is very flexible.

This custom partition consumption approach is also useful when producers always route a certain type of data to a single partition; we can then consume just that partition.

There is a problem with this approach, however: availability is low. When one of the processes dies, the partitions it was responsible for cannot be taken over by another process.

Consumer group model

The consumer group mode is probably the most widely used.

We can create N consumer instances (new KafkaConsumer()). When these instances are created with the same group.id, they belong to the same consumer group.

Every consumer instance in the group can receive messages, but each partition’s messages are delivered to only one instance within the group.
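
As a minimal sketch of what “same group.id” means in code (broker address and group name are assumptions):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMembershipSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "groupA");                  // same group.id => same consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Two instances created with the same group.id both join "groupA";
        // Kafka will split the topic's partitions between them.
        KafkaConsumer<String, String> c1 = new KafkaConsumer<>(props);
        KafkaConsumer<String, String> c2 = new KafkaConsumer<>(props);
        c1.subscribe(Collections.singletonList("data-push"));
        c2.subscribe(Collections.singletonList("data-push"));
        // Each instance must then poll from its own thread,
        // since KafkaConsumer is not thread-safe.
    }
}
```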

To understand this better, look at the official sample diagram.

The Topic has four partitions, P0, P1, P2, and P3, and two consumer groups are created: groupA and groupB.

  • Consumer group A has two consumer instances, C1 and C2.
  • Consumer group B has four consumer instances: C3, C4, C5, and C6.

How are messages divided among the consumer instances?

It can be seen from the figure that:

  • In group A, C1 consumes partitions P0 and P3, while C2 consumes P1 and P2.
  • Group B has four instances, so each instance consumes exactly one partition; that is, instances and partitions correspond one to one.

Note that:

A consumer instance here can simply be understood as a new KafkaConsumer(); it has nothing to do with processes.


Let’s say a Topic has three partitions, but I start two processes to consume it.

Each process has two consumer instances, making four instances in total.

How can four instances consume three partitions?

Consumer groups are self-balancing

Kafka has already handled this for us: it performs a Rebalance within the consumer group.

For example, in the case above there are four consumer instances for three partitions; in the end only three instances will get messages. As for which three, Kafka assigns them automatically for us.

For example, take the data-push Topic from before, with its three partitions.

When one process (with three threads, one consumer instance per thread) consumes as follows:
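
The article’s exact code is not shown; a sketch of such a process, with three threads each running its own KafkaConsumer under one shared group.id (broker address and group name assumed), might look like this:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumerProcess {
    public static void main(String[] args) {
        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
                props.put("group.id", "data-push-group");         // all three threads share one group
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("data-push"));
                    while (true) {
                        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                            System.out.printf("%s -> partition=%d, value=%s%n",
                                    Thread.currentThread().getName(), record.partition(), record.value());
                        }
                    }
                }
            }, "consumer-thread-" + i).start();
        }
    }
}
```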

The 20 messages are all consumed by the three instances in that process.

Now I start a new process that is exactly the same as the one above; that gives two processes and six instances at once.

I send 10 more messages and find:

Process 1 fetched only two of them, from partition 1 (previously all the data was fetched by the threads in process 1).


At the same time, process 2 consumed the remaining eight messages, from partitions 0 and 2 (there are still only three instances fetching data overall, just spread across different processes now).


When I shut down process 2 and send 10 more messages, I find that all the data is once again consumed by the three threads in process 1.

Through these tests, I believe you can already see the advantages of the consumer group.

We can create multiple consumer instances in a single consumer group to achieve high availability and fault tolerance, avoiding both the fragility of a single thread and the unconsumed data left behind when a standalone consumer dies. At the same time, the multithreaded approach greatly improves consumption efficiency.

When a consumer instance is added or fails, Kafka reassigns the mapping between consumer instances and partitions. This is called a consumer group Rebalance.

A Rebalance is triggered in the following cases (a way to observe it is sketched right after this list):

  • A new consumer instance joins the consumer group.
  • A consumer instance in the group goes down.
  • The number of partitions of a subscribed Topic changes.
  • When topics are subscribed by regex pattern, a change in the number of matched topics also triggers a Rebalance.
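
As a side note, the standard ConsumerRebalanceListener callback can be passed to subscribe() to watch these Rebalances happen. The sketch below assumes the same topic, broker, and group as before; the printed messages are only for illustration.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "data-push-group");         // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The listener fires whenever a Rebalance revokes or assigns partitions.
            consumer.subscribe(Collections.singletonList("data-push"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Assigned: " + partitions);
                }
            });
            while (true) {
                consumer.poll(Duration.ofMillis(100)); // keep polling so rebalances are processed
            }
        }
    }
}
```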

Therefore, this consumption mode is recommended. It also scales very well: if new partitions are added because performance is insufficient, you only need to start new consumer instances and add them to the group.

Conclusion

This time I only shared a few different ways of consuming data, without going into consumer parameters or the source code; I can cover those next time if you’re interested.

Some of the source code mentioned in the article can be viewed here:

Github.com/crossoverJi…
