Preface
Apache Kafka has become the dominant platform for asynchronous communication across microservices. It has many powerful features that allow us to build robust and resilient asynchronous architectures.
At the same time, we need to be careful of many potential pitfalls when using it. If we fail to anticipate the problems that can (and, sooner or later, will) occur, we are left with a system that is prone to errors and corrupted data.
Today, I’ll highlight one of these pitfalls: trying to process a message and failing. First, we need to realize that message consumption can, and sooner or later will, fail. Second, we need to ensure that we do not introduce more problems when dealing with such failures.
A brief Kafka overview
There are plenty of in-depth articles about Kafka and how to use it, so here we'll just briefly review the concepts that matter for our discussion.
Event logs, publishers, and consumers
Kafka is a system for processing data streams. Conceptually, we can think of Kafka as having three basic components:
- An Event Log to which messages are posted
- Publishers, which publish messages to the event log
- Consumers, which read (that is, consume) messages from the event log
Unlike traditional message queues such as RabbitMQ, Kafka lets consumers decide when they read messages (that is, Kafka uses a pull model rather than a push model). Each message has an offset, and each consumer tracks (or commits) the offset of the most recent message it has consumed. That way, the consumer can ask for the next message starting from that offset.
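To make this concrete, here is a minimal sketch of a consumer using the plain Java kafka-clients API. The topic name, group ID, and String deserializers are placeholders chosen for this example, not anything Kafka itself prescribes.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExampleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "login-service");   // instances sharing this ID form a consumer group
        props.put("enable.auto.commit", "false"); // we commit offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("user-account-events")); // hypothetical topic name
            while (true) {
                // Pull model: the consumer asks the broker for the next batch of records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // Committing the offset records our position, so after a restart the
                // consumer resumes from the message after the last committed one.
                consumer.commitSync();
            }
        }
    }
}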
Topics
The event log is divided into topics, each of which defines the type of message that will be published to it. It's up to us engineers to define our topics, so we should keep a few rules of thumb in mind:
- Each topic should describe an event that other services might need to know about.
- Each topic should define a unique schema that each message will follow.
Partitions and partition keys
Topics are further subdivided into partitions. Partitions enable messages to be consumed in parallel. Kafka allows messages to be distributed to partitions deterministically by means of a partition key. A partition key is a piece of data (usually some property of the message itself, such as an ID) to which an algorithm is applied to determine the partition.
Here, we use the message's UUID field as the partition key. The producer applies an algorithm (for example, taking each UUID value modulo the number of partitions) to assign each message to a partition.
Using the partition key in this way ensures that every message associated with a given ID is published to a single partition.
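To illustrate the idea in code (note that Kafka's actual default partitioner hashes the serialized key with murmur2; the simple modulo below is only meant to show the principle of deterministic assignment):

// Illustrative sketch only: map a partition key deterministically to a partition.
int partitionFor(String partitionKey, int numPartitions) {
    // floorMod keeps the result non-negative even when hashCode() is negative
    return Math.floorMod(partitionKey.hashCode(), numPartitions);
}

Because the same key always yields the same result, every message carrying a given UUID lands on the same partition.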
It is also important to note that multiple instances of a consumer can be deployed as a consumer group. Kafka ensures that any message in a given partition will always be read by the same consumer instance in the group.
Using Kafka in microservices
Kafka is very powerful, so it is used in a variety of environments and covers many use cases. Here, we focus on the way it is most commonly used in a microservices architecture.
Passing messages across bounded contexts
When many of us first started building microservices, we began with some kind of centralized model: each piece of data is owned by a single microservice (that is, a single source of truth), and if any other microservice needs access to that data, it makes a synchronous call to retrieve it.
This approach leads to a number of problems, including long chains of synchronous calls, single points of failure, reduced team ownership, and more.
In the end we found a better way. In today’s mature architecture, we divide communication into command processing and event processing.
Command processing is usually performed in a single bounded context and often still involves synchronous communication.
Events, on the other hand, are typically emitted by services in one bounded context and published asynchronously to Kafka for consumption by services in other bounded contexts.
On the left is the way we previously designed microservice communication: a service in one bounded context (represented by a dotted box) receives synchronous calls from services in other bounded contexts. On the right is what we do today: services in one bounded context publish events, and services in other bounded contexts consume them at their own pace.
For example, take a User bounded context. Our User team builds applications and services that are responsible for tasks such as creating new users, updating existing user accounts, and so on.
After a UserAccount is created or modified, the UserAccount service publishes a corresponding event to Kafka. Other interested bounded contexts can consume the event, store the data locally, enhance it with other data, and so on. For example, our Login bounded context might want to know a user's current name in order to greet them when they log in.
We refer to this use case as cross-boundary event publishing.
When performing cross-boundary event publishing, we should publish aggregates. An aggregate is a self-contained group of entities that is treated as a single atomic unit. Each aggregate has a "root" entity and a number of subordinate entities that provide additional data.
When the service that manages an aggregate publishes a message, the message's payload is some representation of the aggregate (for example, JSON or Avro). Importantly, the service uses the aggregate's unique identifier as the partition key. This ensures that changes to any given aggregate are published to the same partition.
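As a sketch of what that might look like with the plain Java producer (the topic name, JSON payload, and serializer choices here are assumptions made for the example):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserAccountEventPublisher {
    private final KafkaProducer<String, String> producer;

    public UserAccountEventPublisher() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    /** Publishes a JSON representation of the whole UserAccount aggregate, keyed by its ID. */
    public void publish(String aggregateId, String aggregateJson) {
        // Using the aggregate's ID as the partition key means every change to a given
        // aggregate lands on the same partition and is therefore consumed in order.
        producer.send(new ProducerRecord<>("user-account-events", aggregateId, aggregateJson));
    }
}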
What happens when something goes wrong?
While Kafka’s cross-boundary event publishing mechanism is elegant, it is a distributed system, so there is plenty of potential for error. We’ll focus on perhaps the most common annoyance: a consumer may fail to successfully process a message that it consumes.
What do we do now?
Recognize that this is a problem
The first mistake teams make is failing to recognize that this is a potential problem at all. Message failures happen, so we need to develop a strategy for dealing with them up front… Be proactive, not reactive.
So understanding that this is a problem waiting to happen and designing a solution for it is the first step. If we’ve gotten that far, we can congratulate ourselves a little. The big question remains: what do we do about it?
Can’t we just keep retrying that message?
By default, if the consumer does not successfully consume a message (that is, the consumer cannot commit the current offset), it will retry the same message. So, can’t we simply let this default behavior take over and retry the message until it succeeds?
The problem is that the message may never succeed. At the very least, it won’t succeed without some form of manual intervention. As a result, the consumer will never continue to process any subsequent messages, and our message processing will be stuck.
Okay, can’t we just skip that message?
We generally allow synchronous requests to fail. For example, a “create-user” POST to our UserAccount service might contain bad or missing data. In this case, we can simply return an error code (such as HTTP 400) and ask the caller to fix the request and try again.
While this approach is not ideal, it does not cause any long-term problems with our data integrity. That POST represents a command, something that hasn’t happened yet. Even if we let it fail, our data will remain consistent.
This is not the case when we discard messages. A message represents an event that has occurred. Any consumer that ignores these events will be out of sync with the upstream service that generated the event.
All of which suggests that we don’t want to discard messages.
So how do we solve this problem?
This is not an easy problem to solve, so once we recognize that it needs solving, we might turn to the Internet for guidance. But that leads to our second problem: there is some advice out there that we probably shouldn’t follow.
Retry topics: a popular solution
One of the most popular solutions you’ll find is the concept of retry topics. The details vary from implementation to implementation, but the general concept is this:
- The consumer tries to consume a message from the main topic.
- If the message cannot be consumed successfully, the consumer publishes it to the first retry topic and commits the message’s offset so that it can proceed to the next message.
- Subscribed to the retry topic is a retry consumer, which contains the same logic as the primary consumer. This consumer introduces a short delay between consumption attempts. If it, too, is unable to consume the message, it publishes the message to another retry topic and commits the message’s offset.
- This process continues, with additional retry topics and retry consumers introducing ever-longer delays (serving as a backoff strategy). Finally, if the last retry consumer is unable to process a message, it publishes the message to a Dead Letter Queue (DLQ), where the engineering team will triage it manually.
Conceptually, the retry topic pattern defines multiple topics to which failed messages will be shunted. If the consumer of the main topic consumes a message that it cannot process, it frees itself to move on to the next message by publishing that message to retry topic 1 and committing the current offset. The consumer of the retry topic is a copy of the main consumer, but if it is unable to process the message, it publishes the message to a new retry topic. Finally, if the last retry consumer is also unable to process the message, it publishes the message to a dead letter queue (DLQ).
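In rough pseudocode, in the same style as the examples later in this article, the main consumer’s loop under this pattern might look like the sketch below. The topic names, processRecord(), and the retryProducer are hypothetical, and real implementations also track a retry counter and the delay between attempts.

// Sketch of the main consumer under the retry topic pattern.
void consumeMainTopic(KafkaConsumer<String, String> consumer,
                      KafkaProducer<String, String> retryProducer) {
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            try {
                processRecord(record);
            } catch (Exception e) {
                // Could not process the message: shunt it to the first retry topic...
                retryProducer.send(
                        new ProducerRecord<>("main-topic-retry-1", record.key(), record.value()));
            }
        }
        // ...and commit the offset either way, so the consumer moves on to the next message.
        consumer.commitSync();
    }
}

Each retry consumer looks much the same, except that it waits before each attempt and publishes failures to the next retry topic (or, at the end of the chain, to the DLQ).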
What’s the problem?
This approach seems reasonable. In fact, it works well in many use cases. The problem is that it cannot serve as a universal solution. There are certain use cases (such as our cross-boundary event publishing) for which this approach is actually dangerous.
It ignores different types of errors
The first problem is that it fails to take into account the two main causes of event consumption failures: recoverable errors and unrecoverable errors.
Recoverable errors are errors that will eventually be resolved if we retry enough times. A simple example is a consumer that saves data to a database. If the database is temporarily unavailable, the consumer will fail when the next message comes through. Once the database becomes available again, the consumer is able to process the message.
Another way to look at it: recoverable errors are those whose root cause lies outside both the message and the consumer. Once the error is resolved, our consumer carries on as if nothing had happened. (A lot of people get confused here. The term “recoverable” does not mean that the application itself, in our case the consumer, recovers. Rather, it means that some external resource, in this case the database, fails and eventually recovers.)
The important thing to note about recoverable errors is that they will plague almost every message in the topic. Recall that all messages in a topic should follow the same schema and represent the same type of data, and that our consumer performs the same action for each event on the topic. Therefore, if message A fails because of a database outage, then message B, message C, and so on will also fail.
An unrecoverable error is one that will keep failing no matter how many times we retry. For example, a missing field in a message might cause a NullPointerException, or a field containing special characters might make the message unparseable.
Unlike recoverable errors, unrecoverable errors typically affect a single isolated message. For example, if only message A contains special characters that cannot be parsed, message B will succeed, and so will message C, etc.
Unlike recoverable errors, resolving unrecoverable errors means that we must fix the consumer itself (never “fix” the messages themselves — they are immutable records!). For example, we might fix the consumer to properly handle null values, and then redeploy it.
So what does this have to do with the retry topic solution?
For starters, it’s not particularly useful for recoverable errors. Remember that until the external problem is resolved, a recoverable error affects every message, not just the current one. Sure, shunting a failed message to a retry topic clears the way for the next message, but that next message will fail too, as will the one after it and the one after that. We are better off simply letting the consumer retry the same message until the problem is resolved.
What about unrecoverable errors? Retry queues can help in those situations. If one troublesome message is blocking the consumption of all subsequent messages, then shunting that message aside certainly clears the way for our consumer (although a single retry topic would suffice; multiple retry topics are unnecessary).
But while retry queues can help message consumers plagued by unrecoverable errors move forward, they can also bring more pitfalls. Let’s take a closer look at why.
It ignores ordering
Let’s briefly review some of the important aspects of cross-boundary event publishing. After processing a command in a bounded context, we publish a corresponding event to a Kafka topic. Importantly, we use the aggregate’s ID as the partition key.
Why is this important? It ensures that changes to any given aggregate are published to the same partition.
Okay, so why does that matter so much? When events are published to the same partition, they are guaranteed to be processed in the order in which they occurred. If successive changes are made to the same aggregate and the resulting events are published to different partitions, a race condition can occur in which a consumer processes the second change before the first. This leads to data inconsistency.
Let’s take a simple example. Our User bounded context provides an application that lets users change their names. One user changes her name from Zoey to Zoe, and then immediately changes it again to Zoiee. If we ignore ordering, a downstream consumer (such as the Login bounded context) might first process the change to Zoiee and then overwrite it with Zoe shortly thereafter.
Now the Login data is out of sync with our User data. To make matters worse, Zoiee sees a “Welcome, Zoe!” greeting every time she logs on to our site.
This is where retry topics really go wrong: they make it easy for our consumers to process events out of order. If a consumer is affected by a temporary database outage while processing the Zoe change, it shunts that message to a retry topic to try again later. If the outage has been resolved by the time the Zoiee change arrives, that message is processed successfully, only to be overwritten later when the Zoe change is retried.
A simple example like Zoiee/Zoe is used here for illustration. In practice, processing events out of order can lead to all kinds of data corruption problems. Worse, these problems are rarely noticed at first. Instead, the corruption tends to go undetected for a while, and the extent of the damage grows over time. Generally, by the time we realize what has happened, a lot of data has been affected.
When do retry topics work?
To be clear, retry topics are not always the wrong pattern. There are certainly appropriate use cases for them. Specifically, this pattern works well when the consumer’s job is to collect immutable records. Examples might include:
- Consumers that process a stream of web activity events to generate reports
- Consumers that add transactions to a ledger (as long as the transactions don’t need to be processed in any particular order)
- Consumers that ETL data from another data source
Such consumers may benefit from the retry topic pattern without the risk of data corruption.
Be warned, though
Even for these use cases, we should proceed with caution. Building such a solution is complex and time-consuming, so as an organization we don’t want to write a new solution for every new consumer. Instead, we should create a unified solution, such as a library or a container, that can be reused across services.
There is another problem. Suppose we build a retry topic solution for the consumers that can use it. Unfortunately, it won’t be long before that solution finds its way into consumers that perform cross-boundary event publishing. The teams that own those consumers may not be aware of the risks, and as we discussed earlier, they may not notice any problem until significant data corruption has occurred.
Therefore, before implementing the retry topic solution, we should be 100% sure:
- Our business will never have consumers that update existing data, or
- We have strict controls in place to ensure that our retry topic solution is not used by such consumers
How can we improve this model?
Given that the retry topic pattern may not be an acceptable solution for consumers that perform cross-boundary event publishing, can we tweak it to improve it?
When I set out to write this article, I intended to provide a complete solution. But then I realized there is no one-size-fits-all answer, so instead we will discuss some of the things to consider when designing an appropriate solution.
Distinguishing error types
Life would be a lot easier if we could distinguish recoverable errors from unrecoverable errors. For example, if our consumer starts experiencing recoverable errors, a retry topic becomes redundant.
Therefore, we can try to determine the type of error encountered:
void processMessage(KafkaMessage km) {
    try {
        Message m = km.getMessage();
        transformAndSave(m);
    } catch (Throwable t) {
        if (isRecoverable(t)) {
            // recoverable error: handle it here (see the next section)
        } else {
            // unrecoverable error: handle it here (see below)
        }
    }
}
In the Java pseudocode above, isRecoverable() takes a whitelist approach to determining whether t represents a recoverable error. In other words, it checks whether t matches any known recoverable error (such as a SQL connection error or a REST client timeout) and returns true if it does, false otherwise. This keeps our consumer from being blocked by unrecoverable errors.
Admittedly, it can be difficult to distinguish recoverable from unrecoverable errors. For example, a SQLException might indicate a database outage (recoverable) or a constraint violation (unrecoverable). When in doubt, we should probably assume the error is unrecoverable. The risk is that we send messages that would eventually have succeeded to the hidden topic and delay their processing… but we also avoid inadvertently getting bogged down in endless attempts to process an unrecoverable error.
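One possible sketch of the whitelist follows. The specific exception types treated as recoverable here are assumptions and will differ from application to application.

// Whitelist approach: only errors we positively recognize as transient are recoverable.
boolean isRecoverable(Throwable t) {
    for (Throwable cause = t; cause != null; cause = cause.getCause()) {
        if (cause instanceof java.net.ConnectException            // e.g. database or REST endpoint unreachable
                || cause instanceof java.net.SocketTimeoutException   // e.g. REST client timeout
                || cause instanceof java.sql.SQLTransientException) { // JDBC's "try again later" family
            return true;
        }
    }
    // When in doubt, treat the error as unrecoverable, as discussed above.
    return false;
}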
Retrying recoverable errors within the consumer
As we discussed, publishing a message to a retry topic makes no sense for recoverable errors; we would only be clearing the way for the next message to fail. Instead, the consumer can simply retry until the condition is resolved.
Of course, a recoverable error means there is a problem with an external resource, and hammering that resource with continuous requests won’t help. Therefore, we want to apply a backoff strategy to our retries. Our pseudo-Java code might now look like this:
void processMessage(KafkaMessage km) {
    try {
        Message m = km.getMessage();
        transformAndSave(m);
    } catch (Throwable t) {
        if (isRecoverable(t)) {
            // recoverable error: retry in place, backing off between attempts
            doWithRetry(km.getMessage(), Backoff.EXPONENTIAL, this::transformAndSave);
        } else {
            // unrecoverable error: handle it here (see the next section)
        }
    }
}
(Note: whatever backoff mechanism we use should be configured to alert us when a certain retry threshold is reached, so that we are notified of potentially serious errors.)
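doWithRetry() is not spelled out in the pseudocode, so here is one possible sketch. The initial delay, the cap, and the alertOps() hook are assumptions made for the example, and a real consumer would also need to keep its poll loop alive during long retries, which this sketch ignores.

enum Backoff { FIXED, EXPONENTIAL }

// Sketch only: retry the action until it succeeds, backing off between attempts.
<T> void doWithRetry(T message, Backoff backoff, java.util.function.Consumer<T> action) {
    long delayMs = 500; // initial delay (arbitrary choice)
    int attempts = 0;
    while (true) {
        try {
            action.accept(message);
            return; // success: stop retrying
        } catch (Exception e) {
            attempts++;
            if (attempts % 10 == 0) {
                alertOps(e, attempts); // hypothetical hook: notify us after repeated failures
            }
            sleepQuietly(delayMs);
            if (backoff == Backoff.EXPONENTIAL) {
                delayMs = Math.min(delayMs * 2, 60_000); // double the delay, capped at one minute
            }
        }
    }
}

void sleepQuietly(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
}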
When an unrecoverable error is encountered, send the message straight to the last topic
On the other hand, when our consumer encounters an unrecoverable error, we may want to set the message aside immediately to unblock subsequent messages. But would multiple retry topics be useful here? The answer is no. The message would merely suffer n consumption failures before landing in the DLQ, so why not put it there from the start?
Like a retry topic, this topic (here we will call it a hidden topic) has its own consumer, which mirrors the main consumer. But, like a DLQ, this consumer is not always running; it runs only when we explicitly need it.
Considering ordering
So let’s look at ordering, reusing the earlier User/Login example. The Login consumer might hit an error while trying to process a special character in the Zoe message. The consumer recognizes it as an unrecoverable error, sets the message aside, and continues processing subsequent messages. A while later, the consumer gets the Zoiee message and processes it successfully.
The Zoe message is hidden, and the Zoiee message has now been successfully processed. Currently, the data between the two bounded contexts is consistent.
At a later date, our team will fix the consumer so that it can properly handle the special character and redeploy it. We then republish the Zoe message to the consumer, who can now process it correctly.
Once the updated consumer processes the hidden Zoe message, the data in the two bounded contexts becomes inconsistent: the User bounded context knows the user as Zoiee, while the Login bounded context calls her Zoe.
Obviously, we have failed to preserve ordering: the Login consumer processed Zoiee before Zoe, when the correct order is the reverse. We could start hiding every message once one message has been hidden, but then we would be just as stuck as before. Fortunately, we don’t need to preserve ordering across all messages, only across the messages associated with a single aggregate. So if our consumer keeps track of the aggregates for which it has hidden a message, it can make sure that subsequent messages belonging to those same aggregates are hidden as well, as sketched below.
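Here is one way to extend the earlier pseudocode with that idea. The getPartitionKey() accessor, the hideMessage() helper, and the in-memory set are all assumptions made for this sketch; in a real system, the set of hidden aggregate IDs would also need to survive restarts (for example, in a local store).

// Aggregates that already have a hidden message; later messages for these must also be hidden.
private final Set<String> hiddenAggregateIds = new HashSet<>();

void processMessage(KafkaMessage km) {
    String aggregateId = km.getPartitionKey(); // hypothetical accessor: the partition key is the aggregate's ID
    if (hiddenAggregateIds.contains(aggregateId)) {
        // An earlier message for this aggregate is already hidden; hide this one too,
        // or we would process the aggregate's events out of order.
        hideMessage(km);
        return;
    }
    try {
        Message m = km.getMessage();
        transformAndSave(m);
    } catch (Throwable t) {
        if (isRecoverable(t)) {
            doWithRetry(km.getMessage(), Backoff.EXPONENTIAL, this::transformAndSave);
        } else {
            // Unrecoverable: remember the aggregate and set the message aside.
            hiddenAggregateIds.add(aggregateId);
            hideMessage(km); // hypothetical helper: publish the message, unchanged, to the hidden topic
        }
    }
}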
Once we are alerted to messages in the hidden topic, we can undeploy the consumer and fix its code (note: do not modify the messages themselves; messages represent immutable events!). After fixing and testing our consumer, we can redeploy it. Of course, we need to take special care to process all of the records in the hidden topic before resuming the main topic; that way, correct ordering is maintained. For this reason, we deploy the hidden-topic consumer first, and only when it has finished (meaning all instances in its consumer group have finished, if we run multiple instances) do we undeploy it and deploy the main consumer.
We should also consider the fact that after a fixed consumer processes a hidden message, it may still encounter other errors. In this case, the error handling behavior should be as we described earlier:
- If the error is recoverable, retry with a backoff strategy;
- If the error is unrecoverable, hide the message and continue to the next one.
To handle this, consider using a second hidden topic.
Is it acceptable to have some data inconsistencies?
Such a system can become quite complex to build, test, and maintain. As a result, some organizations may prefer to acknowledge the possibility of data inconsistency and decide whether they can tolerate that risk.
In many cases, these organizations may adopt data reconciliation mechanisms to make their data eventually (with a relatively long “eventually”) consistent. There are many strategies for doing so, which are beyond the scope of this article.
Conclusion
Handling retries seems complicated because it is, especially compared to how elegantly Kafka behaves when everything is going well. Any proper solution we build (whether it uses retry topics, hidden topics, or something else) will be more complex than we would like.
Unfortunately, if we want elastic asynchronous communication flows between microservices, we can’t ignore it.
This article describes one popular solution, its disadvantages, and some considerations when designing an alternative solution. At the end of the day, to build the right solution, we should keep a few things in mind, such as:
- Understand what Kafka provides through topics, partitions, and partition keys.
- Consider the difference between recoverable and unrecoverable errors.
- Make use of design concepts such as bounded contexts and aggregates.
- Understand the nature of our organization’s use cases, now and in the future. Are we simply moving independent records around? If so, we may not care about ordering. Or are we propagating events that represent changes to data? If so, ordering is critical.
- Consider carefully whether we are willing to tolerate any level of data inconsistency.