Author: Treasure cow

This article is a translation of a post on the streaml.io blog, "Exactly once is NOT exactly the same", which analyzes the "exactly once" feature of stream computing systems. The main point is that "exactly once" does not mean that every event is literally processed exactly once. The main contents are as follows:

  1. Background

  • 1.1. At most once

  • 1.2. At least once

  • 1.3. Exactly once

  2. Is "exactly once" really "exactly once"?

  3. Comparing distributed snapshots with at-least-once delivery plus deduplication

  4. Conclusion

  5. References

Apache Storm, Apache Flink, Heron, Apache Kafka (Kafka Streams), and Apache Spark (Spark Streaming) are widely used stream computing systems. One of their most widely discussed features is "exactly once" semantics, which many systems claim to already support. However, there is a lot of misunderstanding and ambiguity about what "exactly once" actually is and what it really guarantees. So let's analyze it.

1. Background

Stream processing (sometimes called event processing) can simply be described as the continuous processing of unbounded data or events. A stream or event processing application can be more or less described as a directed graph, and often as a directed acyclic graph (DAG). In such a graph, each edge represents a stream of data or events, and each vertex represents an operator that processes the data or events from adjacent edges using logic defined in the application. There are two special types of vertices, usually called sources and sinks. Sources read external data/events into the application, and sinks usually collect the results produced by the application. Below is an example of a streaming application.

A typical stream processing topology

Stream processing engines typically allow users to specify a reliability mode or processing semantics to indicate what guarantees the engine will provide for data processing throughout the application. These guarantees matter because failures caused by the network, machines, and so on can always occur and may lead to data loss. Stream processing engines typically offer applications three data processing semantics: at most once, at least once, and exactly once.

Here is a loose definition of these different processing semantics:

1.1. At most once

This is essentially a "best effort" approach. Data or events are guaranteed to be processed at most once by all operators in the application. This means that if the data is lost before being fully processed by the streaming application, there will be no retry or retransmission. The example below illustrates this situation.

At-most-once processing semantics

1.2. At least once

All operators in the application guarantee that data or events are processed at least once. This usually means that if an event is lost before being fully processed by the streaming application, it will be replayed or retransmitted from the source. Because events can be retransmitted, however, an event may sometimes be processed more than once, hence the name at least once.

The example below illustrates a case where the first operator initially fails to process an event, succeeds on a retry, and then succeeds again on a second, unnecessary retry, so the event ends up being processed more than once.

At-least-once processing semantics

1.3. Exactly once

Even in the case of various failures, all operators in the streaming application guarantee that events are processed exactly once.

Exactly-once processing semantics

Two popular mechanisms are commonly used to implement "exactly once" processing semantics:

  • Distributed snapshots / state checkpointing

  • At-least-once event delivery plus deduplication

The distributed snapshot / state checkpointing approach to achieving "exactly once" is inspired by the Chandy-Lamport distributed snapshot algorithm [1]. With this mechanism, all the state of every operator in the streaming application is periodically checkpointed. If a failure occurs anywhere in the system, the state of every operator is rolled back to the latest globally consistent checkpoint. All processing is paused during the rollback. The sources are also reset to the correct offset corresponding to the most recent checkpoint. The whole streaming application is essentially rewound to its last consistent state, from which processing can then restart. The following figure illustrates the basics of this checkpointing mechanism.

In the figure above, the streaming application is working normally at time T1, and the state is checkpointed. At time T2, however, an operator fails while processing an input record. At this point, the state value S=4 has been saved to persistent storage, while the state value S=12 exists only in the operator's memory. To resolve this discrepancy, at time T3 the processor rolls the state back to S=4 and replays every event in the stream since the last checkpoint, reprocessing each one. The end result is that some data has been processed more than once, but that does not matter, because the resulting state is the same no matter how many times the rollback is performed.
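To make the rollback-and-replay idea concrete, here is a minimal, engine-agnostic sketch in Java of an operator whose state is a running sum. The StateStore and Checkpoint types are hypothetical stand-ins for a persistent backend, not any real engine's API.

// Sketch: an operator whose state (a running sum S) is periodically
// checkpointed together with the source offset, and restored after a failure.
// StateStore and Checkpoint are hypothetical stand-ins for a persistent backend.

record Checkpoint(long state, long sourceOffset) {}

interface StateStore {
    void save(long state, long sourceOffset);
    Checkpoint loadLatest();
}

class SummingOperator {
    private long state = 0;           // in-memory state, e.g. S
    private final StateStore store;   // persistent backend (source of truth)

    SummingOperator(StateStore store) { this.store = store; }

    // Normal processing: update the in-memory state for each incoming value.
    void process(long value) { state += value; }

    // Periodically triggered: persist the state together with the source
    // offset so both can be restored as one consistent checkpoint.
    void checkpoint(long sourceOffset) { store.save(state, sourceOffset); }

    // On failure: roll back to the latest checkpoint and return the offset
    // the source must rewind to; events after that offset are simply replayed.
    long recover() {
        Checkpoint cp = store.loadLatest();
        state = cp.state();
        return cp.sourceOffset();
    }
}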

The other way to achieve "exactly once" is at-least-once event delivery plus deduplication of repeated data at each operator. A stream processing engine using this approach replays failed events for further processing attempts and removes duplicate events at every operator before they enter the user-defined logic. This mechanism requires maintaining a transaction log for each operator to track the events it has already processed. Engines that use this mechanism include Google's MillWheel [2] and Apache Kafka Streams. The figure below illustrates the gist of this mechanism.

At-least-once delivery plus deduplication
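The sketch below illustrates the gist of per-operator deduplication in front of user-defined logic, assuming at-least-once delivery from upstream. The in-memory set is only a stand-in for the per-operator transaction log; real engines such as MillWheel or Kafka Streams persist this information and implement it quite differently.

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Sketch: drop replayed duplicates before they reach user-defined logic.
// The Set stands in for the operator's transaction log of processed event IDs;
// a real engine would persist it atomically with the operator's state updates.
class DedupingOperator {
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<Event> userLogic;

    DedupingOperator(Consumer<Event> userLogic) { this.userLogic = userLogic; }

    void onEvent(Event event) {
        if (!processedIds.add(event.getId())) {
            return; // already processed once; ignore the replayed copy
        }
        userLogic.accept(event);
    }
}

class Event {
    private final String id;
    Event(String id) { this.id = id; }
    String getId() { return id; }
}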

2. Is "exactly once" really "exactly once"?

Now let's re-examine what "exactly once" processing semantics really guarantee to the end user. The label "exactly once" is misleading about what is actually done exactly once.

Some might think that "exactly once" describes a guarantee about event processing, namely that each event in the stream is processed only once. In fact, no engine can guarantee that. In the face of arbitrary failures, it is impossible to guarantee that the user-defined logic in each operator executes only once per event, because the possibility of user code being partially executed is always there.

Consider a stream processing operator that performs a mapping operation which prints the ID of the incoming event and then returns the event unchanged. The following pseudocode illustrates this operation:

map(Event event) {
    print "Event ID: " + event.getId()
    return event
}

Each event has a GUID (globally unique ID). If exactly-once execution of the user logic were guaranteed, each event ID would be printed only once. This cannot be guaranteed, however, because failures can occur at any point during the execution of the user-defined logic, and the engine cannot determine this on its own. Therefore, there is no guarantee that any user-defined logic will be executed only once. This also means that external operations implemented in user-defined logic, such as writing to a database, are not guaranteed to be executed only once. Such operations still need to be idempotent.
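As an illustration of making such an external operation idempotent, here is a sketch that keys a database write by the event's GUID, so that a replayed event overwrites the existing row instead of inserting a duplicate. The table, the column names, and the PostgreSQL-style upsert are assumptions made for the example, not something prescribed by the article.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Sketch: an idempotent sink. Writing the same event twice leaves exactly one
// row, because the write is keyed by the event's GUID (PostgreSQL upsert syntax).
class IdempotentSink {
    private final Connection conn;

    IdempotentSink(Connection conn) { this.conn = conn; }

    void write(String eventId, String payload) throws SQLException {
        String sql = "INSERT INTO results (event_id, payload) VALUES (?, ?) "
                   + "ON CONFLICT (event_id) DO UPDATE SET payload = EXCLUDED.payload";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, eventId);
            ps.setString(2, payload);
            ps.executeUpdate();
        }
    }
}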

So what do engines guarantee when they claim "exactly once" processing semantics? If user logic cannot be guaranteed to execute only once, what is executed only once? When engines declare "exactly once" processing semantics, what they are really saying is that updates to engine-managed state are committed only once to a persistent backend store.

Both mechanisms described above use a persistent backend store as the source of truth, holding the state of every operator and committing updates to it. For mechanism 1 (distributed snapshots / state checkpointing), this persistent backend holds a globally consistent state checkpoint of the streaming application (the checkpointed state of each operator). For mechanism 2 (at-least-once event delivery plus deduplication), the persistent backend stores the state of each operator as well as a per-operator transaction log that tracks all events it has fully processed.

Committing state or other application updates to a persistent backend that serves as the source of truth can be described as happening exactly once. However, as noted above, computing those state updates/changes, that is, executing arbitrary user-defined logic on events, can happen more than once if a failure occurs. In other words, the processing of an event can happen multiple times, but the effect of that processing is reflected only once in the persistent backend state store. Therefore, we believe the term that best describes these processing semantics is "effectively once".


3. Comparing distributed snapshots with at-least-once delivery plus deduplication

From a semantic point of view, the distributed snapshot mechanism and the at-least-once delivery plus deduplication mechanism provide the same guarantee. However, because the two mechanisms are implemented differently, there are significant performance differences between them.

The performance overhead of mechanism 1 (distributed snapshots / state checkpointing) is minimal, because the engine essentially sends special checkpoint events along with the regular events through all operators in the streaming application, while state checkpointing can be performed asynchronously in the background. For large streaming applications, however, failures may occur more frequently, requiring the engine to pause the application and roll back the state of all operators, which in turn affects performance. The larger the streaming application, the more likely and therefore more frequent failures become, and the more the application's performance suffers. Still, this mechanism is non-invasive and adds little to the resources required at runtime.

Mechanism 2 (at-least-once delivery plus deduplication) may require more resources, especially storage. With this mechanism, the engine needs to track every tuple that has been fully processed by each operator instance in order to perform deduplication, and it must perform the deduplication itself for every event. This means a large amount of data needs to be tracked, especially if the streaming application is large or many applications are running. Performing deduplication for every event at every operator also incurs a performance overhead. With this mechanism, however, the performance of a streaming application is less likely to be affected by its size. With mechanism 1, if any operator fails, a global pause and state rollback are required; with mechanism 2, the effects of a failure are more localized. When a failure occurs in an operator, only the events that may not have been fully processed are replayed/retransmitted from the upstream source. The performance impact is isolated to the location of the failure in the streaming application and has little effect on the performance of the other operators. From a performance point of view, the pros and cons of the two mechanisms are as follows.

Advantages and disadvantages of distributed snapshots / state checkpointing:

  • Advantages:

  • Small performance and resource overhead

  • Disadvantages:

  • A failure has a significant impact on performance (global pause and rollback)

  • The larger the topology, the greater the potential impact on performance

Advantages and disadvantages of at-least-once delivery plus deduplication:

  • Advantages:

  • The performance impact of a failure is localized

  • The impact of a failure does not necessarily increase with the size of the topology

  • Disadvantages:

  • It may require a lot of storage and infrastructure to support it

  • Performance overhead per event per operator

While there are theoretical differences between distributed snapshots and at-least-once delivery plus deduplication, both can be reduced to at-least-once processing plus idempotency. For both mechanisms, when a failure occurs, events are replayed/retransmitted (implementing at-least-once), and through state rollback or event deduplication, operators are essentially made idempotent when updating engine-managed internal state.

4. Conclusion

In this blog post, I hope to have convinced you that the term "exactly once" is highly misleading. Providing "exactly once" processing semantics really means that distinct updates to the operator state managed by the stream processing engine are reflected only once. "Exactly once" does not guarantee that event processing, that is, the execution of arbitrary user-defined logic, happens only once. We prefer the term "effectively once" for this guarantee, because processing is not necessarily guaranteed to happen only once, but its effect on the engine-managed state is reflected only once. Two popular mechanisms, distributed snapshots and at-least-once delivery plus deduplication, are used to implement exactly-once / effectively-once processing semantics. The two mechanisms provide the same semantic guarantees for message processing and state updates but differ in performance. This article is not meant to convince you that either mechanism is better than the other, as each has its pros and cons.

5. References

  1. Chandy, K. Mani, and Leslie Lamport. "Distributed snapshots: Determining global states of distributed systems." ACM Transactions on Computer Systems (TOCS) 3.1 (1985): 63-75.

  2. Akidau, Tyler, et al. "MillWheel: Fault-tolerant stream processing at internet scale." Proceedings of the VLDB Endowment 6.11 (2013): 1033-1044.