You may be familiar with transactions, such as database transactions or distributed transactions. What does a transaction look like in Kafka?
Before we talk about Kafka’s transactions, let’s talk about how Kafka implements idempotence. Idempotence and transactions are two features introduced in Kafka 0.11.0.0 to implement exactly-once semantics (EOS).
Idempotence simply means that calling an interface multiple times produces the same result as calling it once. A producer may write the same message repeatedly when it retries, and Kafka’s idempotence feature avoids exactly this.
Turning on idempotence is as simple as explicitly setting the producer client parameter enable.idempotence to true (the default value of this parameter is false).
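As a minimal sketch of what this looks like with the Java client (the bootstrap address and serializers below are illustrative assumptions, not requirements), enabling idempotence is a single extra property:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// The one line that matters here: turn on the idempotent producer.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);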
How exactly does Kafka implement idempotence? Kafka introduces the concepts of producer ID (PID) and sequence number. Each new producer instance is assigned a PID when it is initialized, and this is completely transparent to the user.
For each PID, every partition to which the producer sends messages has a corresponding sequence number that increases monotonically from 0. Each time the producer sends a message, it increments the corresponding sequence number by one.
The broker side maintains a sequence number in memory for each <PID, partition> pair. For every message it receives, the broker accepts it only if the message’s sequence number (SN_new) is exactly one greater than the sequence number maintained on the broker side (SN_old), that is, SN_new = SN_old + 1.
If SN_new < SN_old + 1, the message has been written repeatedly and the broker can simply discard it. If SN_new > SN_old + 1, some messages in between have not yet been written, which means messages may have been lost; this is treated as a serious error.
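The rule can be pictured with a short sketch. This is not Kafka’s actual broker code, just an illustration of the comparison described above:

// Illustration only: the per-<PID, partition> check described above.
static void checkSequence(long snNew, long snOld) {
    if (snNew == snOld + 1) {
        // exactly the next expected number: accept the message
    } else if (snNew < snOld + 1) {
        // a duplicate write (e.g. a retried batch): the broker can discard it
    } else {
        // snNew > snOld + 1: a gap in the sequence, so messages may have been lost;
        // this is treated as a serious (fatal) error
    }
}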
The sequence numbers used to implement idempotence are also kept per <PID, partition> pair; that is, Kafka’s idempotence only guarantees idempotence for a single partition within a single producer session. Idempotence does not work across multiple partitions, and transactions can compensate for this.
Transactions guarantee atomicity of writes to multiple partitions. The atomicity of operations means that multiple operations will either all succeed or all fail. There is no possibility of partial success or partial failure.
In order to use transactions, an application must provide a unique transactionalId, which is explicitly set through the producer client parameter transactional.id. Transactions require the producer’s idempotence feature, so setting transactional.id to a non-null value also requires enable.idempotence to be true (if it is not explicitly set, KafkaProducer defaults it to true). If the user explicitly sets enable.idempotence to false, a ConfigException is raised.
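In configuration terms this is only one or two extra properties. Building on the same Properties object as the earlier idempotence fragment (the transactional.id value here is just an example):

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-transactional-id"); // example id
// enable.idempotence may be omitted: KafkaProducer defaults it to true once
// transactional.id is set; explicitly setting it to false raises a ConfigException.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);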
TransactionalId corresponds to PID in a one-to-one way. The difference between transactionalId and PID is that transactionalId is explicitly set by the user, whereas PID is assigned internally by Kafka.
In addition, to ensure that an old producer with the same transactionalId is invalidated as soon as a new one starts, each producer obtains a monotonically increasing producer epoch when it acquires its PID through the transactionalId. If two producers are started with the same transactionalId, the one that was started first will get an error.
From the producer’s perspective, transactions give Kafka two guarantees: idempotent message delivery across producer sessions and transaction recovery across producer sessions.
The former means that when a new producer instance with the same transactionalId is created and working, the old producer instance with the same transactionalId will no longer work.
The latter means that when a producer instance goes down, the new producer instance can ensure that any outstanding old transactions are either committed or aborted, so that the new producer instance can start working in a normal state.
KafkaProducer provides five transaction-related methods, detailed below:
void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;
The initTransactions() method is used to initialize the transaction; the beginTransaction() method starts a transaction; the sendOffsetsToTransaction() method allows consumer offset commits to be made within the transaction; the commitTransaction() method commits the transaction; and the abortTransaction() method aborts the transaction, similar to a transaction rollback.
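Put together, a typical use of these five methods looks roughly like the following sketch, assuming a producer configured with a transactional.id as above (the topic name, records, and error handling are simplified assumptions, not a production-ready implementation):

producer.initTransactions();                     // initialize the transaction (obtain PID, epoch, etc.)
try {
    producer.beginTransaction();                 // start a transaction
    producer.send(new ProducerRecord<>("topic-demo", "msg1"));
    producer.send(new ProducerRecord<>("topic-demo", "msg2"));
    producer.send(new ProducerRecord<>("topic-demo", "msg3"));
    producer.commitTransaction();                // commit: the three messages become visible together
} catch (ProducerFencedException e) {
    // a newer producer with the same transactionalId has fenced this one out
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();                 // abort: read_committed consumers will never see the messages
}

In a consume-transform-produce loop, sendOffsetsToTransaction() would be called before commitTransaction() so that the consumer’s offset commits are included in the same transaction.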
On the consumer side, there is a parameter isolation.level that is closely related to transactions. The default value of this parameter is “read_uncommitted”, which means that consumer applications can see (consume) uncommitted transactions and, of course, committed transactions.
This parameter can also be set to read_committed, which means that consumer applications cannot see messages within uncommitted transactions.
For example, if a producer starts a transaction and sends three messages msg1, msg2, and msg3 to a partition, then before commitTransaction() or abortTransaction() is executed, a consumer application with isolation.level set to read_committed cannot consume these messages. They are cached inside KafkaConsumer and are only pushed to the consumer application after the producer executes commitTransaction(). Conversely, if the producer executes abortTransaction(), KafkaConsumer discards the cached messages instead of pushing them to the consumer application.
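On the consumer side the only change is the isolation.level property. A minimal fragment, using a separate consumer Properties object (the group id and topic are illustrative), might look like:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");            // example group id
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // only consume committed messages
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic-demo"));        // example topic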
In addition to ordinary messages, the log files contain another kind of message that is used to mark the end of a transaction: the control message (ControlBatch). There are two types of control messages, COMMIT and ABORT, which indicate that a transaction has been successfully committed or aborted, respectively.
The sixth bit of the attributes field in a RecordBatch indicates whether the current batch is a control message: it is set to 1 for a control message and 0 otherwise.
The fifth bit of the attributes field indicates whether the current message is part of a transaction: 1 if it is, 0 otherwise. Because a control message is itself part of a transaction, both the fifth and sixth bits of the attributes field are set to 1 in a control message.
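As an illustration of the two bits just described (numbering the lowest-order bit as the first; the masks below follow that description and are not taken from Kafka’s source code):

static boolean isTransactional(short attributes) {
    return (attributes & 0x10) != 0;  // 5th bit: the batch is part of a transaction
}

static boolean isControlBatch(short attributes) {
    return (attributes & 0x20) != 0;  // 6th bit: the batch is a control message (COMMIT/ABORT marker)
}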
We also discussed the LSO in an earlier post in this Kafka series, “Kafka science | what is LSO”, which is closely related to Kafka’s transactions. Take another look at it if you have forgotten.
Welcome to support the author’s books: “Illustrated Kafka Guide” and “Illustrated Kafka’s Core Principles”.
Welcome to support my new books: In-depth Understanding of Kafka: Core Design and Practice Principles and RabbitMQ Field Guide. Also welcome to follow my WeChat official account: Zhu Xiaosi’s blog.