The figure shows a topic named foo with 3 partitions. Using this case, we discuss schemes for developing a multithreaded Consumer.
Worker thread pool mode
In this mode, a ConsumerGroup contains one or more Consumer instances that continuously poll messages from the partitions and hand each retrieved message to the workerThreadPool thread pool.
Advantages
- The task is split into message fetching and message processing, each handled by its own threads. The biggest advantage is scalability: the number of fetching threads and the number of processing threads can be adjusted independently, without one affecting the other. If fetching is slow, increase the number of Consumer threads. If message processing is slow, increase the number of threads in the worker thread pool.
Disadvantages
- It is harder to implement, because it effectively maintains two groups of threads: one for message fetching and one for message processing.
- Because fetching and processing are separated, the thread that fetches a message is not the thread that processes it, so consumption order within a partition cannot be guaranteed. This is the scheme's fatal flaw.
- Because of the second disadvantage, offsets are hard to commit correctly, which can lead to repeated consumption or message loss.
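The ordering problem can be reproduced without a broker. The following is a minimal sketch using only the JDK: the integers 0 to 9 stand in for the offsets of records polled from a single partition, and the class name, method name, and simulated sleep times are all assumptions made for illustration, not part of the original scheme.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class WorkerPoolOrderingDemo {

    // Submits offsets 0..count-1 (standing in for records polled from ONE partition)
    // to a pool of workerThreads threads and returns the order in which they finished.
    static List<Integer> processWithPool(int count, int workerThreads) throws InterruptedException {
        List<Integer> completionOrder = Collections.synchronizedList(new ArrayList<>());
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        for (int offset = 0; offset < count; offset++) {
            final int current = offset;
            workers.submit(() -> {
                try {
                    // Simulated processing time that varies from record to record
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1, 20));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completionOrder.add(current);
            });
        }
        // Stop accepting new tasks and wait for the submitted ones to finish
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
        return new ArrayList<>(completionOrder);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> order = processWithPool(10, 4);
        // Every offset is processed, but the order can differ from run to run
        System.out.println("completion order: " + order);
    }
}
```

Every offset ends up processed, but the completion order is usually not 0, 1, 2, and so on. If the fetching thread commits an offset while an earlier record is still in the pool, a crash at that moment loses that record, which is the loss problem described above.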
Multi-Consumer mode
The consumer program starts multiple threads, and each thread maintains its own KafkaConsumer instance, which handles the whole flow of fetching and processing messages.
Advantages
- It is easy to implement: just create multiple Consumer instances.
- Threads are isolated from one another, so no extra handling of inter-thread interaction is needed.
- Because each thread consumes a fixed set of partitions, offsets can be committed correctly, and as long as messages are processed properly there is no duplication or loss.
Disadvantages
- Each Consumer instance corresponds to its own TCP connection, so resource consumption is relatively high.
- The number of Consumer instances is limited by the number of partitions: it must be less than or equal to the partition count, and any Consumer beyond that count stays idle.
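The partition limit can be illustrated with a small sketch. It spreads the 3 partitions of topic foo over a group of Consumers in round-robin fashion. The `assign` method and class name are hypothetical, and the logic is a simplified stand-in for illustration, not Kafka's actual assignment strategy.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Spreads partition ids 0..numPartitions-1 over numConsumers consumers, round-robin.
    // Simplified stand-in for illustration; not Kafka's actual assignor.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Topic foo has 3 partitions; try 2, 3 and 4 consumers in the group
        for (int consumers = 2; consumers <= 4; consumers++) {
            System.out.println(consumers + " consumers -> " + assign(3, consumers));
        }
    }
}
```

With 4 Consumers the last list is empty: the fourth instance still costs a thread and a connection but receives no partition, so adding Consumers beyond the partition count brings no extra throughput.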
Since the worker thread pool mode cannot guarantee that messages are consumed correctly, the multi-Consumer mode is strongly recommended.
Code
- Worker thread pool mode
```java
private KafkaConsumer<String, Object> kafkaConsumer;
private final ExecutorService workers = Executors.newFixedThreadPool(4);

public void run() {
    kafkaConsumer.subscribe(Arrays.asList("foo", "bar"));
    // Fetch one batch (a real consumer repeats this poll in a loop)
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
    // The consumer thread only fetches; each record is processed by the worker pool
    records.forEach(record -> {
        workers.submit(() -> handlerRecord(record));
    });
}
```
- Multiple Consumer instance mode
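```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerWorker implements Runnable {
    private final KafkaConsumer<String, Object> consumer;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public KafkaConsumerWorker(KafkaConsumer<String, Object> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            // Fetch and process in the same thread until shutdown() is called
            while (!closed.get()) {
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
                // todo handler records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) {
                throw e;
            }
        } finally {
            // Close once, after the loop ends, not after every poll
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        // wakeup() interrupts a blocking poll() from another thread
        consumer.wakeup();
    }
}
```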