Premise: this example is suitable for message topics with no ordering requirement.
Through a series of optimizations, Kafka can handle tens of thousands of writes and reads per second. By increasing the number of partitions and deploying multiple consumers, you can raise parallel consumption capacity. Even so, there are still many cases where some business logic simply executes too slowly, and what we need then is to consume with multiple threads and improve the utilization of the application machine, rather than piling more pressure on Kafka.
ShutdownableThread
Kafka's codebase includes a ShutdownableThread base class for exactly this kind of loop: you implement its doWork method, which is called repeatedly until a shutdown is initiated.
Reference: github.com/apache/kafk…
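As a minimal sketch, a consumer built on this class might look as follows, modeled on the consumer in Kafka's examples module; the topic name and properties are illustrative assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import kafka.utils.ShutdownableThread;

public class MyConsumer extends ShutdownableThread {
    private final KafkaConsumer<String, String> consumer;

    public MyConsumer(Properties props) {
        super("KafkaConsumerExample", false); // name, isInterruptible
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList("test-topic")); // assumed topic
    }

    @Override
    public void doWork() {
        // doWork() is invoked in a loop by ShutdownableThread until shutdown
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
    }
}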
Consuming a partition's data with multiple threads
To consume with multiple threads, we first need to create a thread pool.
Reference: JAVA Multithreading Scenarios and Precautions (brief version)
We use a zero-capacity SynchronousQueue, one in and one out, so that no data is ever buffered in the queue; this removes the possibility of messages being lost from a blocking queue when the system shuts down unexpectedly. We then use the CallerRunsPolicy saturation policy, which blocks Kafka's consuming thread whenever the thread pool is overwhelmed.
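A sketch of such a pool; the pool sizes and timeout are illustrative assumptions.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Zero-capacity queue: each task is handed directly to a worker thread,
// so nothing sits buffered waiting to be lost on a crash.
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10, 10,                    // core and max pool size (illustrative)
        60, TimeUnit.SECONDS,      // idle keep-alive
        new SynchronousQueue<>(),
        // When all workers are busy, run the task in the submitting
        // (consumer) thread, which throttles the poll loop.
        new ThreadPoolExecutor.CallerRunsPolicy());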
We then put the logic that actually handles the business into the tasks and execute them on multiple threads, manually committing an ACK after each execution to indicate the message has been processed. Since the thread pool claims these tasks, order is not guaranteed: some tasks may still be unfinished while later ones have already committed their offsets. o.O
But that's not important for now; let's just parallelize it first.
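A naive first attempt might commit inside the task, something like the sketch below, where handleBusinessLogic is a hypothetical placeholder for the business code.

// WRONG: commitSync() runs on a pool thread, not the polling thread
for (ConsumerRecord<String, String> record : records) {
    executor.submit(() -> {
        handleBusinessLogic(record);
        consumer.commitSync(); // likely throws while the main thread is in poll()
    });
}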
KafkaConsumer is not safe for multi-threaded access
Clearly, Kafka's consumer side is not thread-safe, and it refuses to let you call its API this way. Kafka means well, trying to spare you the problems of concurrent environments, but here I really do need multiple threads.
Internally, the Kafka consumer compares the caller's thread ID to decide whether the request was initiated by a foreign thread.
private void acquire() {
    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
}
The only way out is to keep the commitSync call in the consumer thread, committing the ACK before the task is handed off for execution.
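The consumption loop then becomes something like this sketch; the early commit is exactly the trade-off we accept, and handleBusinessLogic remains a hypothetical placeholder.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// Commit first, from the consumer thread itself...
consumer.commitSync();
// ...then hand the records to the pool for processing.
for (ConsumerRecord<String, String> record : records) {
    executor.submit(() -> handleBusinessLogic(record));
}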
Adding a pipeline
The messages we receive may need filtering before they are actually processed, dropping null values, say, or applying conditional checks. This could all run directly in the consumer thread, but it gets particularly messy, so we can insert a producer-consumer stage (you may consider this gilding the lily). The blocking queue is still a SynchronousQueue, which acts as a pipe.
As soon as we put a task into the pipe, we commit its offset immediately. If the thread pool is full, the hand-off blocks in the consumer thread until a slot frees up. We then start a separate thread to receive the data from the pipe and submit it to the pool; the code looks something like this.
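A sketch of that pipe and dispatcher, with the names, the filter, and exception handling simplified for illustration.

// The "pipe" between the consumer thread and the dispatcher thread.
BlockingQueue<ConsumerRecord<String, String>> pipe = new SynchronousQueue<>();

// Consumer thread: filter, hand off through the pipe, then commit at once.
for (ConsumerRecord<String, String> record : records) {
    if (record.value() == null) continue; // example filter
    pipe.put(record);                     // blocks while the pool is saturated
}
consumer.commitSync();

// Dispatcher thread: drains the pipe into the thread pool.
new Thread(() -> {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecord<String, String> record = pipe.take();
            executor.submit(() -> handleBusinessLogic(record));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();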
The application starts up, and consumption is fast.
Parameter configuration
Kafka has a lot of parameters; we care about the following ones.
max.poll.records
The maximum number of records returned by a single poll call. If this value is set too high, a batch can take so long to process that it exceeds max.poll.interval.ms (default 5 minutes), causing the consumer to be considered dead and dropped from the group. It needs special attention when consumption is very time-consuming.
enable.auto.commit
Whether to enable automatic offset commits. If enabled, the offsets the consumer has consumed are committed to Kafka (persisted) at intervals.
When automatic offset commits are enabled, the timing and frequency of the commit requests are controlled by the parameter auto.commit.interval.ms.
fetch.max.wait.ms
The maximum time the broker will hold a fetch request when it does not yet have enough data to return; if enough data is available, it responds immediately.
session.timeout.ms
The consumer session timeout. If the server receives no requests from the consumer (including heartbeats) within this period, it judges the consumer to be offline. The larger this value, the longer the server takes to detect a failed consumer and rebalance.
heartbeat.interval.ms
The interval at which the consumer sends heartbeats to the Kafka cluster's coordinator. The cluster uses these heartbeats to judge whether the consumer session is alive and thus whether the consumer is online; if a consumer goes offline, the partitions registered to it are reassigned to other consumers in the same group. This value must be smaller than session.timeout.ms, typically no more than one third of it; otherwise heartbeat detection is meaningless.
In this example our parameters are set quite simply, mainly adjusting the number of records fetched per poll and the detection times; everything else is left at the default.
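A sketch of such a configuration; the concrete values and addresses are illustrative assumptions, not recommendations.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed address
props.put("group.id", "group1");                  // assumed group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");     // we commit manually
props.put("max.poll.records", "100");         // fewer records per poll for slow tasks
props.put("session.timeout.ms", "30000");     // detection time
props.put("heartbeat.interval.ms", "10000");  // at most one third of session.timeout.ms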
Guaranteeing the messages
Careful readers will have noticed that our code is still not completely safe, because we commit the ACK in advance. As long as the program runs normally this does not matter, but when the application shuts down abnormally, the messages still being executed are likely to be lost. For applications with very high consistency requirements, we need to guarantee them in two ways.
Using shutdown hooks
The first covers the kill -15 case and is relatively simple: just override the shutdown method of ShutdownableThread, and before the application closes it gets the chance to finish consuming and to execute the tasks remaining in the thread pool.
@Override
public void shutdown() {
    super.shutdown();
    executor.shutdown();
}
Using Log Processing
With an OOM, or a kill -9, things get messier.
Maintain a separate log file (or a local DB): write a log entry before the commit, then write a corresponding entry after the task actually completes. When the system starts, it reads these log files, finds the tasks that never succeeded, and executes them again.
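A minimal sketch of the recovery side, assuming a simple line format of "BEGIN partition:offset" / "END partition:offset" and eliding exception handling; the file name is an assumption.

// On startup: every BEGIN without a matching END is an unfinished task.
Set<String> pending = new HashSet<>();
for (String line : Files.readAllLines(Paths.get("consumer-tasks.log"))) {
    if (line.startsWith("BEGIN ")) pending.add(line.substring(6));
    else if (line.startsWith("END ")) pending.remove(line.substring(4));
}
// 'pending' now holds the partition:offset pairs to execute again.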
It takes a little effort to be efficient and reliable.
Redis processing
This mode is similar to the logging mode, but given Redis's efficiency (tens of thousands of operations per second) and convenience, it beats the logging approach.
You can use a Hash structure: write to Redis when you commit the task, delete the value when the task finishes, and whatever is left over is the problematic messages.
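With the Jedis client, for example, the flow might look like the sketch below; the key and field layout are assumptions.

Jedis jedis = new Jedis("localhost"); // assumed Redis address
String field = record.partition() + ":" + record.offset();
// Before the commit: record the in-flight message in a hash.
jedis.hset("kafka:inflight", field, record.value());
// After the task completes: remove it.
jedis.hdel("kafka:inflight", field);
// On startup: whatever remains in the hash must be re-processed.
Map<String, String> leftover = jedis.hgetAll("kafka:inflight");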
End
Multithreading is there to increase efficiency; Redis and the like are there to increase reliability. The business code itself is easy to write, and the main logic covers most of the cases; what makes business code hard is all the auxiliary code you must write to make it more efficient and to take care of its boundaries.
From a programmer's point of view, the most competitive code is the code designed to handle the low-probability boundary exceptions.
Kafka makes various trade-offs between throughput and reliability, and many of them are fish-versus-bear's-paw choices: you cannot have both. Rather than obsessing over Kafka itself, we can use external tools and get a much greater return. In this case, the probability of Redis and the application going down at the same time is fairly small, so guaranteeing five nines of the messages is doable. As for the remaining imperfect problem messages, why not go look for them in the logs?
Read more:
1. JAVA multi-threaded usage scenarios and notes brief version
2. Kafka Basic Knowledge Index
3. 360 degree test: Does KAFKA lose data? Is its high availability sufficient?