We analyze this from the perspectives of the producer, the broker, and the consumer.
Message transmission guarantee
Generally speaking, the message delivery guarantees of messaging middleware come at three levels:
- At most once: a message may be lost, but it is never redelivered.
- At least once: a message is never lost, but it may be redelivered.
- Exactly once: every message is delivered once and only once.
Kafka’s message transfer guarantee mechanism is straightforward.
For the producer, after a message is sent:
If the message is successfully committed to the log file, the multi-replica mechanism ensures it is not lost.
If the message is not successfully committed (for example, due to network fluctuation), the producer cannot tell whether the commit succeeded and can retry; the retry may cause the message to be written more than once.
So Kafka's message delivery guarantee is at least once.
For the consumer, the order in which it processes messages and commits consumption offsets largely determines which delivery guarantee it provides.
If the consumer pulls a message, lets the application logic process it first, and then commits the offset, a crash after processing but before the offset commit means that, back online, it pulls again from the last committed offset. This causes repeated consumption and corresponds to at least once.
If the consumer commits the offset first and processes the message afterwards, a crash after the offset commit but before processing completes means that, back online, it starts from the committed offset and the unprocessed message is lost. This corresponds to at most once.
Producer side
Sending of messages
KafkaProducer’s send() method does not return void but Future<RecordMetadata>. It has two overloads:
public Future<RecordMetadata> send(ProducerRecord<K,V> record)
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
The send() method itself is asynchronous, and the Future object it returns allows the caller to retrieve the send result later.
Synchronous sending can be implemented using the returned Future object as follows:
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
The code above chains get() directly onto send() to block and wait for Kafka’s response, until the message is sent successfully or an exception occurs. The exception must be caught and handed over to the outer logic.
Another synchronous send code is as follows:
try {
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata.topic()+recordMetadata.partition()+recordMetadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
From the code above we obtain a RecordMetadata object containing the message’s metadata, such as its topic, partition, and offset. If you don’t need this metadata, the first approach is simpler.
The asynchronous sending code is as follows:
try {
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println(recordMetadata.topic() + recordMetadata.partition() + recordMetadata.offset());
            }
        }
    });
} catch (Exception e) {
    e.printStackTrace();
}
Asynchronous sending typically means specifying a Callback in the send() method; Kafka invokes the callback when the response returns.
For the same partition, if record1 is sent before record2, Kafka guarantees that Callback1 is invoked before Callback2.
Producer-side solutions
There are generally two classes of exceptions in KafkaProducer: retriable exceptions and non-retriable exceptions. Exceptions caused by network fluctuation, for example, are retriable.
For retriable exceptions, you can configure the retries parameter: no exception is thrown as long as the send recovers within the specified number of retries. The default is 0 (3 is recommended). It is configured as follows:
prop.put(ProducerConfig.RETRIES_CONFIG, 3);
Note: this setting has performance implications; for example, it increases the latency with which the client reacts to exceptions.
How do you avoid message duplication when setting the above configuration to a value greater than 0?
Since version 0.11.0.0, Kafka has provided idempotence and transactions to implement EOS (exactly-once semantics) and ensure that resending does not duplicate messages in the log. The broker assigns each producer a Producer ID (PID) and deduplicates using a per-message sequence number; transactions guarantee atomic writes across multiple partitions.
Configuration to enable idempotent delivery: set enable.idempotence = true.
Configuration to enable transaction support: additionally set transactional.id to a custom value.
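A minimal sketch with both features enabled; the broker address, serializers, topics, and transactional.id below are illustrative assumptions, not values from this article:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);       // idempotent delivery
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id"); // illustrative transactional.id

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-a", "key", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key", "value2"));
    producer.commitTransaction(); // both writes become visible atomically
} catch (KafkaException e) {
    // fatal errors (e.g. ProducerFencedException) would require close() instead
    producer.abortTransaction();  // neither write is exposed to read_committed consumers
}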
Another configuration to note is max.in.flight.requests.per.connection: it limits how many unacknowledged requests the client caches per connection (that is, per connection between the client and a Node); the default is 5. If it is greater than 1, messages can be reordered (Kafka only guarantees ordering within a partition):
- If the first batch of messages fails to be written and the second batch succeeds, the producer resends the first batch; if that resend succeeds, the two batches end up out of order.
- If message ordering must be guaranteed, set max.in.flight.requests.per.connection to 1 rather than setting acks to 0; however, this still hurts throughput.
A related configuration item, retry.backoff.ms, sets the interval between retries, so that the producer does not abandon retries prematurely.
Used together, these parameters let the producer retry more reliably.
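For example, pairing the two on the same prop object as before (300 ms is an arbitrary illustration, not a recommended value):
prop.put(ProducerConfig.RETRIES_CONFIG, 3);            // retry transient failures up to 3 times
prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // wait 300 ms between attempts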
After KafkaProducer sends a message and it is committed (written to the log), the replica mechanism keeps it from being lost. How many replica acknowledgments the producer must receive before a message counts as successfully written is specified by the acks parameter; it trades message reliability against throughput.
prop.put(ProducerConfig.ACKS_CONFIG,"-1");
acks takes three values (as strings):
- acks = 1 (the default). After the producer sends a message, it receives a success response from the server as soon as the partition's leader replica writes it.
- If the message cannot be written to the leader replica (for example, the leader crashed and a re-election is under way), the producer receives an error response and can resend the message to avoid losing it. This gives at least once.
- If the message is written to the leader replica and a success response is returned to the producer, but the leader crashes before any follower replica fetches the message, the message is still lost: the newly elected leader does not have it. So acks = 1 is a trade-off between message reliability and throughput.
- acks = 0. The producer does not wait for any server response after sending a message.
- If an exception occurs between sending the message and writing it to Kafka, Kafka never receives it; the producer does not know, and the message is lost.
- This setting maximizes throughput.
- acks = -1 (equivalently, all). This configuration achieves maximum reliability: after sending a message, the producer receives a success response from the server only once all replicas in the ISR have written it.
- If the write succeeds on the leader replica but the leader goes down before all replicas in the ISR have synchronized, the producer receives an exception telling it the send failed.
- Note: this still does not make the message absolutely reliable! The ISR might contain only the leader replica, in which case acks = -1 degenerates into the acks = 1 case.
So when does the ISR contain only the leader replica? When messages flow into the leader faster than the follower replicas can synchronize, at some critical point all followers are removed from the ISR set. Then acks = -1 effectively becomes acks = 1, which raises the risk of message loss.
How to handle this situation?
Kafka (on the broker side) provides a parameter for this: min.insync.replicas (default 1), used together with acks = -1. It specifies the minimum number of replicas in the ISR set; if the ISR is smaller, the broker throws an exception instead of accepting the write.
In general, the replication factor should be greater than min.insync.replicas, for example 3 replicas with min.insync.replicas = 2. Note that this parameter improves reliability at the cost of availability: if, say, the ISR holds only the leader replica and min.insync.replicas = 2, the partition can no longer be written.
Related to this is unclean.leader.election.enable (default false): whether a new leader may be elected from replicas outside the ISR when the leader goes offline. Set to true, messages may be lost; set to false, availability suffers. Choose according to your requirements.
Conclusion:
- acks = -1: a success response is returned only after all ISR replicas have written the message.
- min.insync.replicas > 1: ensures the ISR set never consists of the leader replica alone.
- retries >= 3: raises the retry count so messages are not lost (duplicates may occur).
- retry.backoff.ms: tune the retry interval to the scenario, together with retries.
- max.in.flight.requests.per.connection = 1: preserves message ordering within a partition.
- enable.idempotence = true: enables idempotent delivery to prevent duplicates caused by retries.
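Collected into code, a hedged sketch of these producer settings (the values mirror the bullets above; min.insync.replicas is a broker/topic-level setting and is not configured here):
prop.put(ProducerConfig.ACKS_CONFIG, "-1");                        // wait for the full ISR
prop.put(ProducerConfig.RETRIES_CONFIG, 3);                        // retry transient failures
prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);             // illustrative retry interval
prop.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // preserve partition ordering
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);          // deduplicate retried sends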
Broker side
The Kafka broker cluster receives data and persists it to disk. Messages are first written to the page cache; the operating system then takes care of the actual flushing, or fsync can force a flush. If a follower replica that lags far behind the leader replica is elected as the new leader, the lagging message data is lost.
How to solve it?
Synchronous disk flushing (not recommended):
Controlled by parameters such as log.flush.interval.messages and log.flush.interval.ms. Synchronous flushing improves message reliability and prevents the loss of messages that sit in the page cache but have not yet reached disk when a machine loses power, but it seriously hurts performance.
Rely on the multi-replica mechanism instead (recommended). The configuration is as follows:
- unclean.leader.election.enable: set to false to refuse electing a leader from replicas outside the ISR set. That removes the worry of electing a replica whose messages lag far behind the original leader's.
- replication.factor: set to 3 or more, so that when the leader replica goes down there are follower replicas that can be elected leader.
- min.insync.replicas: set to greater than 1 to specify the minimum number of replicas in the ISR set, preventing the ISR from containing only the leader replica. Note: keep replication.factor > min.insync.replicas.
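As a sketch, the corresponding broker-side settings in server.properties might look like the following (replication factor is normally chosen per topic at creation time; default.replication.factor is only the broker-wide default for auto-created topics):
# server.properties (broker side) — illustrative values
unclean.leader.election.enable=false
default.replication.factor=3
min.insync.replicas=2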
Consumer side
On the consumer side, one configuration deserves special attention: enable.auto.commit. It defaults to true, i.e., offsets are committed automatically. This is very simple, but it can cause both repeated consumption and message loss.
This default auto-commit is periodic, controlled by the client parameter auto.commit.interval.ms (default 5s); it commits the largest offset among the messages pulled in each partition.
We can set it to false and commit offsets manually.
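A minimal consumer configuration sketch; the broker address and group id are illustrative assumptions:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // illustrative group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // turn off auto commit
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);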
If some messages cannot be consumed because the application fails to parse them, you can park them in a dead-letter queue for later handling, so they do not block overall consumption progress.
With manual offset commits, we encounter two situations:
- Process the message first, then commit the offset (recommended).
If the consumer goes down while processing a message, the offset commit has not happened, so after a restart the message is pulled again from the last committed offset. This causes repeated consumption, and the business must guarantee idempotency.
- Commit the offset first, then process the message.
If the consumer goes down after the offset commit but before processing completes, it resumes from the committed offset after a restart. The unprocessed messages are never handled again: from the consumer's point of view, they are lost.
The rule here: if a message has not been successfully consumed, its offset must not be committed.
Manual commits subdivide into synchronous and asynchronous commits, i.e., the consumer's commitSync() and commitAsync(). The code below processes messages first and then commits offsets!
The synchronous submission code is as follows:
while (IS_RUNNING.get()) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // business logic
    }
    consumer.commitSync();
}
consumer.commitSync() commits based on the latest offset returned by poll() and blocks the consumer thread until the commit completes, provided no unrecoverable error occurs; unrecoverable exceptions must be caught and handled accordingly.
In addition, offsets can also be committed at partition granularity, with the following code:
try {
    while (IS_RUNNING.get()) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                // business logic
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}
In contrast to commitSync(), an asynchronous commit does not block the consumer thread; the next pull may start before the result of the offset commit comes back. The code is as follows:
while (IS_RUNNING.get()) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("topic:" + record.topic() + ",offset:" + record.offset() + ",value:" + record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            if (e != null) {
                // the commit failed; log the offsets that could not be committed
                logger.error("fail to commit offsets {}", offsets, e);
            }
        }
    });
}
The onComplete() method of OffsetCommitCallback is called back when the offset commit completes.
What if an asynchronous commit fails?
If you choose to retry after a failed commit, you can create a repeated-consumption problem! (Retries are generally unnecessary: offset commit failures are rare, and retrying adds complexity to the code logic.)
For example, suppose a commit of offset A fails and a subsequent asynchronous commit of offset A + B succeeds. With a retry mechanism, if the earlier commit succeeds on retry, the committed offset falls back to A. If an exception then occurs, the consumer resumes consumption from A after recovery, re-consuming everything between A and A + B.
How to solve it?
Maintain an increasing sequence number to preserve the order of asynchronous commits, incrementing it after each successful commit. When a commit fails, compare the offset that failed with the current sequence value: if the offset is smaller, a larger offset has already been committed successfully, and no retry is needed.
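A minimal sketch of that guard, assuming a single consumer thread; here the highest successfully committed offset plays the role of the sequence number, and highestCommitted and commitWithGuard are illustrative names, not part of the Kafka API:
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// largest offset known to have been committed successfully
final AtomicLong highestCommitted = new AtomicLong(-1);

void commitWithGuard(KafkaConsumer<String, String> consumer,
                     TopicPartition partition, long offsetToCommit) {
    consumer.commitAsync(
            Collections.singletonMap(partition, new OffsetAndMetadata(offsetToCommit)),
            (offsets, e) -> {
                if (e == null) {
                    highestCommitted.accumulateAndGet(offsetToCommit, Math::max);
                } else if (offsetToCommit > highestCommitted.get()) {
                    // no larger offset has been committed since, so retrying is safe
                    commitWithGuard(consumer, partition, offsetToCommit);
                }
                // otherwise a larger offset is already committed; skip the retry
            });
}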
With asynchronous commits, an abnormal consumer exit is likely to cause repeated consumption, because the latest offsets may not have been committed in time. If the consumer exits normally, or a rebalance occurs, a synchronous commit performed just before the exit or rebalance can close that gap.
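A common pattern for that, sketched with the same assumptions as the earlier snippets (consumer and IS_RUNNING as defined above): asynchronous commits on the hot path, plus one final synchronous commit before closing.
try {
    while (IS_RUNNING.get()) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // business logic
        }
        consumer.commitAsync(); // non-blocking commit on the hot path
    }
} finally {
    try {
        consumer.commitSync();  // one last blocking commit before exit
    } finally {
        consumer.close();
    }
}
For the rebalance case, the same synchronous commit can be performed in ConsumerRebalanceListener.onPartitionsRevoked().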
Conclusion:
To keep messages from being lost, set enable.auto.commit to false and, in the code logic, process messages first and then commit offsets. For repeated consumption, the business must guarantee idempotency.