Background

Today I saw a rather interesting post on Pulse:

With Kafka, we have multiple partitions. How can we speed up consumption? And if we use a thread pool to speed it up, how do we make sure messages are not lost during a restart?

This question actually asks two things: first, how to increase consumption capacity; second, if a thread pool is used, how to avoid losing messages.

Let me first explain the background of these two questions. Many message queues have a concept called a partition. Partitions are the key to scaling consumption: consumers pull messages from the partitions, and a partition can be held by only one consumer within the same consumer group, as shown in the figure below:

This is similar to queuing at a bank: the more queues there are, the shorter the wait. Of course, we can also process messages asynchronously, for example by throwing every message to a thread pool to execute, which leads to the poster's second question. But first, why does synchronous consumption not lose messages?

In the synchronous model, we ack (commit) the offset back only after a message has been consumed. If a restart happens and the commit did not go through, that piece of data is simply consumed again. But if we consume with a thread pool, what should we ack? For example, suppose we hand messages 10, 11 and 12 to the thread pool and 12 finishes first: should we ack offset 13? If we do, and a restart happens at this point, Kafka will think messages 10 and 11 have already been processed, and those messages will be lost.
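To make the synchronous model concrete, here is a minimal sketch using the Kafka Java client (the broker address, group id and topic name are made up for illustration). Auto-commit is turned off and the offset is committed only after all polled records have been processed, so a crash before the commit can only cause re-consumption, never loss:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SyncConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "demo-group");                 // hypothetical consumer group
        props.put("enable.auto.commit", "false");            // ack manually, after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // handle the message synchronously, in partition order
                }
                // Ack only after processing: if we restart before this line,
                // the same records are consumed again, but none are lost.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}

The price of this model is that one slow message blocks everything behind it in the same partition, which is exactly why people reach for a thread pool.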

Answers from netizens

Here’s a look at some of the responses:

Netizen A:

The gist of this answer was simply to use a thread pool, and the original poster replied that it did not address the thread-pool (message-loss) part of the question.

Netizen B:

This approach is like the bank queue: as long as there are enough queues, processing gets faster. It is indeed one solution to the first problem.

Netizen C:

This answer mainly tackles the second problem: maintain the offsets externally, for example by writing them to a database, so that after a restart we can look up the correct offset to resume from. It works, but it is relatively complicated.

Netizen D:

Another point of view is simply to write better code so the consumer itself is faster; consumption capacity will naturally go up. This is indeed an important point and one that is often ignored. When consumption is slow, many people immediately think about tuning the middleware and forget to look at their own code.

After reading all these replies, I did not find a really satisfactory answer, so here are some thoughts of my own.

My two cents

For the first question, how do we improve consumption capacity? It can be summed up in three points:

  1. If the number of consumption threads per consumer machine is fixed, we can scale out by adding consumer machines and partitions, much like adding more windows at the bank.
  2. If the machines and partitions are fixed, increasing the number of consumption threads is a good option (see the sketch after this list). For sequential (ordered) consumption, however, adding threads does not increase capacity, because each partition must be consumed by a single thread to preserve order; in that case only the first approach works.
  3. Improve the efficiency of the consumption code itself. Back to the bank analogy: if every teller becomes much more efficient, the whole queue moves faster.
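As a rough illustration of point 2 (the thread count, group id and topic are made up), one common way to add consumption threads is to run several KafkaConsumer instances in the same consumer group, one per thread; Kafka spreads the partitions across them, so parallelism beyond the partition count buys nothing:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MultiThreadConsumerSketch {
    private static final int THREADS = 3; // hypothetical; should not exceed the partition count

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        for (int i = 0; i < THREADS; i++) {
            pool.submit(MultiThreadConsumerSketch::runConsumer); // one KafkaConsumer per thread
        }
    }

    private static void runConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // same group, so partitions are shared out
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // each partition is still consumed by a single thread, so order is preserved
                }
                consumer.commitSync();
            }
        }
    }
}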

For the second problem: if we use the thread-pool model, how do we avoid losing messages? Here I recommend looking at RocketMQ. We said above that storing offsets in a database is complicated and performs poorly; inside RocketMQ's consumer, a TreeMap is used to do in memory what we described doing with a database:

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

The key of this TreeMap is the offset of each message, and the value is the message and its metadata. A TreeMap is backed by a red-black tree, so we can quickly get its minimum and maximum keys. Every time we finish processing a message, we remove it from msgTreeMap:

public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    // remove every finished message from the in-flight map
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);
                if (!msgTreeMap.isEmpty()) {
                    // earlier messages are still being processed: only their
                    // smallest offset is safe to report back
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }
    return result;
}

Note that the offset we end up acking is msgTreeMap.firstKey(), the smallest offset still being processed (or queueOffsetMax + 1 once everything in flight has finished), not the offset of the message that just completed. So even if message 12 finishes before 10 and 11, we only commit up to 10; if we restart at that point, a few messages may be consumed again, but none are lost.
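To tie this back to the original Kafka question, here is a minimal sketch of applying the same idea in a thread-pool consumer (the class and method names are my own invention, not RocketMQ's or Kafka's API): keep every in-flight offset in a sorted map, and only ever commit up to the smallest offset that has not finished yet.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch: track in-flight offsets so the commit never runs ahead of unfinished work.
public class InFlightOffsetTracker {
    // offset -> message body; plays the role of RocketMQ's msgTreeMap
    private final ConcurrentSkipListMap<Long, byte[]> inFlight = new ConcurrentSkipListMap<>();
    private volatile long maxOffsetSeen = -1; // updated only by the single polling thread

    // Called by the polling thread when a message is handed to the worker pool.
    public void markSubmitted(long offset, byte[] body) {
        inFlight.put(offset, body);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    // Called by a worker thread when it finishes a message.
    // Returns the offset that is now safe to commit.
    public long markDone(long offset) {
        inFlight.remove(offset);
        Map.Entry<Long, byte[]> first = inFlight.firstEntry();
        // If nothing is in flight, everything seen so far is done; otherwise
        // only offsets below the smallest in-flight one are safe to ack.
        return first == null ? maxOffsetSeen + 1 : first.getKey();
    }
}

The polling thread would then commit the returned offset back to the broker; as with RocketMQ, a restart can replay the slowest messages but cannot skip them.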

Finally

This has been just a brief introduction to improving the consumption capacity of message queues. If you are interested in message queues, you can read some of my previous articles:

  • You must know kafka
  • You should know about RocketMQ
  • In-depth understanding of RocketMq normal and sequential message usage, principles, and optimization
  • How to implement transaction messages in depth

If you find this article helpful, your follows and shares will be my biggest support.