About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. Its architecture separates compute from storage, and it supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers. It offers strong consistency, high throughput, low latency, and high scalability.

GitHub address: github.com/apache/puls…

Reproduced from the StreamNative blog. Original author: Penghui Li. Address: streamnative.io/en/blog/tec…

The previous article in this series, “Exactly-Once Semantics Based on Pulsar Transactions,” showed how Apache Pulsar can guarantee exactly-once semantics when the transaction API is enabled. That article covered several messaging semantics, including:

  • Exactly-once semantics for a single topic with the idempotent producer
  • The transaction API
  • End-to-end exactly-once processing when Pulsar is integrated with Flink

This article provides an in-depth analysis of transactions in Apache Pulsar, so that readers become familiar with the main concepts of the Pulsar transaction API before using it.

Why are transactions needed?

Transactions strengthen messaging semantics and the processing guarantees of stream processing, for example with Pulsar Functions or with other stream processing engines integrated with Pulsar. Stream processing typically follows a “consume-process-produce” pattern: an application consumes from and produces to data streams such as Pulsar topics.

With the rise of stream processing, there is a growing demand for stream processing applications with stronger processing guarantees. For example, a financial institution that uses a stream processing engine to handle customer loan transactions needs every message to be processed exactly once, without exception.

In other words, suppose a stream processing application consumes message A and produces the result as message B (B = f(A)). The exactly-once processing guarantee then means that A is marked as consumed if and only if B is successfully produced.

Before Pulsar 2.8.0, it was not easy to achieve exactly-once processing guarantees when building stream processing applications with Apache Pulsar. Integrating Pulsar with a stream processing engine such as Flink only solved part of the problem. For example, you could use Flink to read messages from a Pulsar topic exactly once, but you could not write the results to a Pulsar topic exactly once.

When the Pulsar producer and consumer are configured with at-least-once semantics, a stream processing application cannot achieve exactly-once processing in the following scenarios:

  • Duplicate writes: Because of internal retries, the producer may write the same message multiple times. The idempotent producer solves this problem by de-duplicating messages.
  • Application crash: A stream processing application can crash at any time. Suppose it crashes after writing result message B but before marking source message A as consumed (that is, acknowledging it). After restarting, it reprocesses A and writes B to the output topic again, violating the exactly-once guarantee.
  • Zombie applications: In a distributed environment, an instance of a stream processing application may become partitioned from the network, for example when the network is temporarily unavailable. Typically, a new instance of the same application is started automatically to replace the “lost” one. Multiple instances of the same application may then run in parallel, process the same input topic, and write to the same output topic. This produces duplicate output messages and violates exactly-once semantics.
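The de-duplication that fixes the first scenario can be pictured as a broker-side check. In Pulsar's message deduplication, each producer has a name and tags every message with an increasing sequence ID. The broker drops any message whose sequence ID is not higher than the last one it persisted for that producer. The sketch below models only that check. DedupBroker and its methods are hypothetical names, and a real broker must also persist and recover this state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of broker-side de-duplication; not Pulsar's actual classes.
class DedupBroker {
    // Highest sequence ID persisted so far, per producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();
    private int storedCount = 0;

    /** Returns true if the message is stored, false if it is dropped as a duplicate. */
    boolean publish(String producerName, long sequenceId) {
        Long last = lastSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // a retry of a message that was already persisted
        }
        lastSequenceId.put(producerName, sequenceId);
        storedCount++;
        return true;
    }

    int storedCount() {
        return storedCount;
    }

    public static void main(String[] args) {
        DedupBroker broker = new DedupBroker();
        broker.publish("producer-1", 0);
        broker.publish("producer-1", 1);
        // The broker's receipt for sequence 1 was lost, so the client sends it again.
        boolean storedAgain = broker.publish("producer-1", 1);
        System.out.println("retry stored: " + storedAgain + ", messages stored: " + broker.storedCount());
    }
}
```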

Pulsar 2.8.0 introduced a new transaction API to address scenarios 2 and 3 above, where exactly-once processing semantics could not otherwise be achieved.

Transaction semantics

The transaction API enables stream processing applications to consume, process, and produce messages in a single atomic operation. Within one transaction, an application can receive, produce, and acknowledge a batch of messages across many topic partitions. All operations in the same transaction are treated as a unit: either they all succeed or they all fail.

So how does the transaction API solve these problems?

Atomic writes and acknowledgments across multiple topics

First, the transaction API makes writes and acknowledgments across multiple Pulsar topics atomic as a whole. All messages produced or consumed in a transaction are written or acknowledged together, or none of them are. For example, an error during processing can abort a transaction, in which case no consumer ever receives the messages that transaction produced. What does this mean for an atomic consume-process-produce operation?

Suppose an application consumes message A from topic T0, applies some transformation logic to it, and produces the result as message B to topic T1 (B = f(A)). The whole consume-process-produce operation is atomic only if A is consumed and B is published together, or neither happens. Message A counts as consumed from T0 only once it has been successfully acknowledged.

The transaction API ensures that acknowledging A and writing B happen as one atomic operation, which makes the entire consume-process-produce operation a single atomic operation.
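The difference between a plain write and a transactional write in the crash scenario can be reproduced with a toy model. The sketch below is a hypothetical simulation, not Pulsar code. It writes B = f(A), crashes before acknowledging A, restarts, and counts how many copies of B end up visible in the output topic.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (hypothetical, not Pulsar code) of the "application crash" scenario:
// B = f(A) is written, the application crashes before acknowledging A, then restarts.
class CrashScenario {

    /** Returns how many copies of B are visible in the output topic after the restart. */
    static int visibleCopiesOfB(boolean transactional) {
        List<String> outputTopic = new ArrayList<>(); // what consumers can read
        List<String> uncommitted = new ArrayList<>(); // writes of the open transaction

        // First attempt: produce B, then crash before A is acknowledged.
        if (transactional) {
            uncommitted.add("B"); // held back until commit
        } else {
            outputTopic.add("B"); // visible immediately
        }
        // Crash: the open transaction never commits, so it is aborted and its writes are dropped.
        uncommitted.clear();

        // Restart: A was never acknowledged, so it is redelivered and processed again.
        if (transactional) {
            uncommitted.add("B");
            // Commit publishes B and acknowledges A as one atomic step.
            outputTopic.addAll(uncommitted);
            uncommitted.clear();
        } else {
            outputTopic.add("B"); // then A is acknowledged separately
        }
        return outputTopic.size();
    }

    public static void main(String[] args) {
        System.out.println("at-least-once copies of B: " + visibleCopiesOfB(false));
        System.out.println("transactional copies of B: " + visibleCopiesOfB(true));
    }
}
```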

Isolating zombie instances with conditional acknowledgment

We solve the zombie instance problem with conditional acknowledgment. Conditional acknowledgment means that when two transactions try to acknowledge the same message, Pulsar guarantees that only one of them succeeds, and the other transaction is aborted.
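A minimal sketch of conditional acknowledgment on a single subscription, assuming each message can be held by at most one transaction at a time. ConditionalAck and its methods are hypothetical. The demo plays out the zombie case, where an old instance and its replacement both try to acknowledge the same input message.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical model of conditional acknowledgment on one subscription.
class ConditionalAck {
    private final Map<String, String> pendingAcks = new HashMap<>(); // messageId -> owning txn
    private final Set<String> acknowledged = new HashSet<>();         // committed acks

    /** Returns true if txnId now holds the ack; false means a conflict. */
    boolean ack(String txnId, String messageId) {
        if (acknowledged.contains(messageId)) {
            return false; // already acknowledged by a committed transaction
        }
        String owner = pendingAcks.putIfAbsent(messageId, txnId);
        return owner == null || owner.equals(txnId);
    }

    /** Commit turns the transaction's pending acks into real acknowledgments. */
    void commit(String txnId) {
        Iterator<Map.Entry<String, String>> it = pendingAcks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(txnId)) {
                acknowledged.add(entry.getKey());
                it.remove();
            }
        }
    }

    /** Abort releases the transaction's pending acks so the messages can be acked again. */
    void abort(String txnId) {
        pendingAcks.values().removeIf(owner -> owner.equals(txnId));
    }

    public static void main(String[] args) {
        ConditionalAck subscription = new ConditionalAck();
        boolean replacementOk = subscription.ack("txn-new", "msg-A"); // replacement instance
        boolean zombieOk = subscription.ack("txn-zombie", "msg-A");   // zombie instance
        if (!zombieOk) {
            subscription.abort("txn-zombie"); // the zombie's output is discarded with its txn
        }
        subscription.commit("txn-new");
        System.out.println("replacement acked: " + replacementOk + ", zombie acked: " + zombieOk);
    }
}
```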

Reading transactional messages

What guarantees apply when reading messages written in transactions?

The Pulsar broker dispatches transactional messages to consumers only after the transaction has been committed. In other words, the broker does not dispatch messages from a transaction that is still in progress, and messages from an aborted transaction are never delivered.

However, Pulsar does not guarantee that all messages produced in a committed transaction are consumed at the same time, for the following reasons:

  1. A committed transaction may span many topic partitions. A consumer that does not consume from all of them cannot read every message produced in the transaction.
  2. Consumers may have different receiver queue or buffer sizes, so each consumer receives only a limited number of messages at a time, and that number can vary.

Transaction API

Transactions are primarily a server-side, protocol-level feature. Currently the transaction API is available only in the Java client, and support for more languages is planned. An example of a consume-process-produce application written in Java with the Pulsar transaction API is as follows:
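The sketch below shows the shape of such a consume-process-produce loop. To stay self-contained it does not use the Pulsar client: ConsumeProcessProduceLoop, Txn, commit, and abort are hypothetical in-memory stand-ins. In the real Java client (Pulsar 2.8.0 and later), the corresponding calls are:

  • PulsarClient.builder().enableTransaction(true) to build a transaction-enabled client;
  • client.newTransaction().withTransactionTimeout(...).build() to open a transaction;
  • producer.newMessage(txn) to produce within it;
  • consumer.acknowledgeAsync(messageId, txn) to acknowledge within it;
  • txn.commit() or txn.abort() to end it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// In-memory stand-in (hypothetical, not the Pulsar client API) for a
// transactional consume-process-produce loop.
class ConsumeProcessProduceLoop {
    /** Work done inside one transaction: messages to produce and messages to acknowledge. */
    static final class Txn {
        final List<String> produced = new ArrayList<>();
        final List<String> acked = new ArrayList<>();
    }

    final Deque<String> input = new ArrayDeque<>(); // unacknowledged input messages
    final List<String> output = new ArrayList<>();  // committed, consumer-visible output

    Txn newTransaction() {
        return new Txn();
    }

    /** Produced messages become visible and acks take effect in one step. */
    void commit(Txn txn) {
        output.addAll(txn.produced);
        for (String message : txn.acked) {
            input.remove(message);
        }
    }

    /** Nothing was published or acknowledged, so aborting just drops the transaction's work. */
    void abort(Txn txn) {
        txn.produced.clear();
        txn.acked.clear();
    }

    /** Processes input until it is empty; a failure aborts and the message is retried. */
    void run(Function<String, String> f, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts && !input.isEmpty(); attempt++) {
            String a = input.peekFirst();     // receive A; it stays unacknowledged
            Txn txn = newTransaction();
            try {
                txn.produced.add(f.apply(a)); // produce B = f(A) in the transaction
                txn.acked.add(a);             // acknowledge A in the same transaction
                commit(txn);
            } catch (RuntimeException e) {
                abort(txn);                   // A remains in the input and is redelivered
            }
        }
    }

    public static void main(String[] args) {
        ConsumeProcessProduceLoop app = new ConsumeProcessProduceLoop();
        app.input.add("m1");
        app.input.add("m2");
        app.input.add("m3");
        int[] failuresLeft = {1}; // make the first attempt on m2 fail once
        app.run(a -> {
            if (a.equals("m2") && failuresLeft[0]-- > 0) {
                throw new IllegalStateException("transient processing failure");
            }
            return a.toUpperCase();
        }, 10);
        System.out.println(app.output);
    }
}
```

The failed attempt on m2 leaves no trace in the output: its transaction is aborted, m2 stays unacknowledged, and the retry produces exactly one M2.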

Let’s walk through this example step by step.

Transaction implementation

This section provides a brief overview of the new components and request flows introduced by the transaction API. For more details, read the design documentation or watch the Pulsar Summit North America talk on transactions.

It covers only the transaction concepts that users need in order to debug or tune transactions.

Components

Transaction coordinator and transaction log

The transaction coordinator (TC) keeps track of the topics and subscriptions that participate in each transaction. When a transaction is committed, the TC works with the brokers that own those topics to complete it.

The transaction coordinator is a module that runs inside the Pulsar broker. It maintains the state of each transaction throughout its lifecycle and prevents the transaction from entering an incorrect state. It also handles transaction timeouts, ensuring that a transaction is aborted once it times out.

All transaction metadata is persisted in the transaction log, which is stored in a Pulsar topic. If the transaction coordinator crashes, the transaction metadata can be recovered from the transaction log.

The transaction log topic is partitioned, and each transaction coordinator owns a subset of its partitions. In other words, the broker on which a TC runs is the owner of those partitions.

Each transaction has a unique 128-bit transaction ID (TxnID). The highest 16 bits identify the transaction log partition that records the transaction. The remaining bits hold a monotonically increasing sequence number generated by the TC that owns that partition.
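The ID layout described above can be made concrete with bit arithmetic. The sketch below is a hypothetical helper, not Pulsar's own TxnID class. It stores the 128 bits as two long halves and puts the 16-bit log partition in the top bits of the high half. As a simplifying assumption, it keeps the sequence number in the low half, so the other 48 bits of the high half stay zero.

```java
// Hypothetical helper illustrating the 128-bit TxnID layout described in the text.
final class TxnIdLayout {
    final long mostSigBits;  // top 16 bits: transaction log partition
    final long leastSigBits; // sequence number (assumed to fit in 64 bits)

    private TxnIdLayout(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    static TxnIdLayout of(int logPartition, long sequence) {
        if (logPartition < 0 || logPartition > 0xFFFF) {
            throw new IllegalArgumentException("partition must fit in 16 bits");
        }
        return new TxnIdLayout((long) logPartition << 48, sequence);
    }

    int logPartition() {
        return (int) (mostSigBits >>> 48);
    }

    long sequence() {
        return leastSigBits;
    }

    /** Per-partition generator: the owning TC hands out increasing sequence numbers. */
    static final class Generator {
        private final int logPartition;
        private long nextSequence = 0;

        Generator(int logPartition) {
            this.logPartition = logPartition;
        }

        TxnIdLayout next() {
            return TxnIdLayout.of(logPartition, nextSequence++);
        }
    }

    @Override
    public String toString() {
        return String.format("(%d,%d)", logPartition(), sequence());
    }

    public static void main(String[] args) {
        Generator tc3 = new Generator(3);
        TxnIdLayout first = tc3.next();
        TxnIdLayout second = tc3.next();
        System.out.println(first + " " + second);
    }
}
```

Because the partition is encoded in the ID, the coordinator responsible for a transaction can be identified from the ID alone. This is presumably what lets later requests for that transaction reach the right TC.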

Note that the transaction log topic stores only the state of each transaction, not the messages in it; the messages themselves are stored in the topic partitions they were written to. A transaction can be in states such as “Open”, “Prepare Commit”, and “Committed”. The transaction log records this status together with the related metadata.

Transaction buffer

Messages produced to a topic partition within a transaction are stored in the transaction buffer of the broker that owns that partition. Messages in the transaction buffer are not visible to consumers until the transaction is committed. When a transaction is aborted, its messages in the transaction buffer are discarded.
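A minimal model of one partition with a transaction buffer, assuming messages are tagged with a transaction ID string. PartitionWithTxnBuffer is hypothetical. For simplicity, it copies committed messages into the consumer-visible log; a real broker does not have to move data this way.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one topic partition and its transaction buffer.
class PartitionWithTxnBuffer {
    private final List<String> visible = new ArrayList<>();              // dispatchable to consumers
    private final Map<String, List<String>> txnBuffer = new HashMap<>(); // txnId -> buffered messages

    /** A null txnId means a normal, non-transactional write. */
    void append(String txnId, String message) {
        if (txnId == null) {
            visible.add(message);
        } else {
            txnBuffer.computeIfAbsent(txnId, id -> new ArrayList<>()).add(message);
        }
    }

    void commit(String txnId) {
        List<String> messages = txnBuffer.remove(txnId);
        if (messages != null) {
            visible.addAll(messages); // now readable by consumers
        }
    }

    void abort(String txnId) {
        txnBuffer.remove(txnId); // discarded, never delivered
    }

    List<String> readableByConsumers() {
        return Collections.unmodifiableList(visible);
    }

    public static void main(String[] args) {
        PartitionWithTxnBuffer partition = new PartitionWithTxnBuffer();
        partition.append(null, "plain");
        partition.append("txn-1", "a");
        partition.append("txn-2", "b");
        System.out.println("before end: " + partition.readableByConsumers());
        partition.commit("txn-1");
        partition.abort("txn-2");
        System.out.println("after end:  " + partition.readableByConsumers());
    }
}
```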

Pending acknowledgment state

Before a transaction is committed, the messages it acknowledges are in a pending acknowledgment (pending ACK) state. A message stays in the pending ACK state until the transaction ends. While it is in that state, no other transaction can acknowledge it.

The pending ACK state is persisted in a pending ACK log, which is stored in the cursor log. After a restart, the broker restores the transaction state from the pending ACK log, so no acknowledgments are lost.
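Recovery from the pending ACK log amounts to replaying it in order. The sketch below assumes a hypothetical log format of ACK, COMMIT, and ABORT records; the real on-disk format is not described here. It rebuilds which messages are still held by unfinished transactions after a restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pending ACK log records and their replay after a broker restart.
class PendingAckLogReplay {
    enum Op { ACK, COMMIT, ABORT }

    static final class Entry {
        final Op op;
        final String txnId;
        final String messageId; // only used by ACK records

        Entry(Op op, String txnId, String messageId) {
            this.op = op;
            this.txnId = txnId;
            this.messageId = messageId;
        }
    }

    /** Rebuilds messageId -> txnId for transactions that had not ended before the restart. */
    static Map<String, String> replay(List<Entry> log) {
        Map<String, String> pending = new HashMap<>();
        for (Entry entry : log) {
            switch (entry.op) {
                case ACK:
                    pending.put(entry.messageId, entry.txnId);
                    break;
                case COMMIT:
                case ABORT:
                    // Either way, the transaction's acks are no longer pending.
                    pending.values().removeIf(txn -> txn.equals(entry.txnId));
                    break;
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(Op.ACK, "txn-1", "m1"));
        log.add(new Entry(Op.ACK, "txn-2", "m2"));
        log.add(new Entry(Op.COMMIT, "txn-1", null));
        System.out.println("still pending after restart: " + replay(log));
    }
}
```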

The data flow

As the API usage above shows, the data flow can be divided into the following steps:

  • Open a transaction;
  • Publish messages in the transaction;
  • Acknowledge messages in the transaction;
  • End the transaction.

Open the transaction

To open a transaction, the Pulsar client requests a new transaction ID from a transaction coordinator. On receiving the request, the TC assigns a transaction ID and writes the ID and its status (OPEN, as shown in step 1a) to the transaction log. This persists the transaction state, so it survives a TC crash. Only after recording the status does the TC return the transaction ID to the Pulsar client.
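The key ordering in this step is “persist, then reply”. The sketch below is a hypothetical coordinator. A List&lt;String&gt; stands in for the transaction log topic, and records use a made-up "sequence:status" format. It also shows why persisting first matters: a restarted coordinator can rebuild its sequence counter from the log and never reuses an ID it already handed out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical coordinator for the "open transaction" step.
class CoordinatorOpen {
    private final List<String> txnLog; // stand-in for the transaction log topic
    private long nextSequence;

    CoordinatorOpen(List<String> txnLog) {
        this.txnLog = txnLog;
        // Recovery: continue after the highest sequence number already in the log.
        long highest = -1;
        for (String record : txnLog) {
            long sequence = Long.parseLong(record.split(":")[0]); // "<sequence>:<status>"
            highest = Math.max(highest, sequence);
        }
        this.nextSequence = highest + 1;
    }

    /** Assigns an ID, persists it with status OPEN, and only then returns it to the client. */
    long newTransaction() {
        long id = nextSequence++;
        txnLog.add(id + ":OPEN"); // step 1a: persist first
        return id;                // then reply
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CoordinatorOpen tc = new CoordinatorOpen(log);
        tc.newTransaction();
        tc.newTransaction();
        // Simulate a crash: a new coordinator instance recovers from the same log.
        CoordinatorOpen recovered = new CoordinatorOpen(log);
        long nextId = recovered.newTransaction();
        System.out.println("log: " + log + ", id after recovery: " + nextId);
    }
}
```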

Publish transaction messages

Before the Pulsar client sends a message to a topic partition that is new to the transaction, it asks the TC to add that partition to the transaction. The TC persists this change in its transaction log (as shown in 2.1a), so it knows every partition the transaction touches. When the transaction ends, the TC can therefore commit or abort the transaction's changes on all of those partitions.

The Pulsar client then sends messages to the partition through the normal send path. The only difference is that each message batch produced in the transaction carries the transaction ID. The receiving broker checks whether a batch belongs to a transaction. If it does not, the broker writes it normally. If it does, the broker writes the batch to the partition's transaction buffer.
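The publish path can be sketched as follows. The sketch assumes the client remembers which partitions it has already registered, so it contacts the TC only once per partition. All class, field, and method names are hypothetical. The coordinator's map stands in for the records a real TC would persist in its transaction log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the transactional publish path (client, TC, and broker roles).
class PublishPath {
    // TC side: partitions registered per transaction (persisted to the transaction log in step 2.1a).
    final Map<Long, Set<String>> partitionsByTxn = new HashMap<>();
    int registrationRequests = 0;

    // Client side: partitions this client has already registered for each transaction.
    private final Map<Long, Set<String>> registeredByClient = new HashMap<>();

    // Broker side: normal partition data vs. per-partition transaction buffers.
    final Map<String, List<String>> partitionLog = new HashMap<>();
    final Map<String, List<String>> txnBuffer = new HashMap<>();

    /** Client send; txnId is null for a non-transactional message. */
    void send(Long txnId, String partition, String message) {
        if (txnId != null
                && registeredByClient.computeIfAbsent(txnId, id -> new HashSet<>()).add(partition)) {
            registrationRequests++; // first send to this partition in this transaction
            partitionsByTxn.computeIfAbsent(txnId, id -> new LinkedHashSet<>()).add(partition);
        }
        brokerReceive(txnId, partition, message);
    }

    /** Broker: batches carrying a transaction ID go to the partition's transaction buffer. */
    private void brokerReceive(Long txnId, String partition, String message) {
        Map<String, List<String>> target = (txnId == null) ? partitionLog : txnBuffer;
        target.computeIfAbsent(partition, p -> new ArrayList<>()).add(message);
    }

    public static void main(String[] args) {
        PublishPath path = new PublishPath();
        path.send(1L, "topic-partition-0", "a");
        path.send(1L, "topic-partition-0", "b");
        path.send(1L, "topic-partition-1", "c");
        path.send(null, "topic-partition-0", "d");
        System.out.println("registrations: " + path.registrationRequests
                + ", partitions in txn 1: " + path.partitionsByTxn.get(1L)
                + ", buffered: " + path.txnBuffer + ", normal: " + path.partitionLog);
    }
}
```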

Acknowledge messages in the transaction

The first time the Pulsar client acknowledges a message on a subscription as part of the transaction, it sends a request to the TC to add that subscription to the transaction. In step 2.3a, the TC logs the new subscription, so it knows every subscription the transaction touches. During the EndTxn phase, the TC can then commit or abort the changes on each of those subscriptions.

The Pulsar client then acknowledges messages on the subscription through the normal acknowledgment path, except that each acknowledgment request carries the transaction ID. The broker receiving the request checks whether the acknowledgment belongs to a transaction. If it does, the broker puts the message in the PENDING_ACK state. No other consumer can then ack or nack the message until the transactional ack is committed or aborted. This ensures that when two transactions ack the same message, only one succeeds and the other is aborted.

If a conflict is detected while acknowledging a message, whether with an individual or a cumulative acknowledgment, the Pulsar client aborts the entire transaction.
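The conflict rules can be sketched for both acknowledgment modes. The sketch assumes message positions are plain long offsets on one subscription. It also assumes a cumulative acknowledgment covers every position from the last acknowledged one up to the given position. AckConflictChecker and its methods are hypothetical. On a false result, the calling client would abort its whole transaction, which releases everything that transaction was holding.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical conflict checks for transactional individual and cumulative acks.
class AckConflictChecker {
    private final long markDelete;                             // positions <= this are already acked
    private final Map<Long, String> pending = new HashMap<>(); // position -> txn holding a pending ack

    AckConflictChecker(long markDelete) {
        this.markDelete = markDelete;
    }

    /** Individual ack; false means a conflict. */
    boolean ackIndividual(String txn, long position) {
        if (position <= markDelete) {
            return false;
        }
        String owner = pending.putIfAbsent(position, txn);
        return owner == null || owner.equals(txn);
    }

    /** Cumulative ack of (markDelete, upTo]; checks the whole range before changing anything. */
    boolean ackCumulative(String txn, long upTo) {
        if (upTo <= markDelete) {
            return false;
        }
        for (long p = markDelete + 1; p <= upTo; p++) {
            String owner = pending.get(p);
            if (owner != null && !owner.equals(txn)) {
                return false; // another transaction holds a message inside the range
            }
        }
        for (long p = markDelete + 1; p <= upTo; p++) {
            pending.put(p, txn);
        }
        return true;
    }

    /** What the client triggers on a conflict: abort releases all of the txn's pending acks. */
    void abort(String txn) {
        pending.values().removeIf(owner -> owner.equals(txn));
    }

    public static void main(String[] args) {
        AckConflictChecker subscription = new AckConflictChecker(-1);
        subscription.ackIndividual("txn-1", 3);
        subscription.ackIndividual("txn-2", 7);
        if (!subscription.ackCumulative("txn-2", 5)) { // position 3 belongs to txn-1
            subscription.abort("txn-2");               // so txn-2 is aborted as a whole
        }
        System.out.println("txn-3 can ack 7: " + subscription.ackIndividual("txn-3", 7));
        System.out.println("txn-1 cumulative to 5: " + subscription.ackCumulative("txn-1", 5));
    }
}
```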

End the transaction

At the end of the transaction, the application decides whether to commit or abort the transaction. A transaction can also be aborted if a conflict is detected while acknowledging a message.

To finish the transaction, the Pulsar client sends an end-transaction request to the TC. A field in the request indicates whether the transaction should be committed or aborted.

The TC first writes a prepare-commit or prepare-abort record to its transaction log (as shown in 3.1a). It then sends a commit or abort request to every partition and subscription involved in the transaction (as shown in 3.2).

Once all of them have successfully committed or aborted the transaction, the TC writes a committed or aborted record to its transaction log (as shown in 3.3).
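The end-transaction path is essentially a two-phase protocol driven by the TC. The following is a minimal sketch, assuming each participant (a topic partition or subscription) reports whether it durably applied the commit or abort marker. The status names mirror the “Open”, “Prepare Commit”, and “Committed” states, extended with their abort counterparts. EndTxnCoordinator and Participant are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the TC's end-transaction path.
class EndTxnCoordinator {
    enum Status { OPEN, PREPARE_COMMIT, COMMITTED, PREPARE_ABORT, ABORTED }

    /** A topic partition or subscription involved in the transaction. */
    interface Participant {
        /** Returns true once the commit/abort marker has been durably applied. */
        boolean applyMarker(long txnId, boolean commit);
    }

    final List<String> txnLog = new ArrayList<>(); // stand-in for the transaction log topic

    Status end(long txnId, boolean commit, List<Participant> participants) {
        // Step 3.1a: make the decision durable before telling anyone.
        Status prepared = commit ? Status.PREPARE_COMMIT : Status.PREPARE_ABORT;
        txnLog.add(txnId + ":" + prepared);

        // Step 3.2: send the marker to every participant.
        for (Participant participant : participants) {
            if (!participant.applyMarker(txnId, commit)) {
                // Once logged, the decision is final; the TC stays in the prepared state
                // and would retry the markers later (retry is not modeled here).
                return prepared;
            }
        }

        // Step 3.3: every participant has finished, so record the final state.
        Status finished = commit ? Status.COMMITTED : Status.ABORTED;
        txnLog.add(txnId + ":" + finished);
        return finished;
    }

    public static void main(String[] args) {
        EndTxnCoordinator tc = new EndTxnCoordinator();
        List<Participant> healthy = new ArrayList<>();
        healthy.add((id, commit) -> true);
        healthy.add((id, commit) -> true);
        Status status = tc.end(42L, true, healthy);
        System.out.println(status + " " + tc.txnLog);
    }
}
```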

How do transactions perform?

Now that this article has explained the semantics and how transactions work, let’s look at the performance of transactions.

Transactional producer performance

Transactions cause only moderate write amplification. The extra writes come mainly from the following:

  • For each transaction, the producer sends additional requests to register topic partitions with the transaction coordinator.
  • When the transaction completes, transaction markers are written to all partitions participating in the transaction.
  • The TC writes every transaction state change to the transaction log. This includes each topic partition added to the transaction and the status transitions to prepare commit and committed.

This overhead is independent of the number of messages written in the transaction. The key to higher throughput is therefore to include more messages in each transaction. Putting fewer messages in each transaction, or committing more frequently, lowers throughput.

The trade-off is that longer transactions increase end-to-end latency. Recall that consumers do not read messages from uncommitted transactions. The longer the commit interval, the longer consumers must wait, and the higher the end-to-end latency.
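The throughput and latency trade-off can be put into numbers with a simple cost model. Its assumptions are hypothetical:

  • Every transaction pays a fixed number of extra writes (partition registrations, markers, and coordinator log entries), regardless of size.
  • Messages arrive at a steady rate.
  • Each message becomes readable only when its transaction commits.

Under uniform arrivals, a message waits on average half a commit interval.

```java
// Hypothetical cost model for the batching trade-off described above.
class TxnBatchingTradeoff {
    /** Extra writes per message when a fixed per-transaction cost is spread over the batch. */
    static double extraWritesPerMessage(int fixedWritesPerTxn, int messagesPerTxn) {
        return (double) fixedWritesPerTxn / messagesPerTxn;
    }

    /** Average wait before a message is readable, assuming uniform arrivals within the commit interval. */
    static double meanVisibilityDelayMs(int messagesPerTxn, double messagesPerSecond) {
        double commitIntervalMs = messagesPerTxn / messagesPerSecond * 1000.0;
        return commitIntervalMs / 2.0;
    }

    public static void main(String[] args) {
        int fixedWritesPerTxn = 8; // assumed overhead per transaction
        double rate = 1000.0;      // assumed messages per second
        for (int n : new int[] {10, 100, 1000}) {
            System.out.printf("%5d msgs/txn: %.3f extra writes/msg, ~%.0f ms mean delay%n",
                    n, extraWritesPerMessage(fixedWritesPerTxn, n), meanVisibilityDelayMs(n, rate));
        }
    }
}
```

With these assumed numbers, going from 10 to 1,000 messages per transaction cuts the overhead from 0.8 to 0.008 extra writes per message. It also raises the mean delay from about 5 ms to about 500 ms.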

Transactional consumer performance

Transactional consumers are much simpler than transactional producers. All the logic runs on the Pulsar broker, which dispatches only messages from committed transactions.

Further reading

This article provides a brief overview of Apache Pulsar transactions. To learn more, see the following:

  • Design document: the official design document. It describes the public interfaces, data flow, and components, including how the transaction components are implemented, how transaction requests are handled, and how transaction data is cleaned up.
  • Pulsar client Javadocs[3]: describes how to use the new API.
  • Implementing Exactly-Once semantics based on Pulsar transactions: the first post in this series.

My colleagues Sijie Guo and Addison Higham presented “Exactly-Once So Simple: Transactional Messages in Apache Pulsar” at Pulsar Summit North America 2021 on June 16-17. Watch the video of the talk for more details on Pulsar transactions.

Conclusion

The first blog post in this series, “How Simple It Is to Implement Exactly-Once Semantics with Pulsar Transactions,” describes how Apache Pulsar's transaction API enables exactly-once semantics. In this article, we discussed the key design goals of the transaction API in Apache Pulsar, its semantics, and how it actually works.

If we think of stream processing as a read-process-write pipeline, this post focused on the read and write paths and treated the processing itself as a black box. However, a lot can happen during the processing stage, so the transaction API alone cannot guarantee exactly-once processing. For example, if the processing logic modifies an external storage system, the transaction API described here is not sufficient to ensure exactly-once processing.

The Pulsar-Flink integration uses the transaction API to provide end-to-end exactly-once processing for a wide range of stream processing applications. This holds even for applications that update additional state stores during processing.

In the next few weeks, we will publish the third article in this series. It will detail how the Pulsar-Flink integration provides end-to-end exactly-once processing semantics based on the new Pulsar transactions, and how to easily write streaming applications with Pulsar and Flink.

Related reading

  • Technical Exploration: Transactional event flow for Apache Pulsar
  • New features overview in Pulsar 2.7.0: transaction support, topic-level policy configuration, and more

About the translator

Zi Fei, a senior architect and Apache Pulsar contributor, is currently responsible for building the data platform and trading platform at a startup. Zi Fei has 9 years of experience in the financial securities industry, focusing on the securities business and distributed computing, with a particular interest in distributed, high-concurrency, and in-memory transaction technologies. Outside work, Zi Fei enjoys reading, cooking, and traveling.
