Three message semantics and scenarios
How does Kafka avoid losing messages?
The producer, the broker, and the consumer each have to do their part to ensure that no message is lost, namely:
- The producer does not lose messages when sending;
- The server does not lose messages;
- The consumer does not miss messages.
The producer does not lose messages
- Send messages with a callback.
If a message is not sent successfully, the producer retries according to its retry configuration. If the send still fails after the retries are exhausted, Kafka passes the exception to us through the callback. At that point we can persist the message that failed to send and handle it later with compensation logic.
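// Requires Callback, ProducerRecord and RecordMetadata from org.apache.kafka.clients.producer
kafkaProducer.send(new ProducerRecord<>("foo", "bar"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // TODO: handle the failed send, e.g. persist the message for later compensation
        }
    }
});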
- Configure reliability parameters.
2.1 Configure acks = -1
- acks=0: the producer does not wait for a response from any server node and considers the send successful as soon as the message has been sent.
- acks=1: the producer considers the send successful once the leader replica has received the message.
- acks=-1: the producer considers the send successful only when all replicas in the ISR have received the message. This is the safest setting: if the leader replica fails, no message is lost when a follower replica is elected as the new leader. However, system throughput drops because the producer must wait for all ISR replicas to receive a message before the send is acknowledged.
2.2 Configure retries = 3
The retries parameter sets how many times a failed send is retried; 3 is a recommended value. If the message still cannot be sent after three retries, you can handle the failed message according to your own business scenario, for example by persisting it to disk and resending it as compensation once the service is healthy again.
2.3 Configure retry.backoff.ms = 300
The retry.backoff.ms parameter sets the interval between retries in milliseconds; 300 ms is a recommended value.
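The three producer settings above can be collected in one place. The following is a minimal sketch, assuming a producer with String keys and values; the class name ReliableProducerConfig, the method build, and the address localhost:9092 are placeholders that do not come from the original text.

```java
import java.util.Properties;

// Sketch only: gathers the reliability settings from this section into a
// Properties object. Class and method names are illustrative.
class ReliableProducerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait until every replica in the ISR has the message ("all" is equivalent to -1).
        props.put("acks", "all");
        // Retry a failed send up to three times.
        props.put("retries", "3");
        // Wait 300 ms between retries.
        props.put("retry.backoff.ms", "300");
        // Assumption: String keys and values.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object is what would be passed to the KafkaProducer constructor.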
The server does not lose messages
- Configure replication.factor > 1
The replication.factor parameter sets the number of replicas of each partition on the server. If the value is greater than 1, then even if the leader of the partition fails, a follower elected as the new leader can continue to process messages.
- Configure min.insync.replicas > 1
The min.insync.replicas parameter sets the minimum number of replicas that must be in the ISR. By the same reasoning, it should be greater than 1 to ensure that messages are not lost.
A brief introduction to the ISR (in-sync replicas). The ISR is a set of replicas of a partition, and each partition has its own ISR. Not every replica is in the set. The leader replica is always in the ISR. A follower replica stays in the ISR as long as it does not lag too far behind the leader replica; if it lags too far behind, it is removed from the ISR. In other words, the number of replicas in the ISR is less than or equal to the number of replicas of the partition.
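The membership rule for the ISR can be pictured with a small model. This is a simplified sketch, not broker code: the class IsrModel, the broker names, and the lag figures are made up for illustration, and lag is reduced to a single number of milliseconds per follower. On a real broker the threshold is governed by the replica.lag.time.max.ms setting.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of ISR membership, not Kafka's implementation.
class IsrModel {

    // Returns the replicas that count as in sync: the leader plus every
    // follower whose lag behind the leader does not exceed maxLagMs.
    static List<String> inSyncReplicas(String leader, Map<String, Long> followerLagMs, long maxLagMs) {
        List<String> isr = new ArrayList<>();
        isr.add(leader); // the leader is always in its own ISR
        for (Map.Entry<String, Long> e : followerLagMs.entrySet()) {
            if (e.getValue() <= maxLagMs) {
                isr.add(e.getKey());
            }
        }
        return isr;
    }

    public static void main(String[] args) {
        Map<String, Long> lag = new LinkedHashMap<>();
        lag.put("broker-2", 200L);    // keeping up with the leader
        lag.put("broker-3", 45_000L); // far behind the leader
        // Prints [broker-1, broker-2]: broker-3 has dropped out of the ISR.
        System.out.println(inSyncReplicas("broker-1", lag, 30_000L));
    }
}
```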
- Ensure replication.factor > min.insync.replicas.
If the two are equal, the whole partition stops working as soon as one replica goes down. We want to improve message durability and prevent data loss without reducing availability, so replication.factor = min.insync.replicas + 1 is recommended.
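The relationship between the two settings comes down to simple arithmetic. The sketch below assumes acks = -1, so that a write is accepted only while the ISR holds at least min.insync.replicas replicas; the class ReplicaBudget and its method are hypothetical names used only for this illustration.

```java
// Worked arithmetic for replication.factor versus min.insync.replicas.
class ReplicaBudget {

    // Number of replicas that can fail before the partition stops accepting
    // writes, assuming acks = -1 and that all replicas start in the ISR.
    static int tolerableFailures(int replicationFactor, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || replicationFactor < minInsyncReplicas) {
            throw new IllegalArgumentException(
                "need 1 <= min.insync.replicas <= replication.factor");
        }
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(tolerableFailures(3, 3)); // 0: a single failed replica blocks writes
        System.out.println(tolerableFailures(3, 2)); // 1: the recommended margin of one replica
    }
}
```

With replication.factor = min.insync.replicas + 1, exactly one replica can be lost without affecting availability.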
- Configure unclean.leader.election.enable = false
The unclean.leader.election.enable parameter controls whether a replica outside the ISR can be elected leader. With unclean.leader.election.enable = true, a follower replica that is not in the ISR is allowed to become leader. Because such a replica may have lagged behind the leader for a long time and hold incomplete data, messages may be lost if it is elected leader.
The consumer does not miss messages
- Commit offsets manually.
1.1 Configure enable.auto.commit = false
The enable.auto.commit parameter controls whether offsets are committed automatically. When it is set to false, the developer decides when to commit. With automatic commit enabled, the consumer may fail to process a message whose offset has already been committed automatically, and the message is lost.
1.2 Process first, then commit
The correct way to commit manually is to process the messages first and then commit the offset. The code is as follows:
kafkaConsumer.subscribe(Collections.singleton("foo"));
new Thread(() -> {
    try {
        while (true) {
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
            handlerRecord(records);     // process the batch first
            kafkaConsumer.commitSync(); // then commit the offsets
        }
    } catch (Exception e) {
        // Catch inside the thread; a try/catch around Thread.start() would not see these exceptions.
        errHandler(e);
    }
}).start();
However, in this case the consumer may go down after a message has been processed successfully but before the offset is committed. When the consumer restarts, it receives the already-processed message again and consumes it a second time, so manual commit needs to be paired with idempotent processing.
Messages are not duplicated
The producer does not produce duplicate messages
Because of network problems, the producer may retry a message that the broker has in fact already received, so the broker receives duplicate messages.
Since version 0.11.0, Kafka assigns each producer a unique ID and each message a sequence number, which lets the server deduplicate messages. However, if two producers produce the same message, Kafka cannot deduplicate it. So we can put a custom unique message ID in the message header and deduplicate manually on the consumer side.
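To see why a producer ID plus a sequence number catches retries but not two producers sending the same content, consider the following simplified model. It is a sketch under stated assumptions, not Kafka's implementation: the class SequenceDedup and the method tryAppend are hypothetical, and the model tracks a single partition with one sequence number per message.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of server-side deduplication by producer ID and sequence number.
class SequenceDedup {

    // Highest sequence number accepted so far, per producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the message is appended, false if it is dropped as a retry duplicate.
    boolean tryAppend(long producerId, int sequence) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // already seen from this producer
        }
        lastSequence.put(producerId, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceDedup broker = new SequenceDedup();
        System.out.println(broker.tryAppend(1L, 0)); // true: first delivery
        System.out.println(broker.tryAppend(1L, 0)); // false: retry of the same message
        System.out.println(broker.tryAppend(2L, 0)); // true: a different producer is not recognized as a duplicate
    }
}
```

The third call is accepted even if its payload is identical to the first, which is why a custom message ID is still needed on the consumer side.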
The consumer does not consume duplicate messages
Because manual commit is used to make sure no message is missed, the consumer can receive duplicate messages when another consumer joins and triggers a rebalance, or when the consumer fails before committing after processing a message.
We can filter out duplicate messages by using custom unique message IDs.
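Filtering by a unique message ID might look like the following minimal sketch. The class DedupingHandler is hypothetical, and the IDs are kept in an in-memory set only to keep the example short; since that set is lost on restart, a real service would presumably keep the processed IDs in a durable store.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of consumer-side deduplication by a custom unique message ID.
class DedupingHandler {

    // Assumption: in-memory only; a real service would persist these IDs.
    private final Set<String> processedIds = new HashSet<>();
    private int handled = 0;

    // Returns true if the message was processed, false if it was skipped as a duplicate.
    boolean handle(String messageId, String payload) {
        if (processedIds.contains(messageId)) {
            return false; // already processed, e.g. redelivered after a rebalance
        }
        handled++;                   // stand-in for the real business logic
        processedIds.add(messageId); // record the ID only after processing succeeds
        return true;
    }

    int handledCount() {
        return handled;
    }

    public static void main(String[] args) {
        DedupingHandler handler = new DedupingHandler();
        System.out.println(handler.handle("id-1", "order created")); // true
        System.out.println(handler.handle("id-1", "order created")); // false: duplicate delivery
        System.out.println(handler.handledCount());                  // 1
    }
}
```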