Consumers and consumer groups
The consumer is responsible for subscribing to topics in Kafka and pulling messages from them. Kafka also has the concept of consumer groups: every consumer belongs to a consumer group, and when a message is published to a topic, it is delivered to one consumer in each consumer group that subscribes to that topic.
Kafka supports two message delivery modes: point-to-point (P2P) and publish-subscribe (Pub/Sub).
- The point-to-point pattern is based on queues: producers send messages to a queue and consumers receive messages from it. If all consumers are in the same consumer group, each message is processed by only one consumer.
- The publish-subscribe pattern is used when messages are broadcast one-to-many. If all consumers belong to different consumer groups, every message is broadcast to all consumers, and each message is processed by every consumer.
Note that each partition of a topic can be assigned to only one consumer within a consumer group, so a given message is consumed by at most one member of the group.
General process for client development
The general consumer’s consumption logic consists of the following steps:
- Configure consumer client parameters and create corresponding consumer instances;
- Subscribe to topics;
- Pull messages and consume them;
- Commit consumption offsets;
- Close the consumer.
Example:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerAnalysis {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerAnalysis.class);
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("client.id", "consumer.client.id.demo");
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic() + " - " + record.partition() + " : " + record.value());
                }
            }
        } catch (Exception e) {
            log.error("Exception happened", e);
        } finally {
            consumer.close();
        }
    }
}
Subscribe to topics and partitions
- Subscribe to the topic
A consumer subscribes to topics with the subscribe() method, passing either a collection of topic names or a regular expression that matches topics by pattern. The subscribe() method has the following overloads:
public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern);
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener);
Note: if a consumer calls subscribe() multiple times, only the subscription from the last call takes effect; that is, later subscriptions overwrite earlier ones.
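For instance, a minimal sketch of a pattern subscription (the regex here is only an illustration): the consumer will consume every topic whose name matches the pattern, including matching topics created after the subscription.

consumer.subscribe(Pattern.compile("topic-.*"));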
- Subscribe to a specific partition
In addition to subscribing to topics, a Kafka consumer can subscribe directly to specific partitions of a topic using the assign() method, whose signature is as follows:
public void assign(Collection<TopicPartition> partitions);
The TopicPartition class has only two attributes: topic and partition, which represent the topic the partition belongs to and the partition number, respectively. The following code makes the consumer subscribe only to partition 0 of topic-demo:
consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));
- Get topic metadata information
KafkaConsumer also provides a method to obtain all partitions under a topic: the partitionsFor() method, which queries metadata information for a given topic, as defined below:
public List<PartitionInfo> partitionsFor(String topic);
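Combining partitionsFor() with assign() lets a consumer manually subscribe to every partition of a topic; a minimal sketch:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos != null) {
    for (PartitionInfo info : partitionInfos) {
        partitions.add(new TopicPartition(info.topic(), info.partition()));
    }
}
consumer.assign(partitions);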
- Unsubscribe
The unsubscribe() method cancels a consumer's topic or partition subscriptions. The same effect can be achieved by passing an empty collection to subscribe(Collection) or assign(Collection). The following three statements are equivalent:
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());
Message consumption
Message consumption in Kafka is based on a pull pattern, in which the consumer initiates a request to the server to pull the message.
Message consumption in Kafka is a continuous polling process: the consumer repeatedly calls the poll() method, which returns a set of messages from the topics (partitions) it subscribes to.
The poll() method is defined as follows:
public ConsumerRecords<K, V> poll(final Duration timeout);
The timeout parameter is used to control how long the poll() method blocks when no data is available in the consumer’s buffer.
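As an illustration (a sketch; the variable names are just examples), the two extremes of the timeout behave as follows:

// Returns at once with whatever records are already fetched, possibly none
ConsumerRecords<String, String> noWait = consumer.poll(Duration.ZERO);
// Blocks for up to 5 seconds waiting for records before returning
ConsumerRecords<String, String> waited = consumer.poll(Duration.ofSeconds(5));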
Kafka consumers generally consume in one of two ways:
- Consume messages by partition
The ConsumerRecords class provides a records(TopicPartition) method to retrieve the messages of a specified partition from the message set:
public List<ConsumerRecord<K, V>> records(TopicPartition partition);
The following code shows how to consume the messages in a message set partition by partition:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        System.out.println(record.partition() + " : " + record.value());
    }
}
- Consume messages by topic
public Iterable<ConsumerRecord<K, V>> records(String topic);
The ConsumerRecords class does not provide a topics() method analogous to partitions() for obtaining all topics in the message set, so consuming by topic requires traversing the list of topics to which the consumer subscribed, as shown in the following code:
List<String> topicList = Arrays.asList(topic1, topic2);
consumer.subscribe(topicList);
try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (String topic : topicList) {
            for (ConsumerRecord<String, String> record : records.records(topic)) {
                System.out.println(record.topic() + " : " + record.value());
            }
        }
    }
} finally {
    consumer.close();
}
Offset commits
Each message in a Kafka partition has its own offset, which represents its position within the partition. Consumers also have an offset, which represents how far into the partition they have consumed. At the partition level the offset describes where a message sits; at the consumer level it describes the consumption progress.
The relationship between lastConsumedOffset, committed offset, and position is:
position = committed offset = lastConsumedOffset + 1
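This relationship can be observed with KafkaConsumer's position() and committed() methods. A minimal sketch, assuming the consumer is manually assigned a single partition and commits synchronously after consuming a batch:

TopicPartition tp = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(tp));
long lastConsumedOffset = -1;
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if (!records.isEmpty()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(); // Synchronously commit the offsets of the consumed messages
}
System.out.println("lastConsumedOffset: " + lastConsumedOffset);
System.out.println("committed offset: " + consumer.committed(tp).offset());
System.out.println("position: " + consumer.position(tp));
// After the commit, committed offset and position both equal lastConsumedOffset + 1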
From the user's point of view, there are two ways to commit offsets: automatic commit and manual commit.
Automatic commit
Set the enable.auto.commit parameter on the Consumer side to true to enable automatic commit.
Another relevant parameter, auto.commit.interval.ms, defaults to 5 seconds, indicating that Kafka automatically commits offsets every 5 seconds.
Part of the code for setting the above two parameters is:
props.put("enable.auto.commit"."true");
props.put("auto.commit.interval.ms"."2000");
Copy the code
With automatic commit enabled, each time poll() is called, Kafka first commits the offsets of all messages returned by the previous poll. Because the offsets of the previous batch are committed before the next batch is processed, automatic commit guarantees that no messages are lost.
Automatic offset commit is simple and avoids writing complex commit logic. However, it has a significant drawback: it can easily lead to repeated consumption.
The repeated-consumption scenario is as follows: right after an offset commit, a batch of messages is pulled and consumed; before the next automatic commit, the consumer crashes or Kafka rebalances, and consumption resumes from the last committed offset. Shortening the automatic commit interval narrows the window of repeated messages, but it cannot completely avoid repeated consumption, and it makes offset commits more frequent.
Manual commit
Compared with automatic commit, manual commit is more flexible and gives full control over the timing and frequency of offset commits.
To use manual commit, set the enable.auto.commit parameter to false and then invoke the corresponding API in the application to commit offsets manually.
On the Consumer side, manual commits fall into two categories according to how the method returns: synchronous commit and asynchronous commit.
Synchronous commit
The API for synchronous commit is KafkaConsumer#commitSync() and the method declaration is as follows:
public void commitSync();
This is a synchronous operation: the call does not return until the offsets have been successfully committed. A common usage of synchronous commit is as follows:
while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // Process the message
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        handle(e); // Handle the commit failure exception
    }
}
With manual synchronous commit, the Consumer blocks when commitSync() is called and stays blocked until the Broker returns the commit result. This blocking affects the TPS of the entire application. Extending the commit interval can reduce the performance impact of blocking, but the Consumer then commits less frequently, and more messages will be re-consumed the next time it restarts.
Asynchronous commit
Method declarations for asynchronous commits:
public void commitAsync();
This is an asynchronous operation: the method returns immediately without blocking and does not affect the TPS of the Consumer application. Because it is asynchronous, Kafka provides callback overloads for implementing post-commit logic, such as logging or handling exceptions. commitAsync() is generally used as follows:
while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // Process the message
    }
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            handle(exception); // Handle the commit failure exception
        }
    });
}
Can asynchronous commit completely replace synchronous commit? No. The downside of asynchronous commit is that there is no automatic retry when a problem occurs: because the operation is asynchronous, by the time a failed commit is retried, the offset it carries may already be stale. Retrying therefore makes no sense for asynchronous commits, and commitAsync() does not retry.
Combining synchronous and asynchronous commits
In manual committing, combining synchronous and asynchronous commits therefore gives the best result. Used together, they solve the following problems:
- commitSync()'s retry mechanism handles transient errors such as network jitter or a GC pause on the Broker side; for such transient problems, retries usually succeed;
- commitAsync() prevents the program from always blocking, reducing the impact on TPS.
The specific implementation of synchronous + asynchronous submission is as follows:
try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // Process the message
        }
        consumer.commitAsync(); // Use asynchronous commit to avoid blocking
    }
} catch (Exception e) {
    handle(e); // Handle exceptions
} finally {
    try {
        consumer.commitSync(); // A final synchronous blocking commit before closing
    } finally {
        consumer.close();
    }
}
The snippet above uses both commitSync() and commitAsync(): commitAsync() performs the regular periodic commits and avoids blocking, while commitSync() performs a final synchronous blocking offset commit before the Consumer closes, ensuring the correct offsets are saved.
Committing offsets for specific partitions
The Consumer also provides parameterized synchronous and asynchronous commit methods that commit specific offsets. The method declarations are as follows:
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
The parameter is a Map whose key is a TopicPartition (the message partition) and whose value is an OffsetAndMetadata object, which mainly holds the offset data.
Suppose, for example, that we want to commit offsets once for every 100 messages processed. Taking commitAsync() as an example (commitSync() is used the same way), the corresponding code is as follows:
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
...
while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // Process the message
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)); // The offset to commit is that of the next message
        if (count % 100 == 0) {
            consumer.commitAsync(offsets, null); // The callback logic is null
        }
        count++;
    }
}
Multithreaded implementation of message consumption
There are generally two ways for Kafka consumers to consume messages using multiple threads.
- The consumer program starts multiple threads, and each thread creates its own KafkaConsumer instance that handles the entire message retrieval and processing flow. The implementation code is as follows:
public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public KafkaConsumerRunner(KafkaConsumer consumer) {
        this.consumer = consumer;
    }

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                for (ConsumerRecord record : records) {
                    process(record); // Process the message
                }
            }
        } catch (WakeupException e) {
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        int consumerNum = 4;
        for (int i = 0; i < consumerNum; i++) {
            // initConfig() builds the consumer Properties, as in the earlier example
            KafkaConsumerRunner consumerRunner = new KafkaConsumerRunner(new KafkaConsumer(initConfig()));
            new Thread(consumerRunner).start();
        }
    }
}
The code above defines a Runnable class that performs the message retrieval and message processing logic. Each KafkaConsumerRunner object holds its own KafkaConsumer instance. In a real application, multiple KafkaConsumerRunner instances are created and started in turn to realize the multithreaded architecture of approach 1.
- A single consumer instance retrieves messages, and a thread pool processes them with multiple threads, decoupling message retrieval from message processing. The corresponding code is as follows:
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
        workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1000),
        new ThreadPoolExecutor.CallerRunsPolicy());

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord record : records) {
        executors.submit(new Worker(record)); // Worker wraps the message-processing logic
    }
}
...
The code above uses a single thread to fetch messages and submits them to a thread pool for processing. The main thread that calls poll() is not responsible for the message-processing logic, realizing the multithreaded architecture of approach 2.