Guaranteeing Message Processing
Storm provides several different levels of message guarantees: at most once, at least once, and, with Trident, exactly once. Note that with Trident, exactly once means that the final processing result is reflected exactly once, not that the input data is physically processed only once. Taking counting as an example, exactly once means that the final count written out matches the input data exactly: no more, no less. This article mainly describes how Storm guarantees that messages are processed at least once.
What does it mean for a message to be "fully processed"?
A single tuple coming off a spout can trigger the creation of thousands of tuples based on it. Consider, for example, this word-count topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
The topology reads sentences from a Kestrel queue, splits each sentence into its words, and then emits each word along with its updated count. In this case, the tuple emitted from the spout produces many new tuples based on it: a tuple for each word in the sentence and a tuple for each updated word count. These messages form a tree, as shown below:
Storm considers a spout tuple "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed. Conversely, if any message in the tuple tree is not processed within a specified timeout, the spout tuple is considered failed. The timeout can be configured when the topology is built using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS parameter; the default value is 30 seconds.
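For reference, a minimal sketch of configuring this timeout when submitting a topology (the 60-second value and the topology name are illustrative only):
Config conf = new Config();
// Equivalent to conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60)
conf.setMessageTimeoutSecs(60);
StormSubmitter.submitTopology("word-count", conf, builder.createTopology());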
What happens when a message is fully processed or fails to be processed?
To understand this, let's look at the lifecycle of a tuple emitted by a spout. First, here is the interface that spouts implement (see the Javadoc for more details):
public interface ISpout extends Serializable {
    // Called once when the spout task is initialized in a worker.
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    // Called when the spout is shut down.
    void close();
    // Called repeatedly to ask the spout to emit its next tuple, if any.
    void nextTuple();
    // Called when the tuple tree rooted at msgId has been fully processed.
    void ack(Object msgId);
    // Called when the tuple tree rooted at msgId failed or timed out.
    void fail(Object msgId);
}
First, Storm asks the spout for a tuple by calling its nextTuple method. The spout emits a tuple to one of its output streams using the SpoutOutputCollector provided in the open method. When emitting the tuple, the spout provides a "message ID" that will be used to identify the tuple later. For example, the KestrelSpout above reads a message from the Kestrel queue and emits it with the message ID that Kestrel provides. Emitting a message with a message ID via the SpoutOutputCollector looks like this:
_collector.emit(new Values("field1", "field2", 3), msgId);
Next, the tuple is passed to the consuming bolts, and Storm tracks the tree of messages it creates. If Storm detects that a tuple tree has been fully processed, it calls the ack method of the spout that originally emitted the tuple, passing the "message ID" the spout provided. Conversely, if the tuple tree times out, Storm calls the spout's fail method. Note that ack or fail for a particular tuple is always called on the exact spout task that created it: if a spout has many tasks in the cluster, a tuple's success or failure is only ever reported to the task that emitted it, never to the other tasks.
Let's use KestrelSpout as an example of what a spout needs to do to guarantee message processing. When KestrelSpout takes a message off the Kestrel queue, it "opens" the message. That is, the message is not actually removed from the queue; it is placed in a "pending" state while waiting for confirmation that processing is complete. While pending, a message is not sent to other consumers of the queue, and if the client disconnects, all of its pending messages are put back on the queue. When a message is opened, Kestrel gives the client both the message body and a unique ID. KestrelSpout uses that unique ID as the "message ID" when emitting the tuple through the SpoutOutputCollector. Some time later, when KestrelSpout's ack or fail method is called, KestrelSpout sends the message ID back to Kestrel to either remove the message from the queue (ack) or put it back on the queue (fail).
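As a rough sketch of this pattern (not Kestrel-specific; QueueClient and QueueMessage are hypothetical stand-ins for any queue client with "pending" semantics), a reliable spout might look like this:
public class ReliableQueueSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private QueueClient _queue; // hypothetical client for a queue with "pending" semantics

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _queue = new QueueClient("queue-host", 22133, "sentence_queue");
    }

    public void nextTuple() {
        QueueMessage msg = _queue.poll(); // the message stays "pending" on the server
        if (msg != null) {
            // The queue's unique ID becomes the tuple's message ID.
            _collector.emit(new Values(msg.body()), msg.id());
        }
    }

    public void ack(Object msgId) {
        _queue.delete(msgId); // fully processed: remove the message from the queue
    }

    public void fail(Object msgId) {
        _queue.requeue(msgId); // failed or timed out: put the message back for replay
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}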
What is Storm’s reliability API?
As a user, to take advantage of Storm reliability, you need to do two things:
- Storm needs to be notified when a tuple is created (creating a new node in the message tree).
- When you finish processing a tuple, you need to notify Storm.
With these two actions, Storm can detect when the tuple tree is complete and call ack or fail at the appropriate time. The Storm API provides a concise way to accomplish both tasks.
Adding a link (a child node) in the tuple tree is called anchoring. Anchoring is done at the same moment you emit a new tuple. Take the following bolt, which splits a sentence tuple into word tuples, as an example:
public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));
        }
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
Each word tuple is anchored by passing the input tuple as the first argument to emit. Because the word tuple is anchored, if it fails in later processing, the spout tuple at the root of the tuple tree will be replayed. For contrast, let's look at what happens if the input tuple is not passed to emit, as follows:
_collector.emit(new Values(word));
Emitting the word tuple this way is called unanchoring. If the tuple fails downstream, the root tuple will not be resent. Whether you need to emit such "unanchored" tuples depends on the fault-tolerance requirements of your topology.
An output tuple can be anchored to more than one input tuple. This is useful for streaming joins or aggregations. If a multi-anchored tuple fails downstream, multiple spout tuples will be replayed. Multi-anchoring is done by passing a list of tuples instead of a single tuple, for example:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
Multi-anchoring adds the output tuple to multiple tuple trees. Note that multi-anchoring can also break the tree structure and produce a directed acyclic graph (DAG) of tuples, as shown below:
Tuple A produces tuples B and C, which together produce tuple D (for example, the sentence "hello world" is split into the words "hello" and "world", which are then combined to produce "hello world!"). Storm supports both trees and DAGs (earlier versions of Storm only worked with trees).
Anchoring is how you specify the structure of the tuple tree. The next part of Storm's reliability API is telling Storm when you have finished processing an individual tuple in the tree, which is done with the ack and fail methods of the OutputCollector. For example, in the SplitSentence example above, the input tuple is acked after all the word tuples have been emitted.
You can also use the fail method of the OutputCollector to make the spout tuple at the root of the tree fail immediately. For example, your application might catch an exception from a database client and explicitly fail the input tuple; that way the spout tuple is replayed sooner than if you waited for the timeout.
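A minimal sketch of that fail-fast pattern inside a bolt's execute method (the db field is a hypothetical database client, not part of the Storm API):
public void execute(Tuple tuple) {
    String sentence = tuple.getString(0);
    try {
        db.save(sentence); // hypothetical call that may throw on database errors
        _collector.emit(tuple, new Values(sentence));
        _collector.ack(tuple);
    } catch (Exception e) {
        // Fail immediately so the spout tuple at the root of the tree is replayed
        // without having to wait for the message timeout.
        _collector.fail(tuple);
    }
}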
Every tuple you process must be explicitly acked or failed. Storm uses memory to track each tuple, so if you don't ack or fail every tuple, the task will eventually run out of memory.
A common pattern for bolts is to read an input tuple in the execute method, emit new tuples based on it, and ack it at the end of the method. Most such bolts fall into the category of filters or simple transformations. Storm has a simple interface, IBasicBolt (typically used via BaseBasicBolt), that encapsulates this pattern. Using it, the SplitSentence example can be written as:
public class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
This implementation is simpler than the previous one and semantically identical. Tuples emitted to the BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is automatically acked when the execute method completes.
In contrast, a bolt that performs aggregations or joins may need to delay acking its input tuples until a whole batch of tuples has been used to compute some result. Aggregations and joins also typically multi-anchor their output tuples. These cases fall outside the simple model of IBasicBolt.
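As a rough sketch of that pattern (the batch size of 100 is an arbitrary illustration), such a bolt might buffer its input tuples, multi-anchor the aggregate result to all of them, and only then ack them:
public class BatchingCountBolt extends BaseRichBolt {
    private OutputCollector _collector;
    private List<Tuple> _pending = new ArrayList<Tuple>();

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _pending.add(tuple); // defer the ack until the whole batch has been emitted
        if (_pending.size() >= 100) {
            // Multi-anchor the aggregate tuple to every input tuple that contributed to it.
            _collector.emit(_pending, new Values(_pending.size()));
            for (Tuple t : _pending) {
                _collector.ack(t);
            }
            _pending.clear();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("batchCount"));
    }
}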
How do I make my application work correctly given that tuples can be replayed?
As always in software design, the answer is "it depends." If you really need exactly-once semantics, use the Trident API. In some cases, such as large-scale analytics where dropping some data is acceptable, fault tolerance can be disabled by setting the number of acker tasks to 0 via Config.TOPOLOGY_ACKERS. But in other cases you want to make sure everything is processed at least once and no data is lost.
How does Storm implement reliability efficiently?
A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every tuple emitted by a spout. When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to acknowledge it. You can set the number of acker tasks for a topology with the Config.TOPOLOGY_ACKERS parameter; by default, Storm assigns one acker task per worker.
The best way to understand Storm's reliability implementation is to look at the lifecycle of tuples and tuple DAGs. When a tuple is created in the topology, whether in a spout or a bolt, it is given a random 64-bit ID. Ackers use these IDs to track the tuple DAG of each spout tuple.
Every tuple in a tuple tree knows the IDs of the spout tuples at its root: when you emit a new tuple in a bolt, the root IDs of the input tuples it is anchored to are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker describing how the tuple tree has changed. In effect it says: "within this tree, I am now complete, and here are the new tuples that were anchored to me."
For example, if tuples D and E were created based on tuple C, the tuple tree changes as follows when C is acked:
Since C is removed from the tree at the same time D and E are added, the tree can never be considered complete prematurely.
There are a few more details about how Storm tracks tuple trees. As mentioned above, a topology can have any number of acker tasks. This raises a question: when a tuple is acked in the topology, how does it know which acker task to send that information to?
Storm uses hashing (mod) to map each spout tuple ID to an acker task. Because every tuple carries the IDs of all the root tuples of its tree, it knows which acker tasks it needs to communicate with.
Another detail is how an acker knows which spout task to notify for each spout tuple it is tracking. When a spout task emits a new tuple, it also sends a message to the corresponding acker telling it that its task ID is associated with that spout tuple. Later, when the acker sees the tuple tree complete, it knows which spout task to send the completion message to.
Acker tasks do not track tuple trees explicitly. For trees with tens of thousands of tuple nodes or more, tracking every tuple tree could exhaust the ackers' memory. Instead, ackers use a strategy that requires only a fixed amount of memory per spout tuple (about 20 bytes). This tracking algorithm is key to how Storm works and is one of its major breakthroughs.
An acker task stores a map from each spout tuple ID to a pair of values. The first value is the task ID of the spout task that created the tuple, which is later used to send the completion message. The second value is a 64-bit number called the "ack val". The ack val represents the state of the entire tuple tree, no matter how large the tree is: it is simply the XOR of the IDs of all tuples that have been created and/or acked in the tree.
When an acker task sees that an ack val has become zero, it knows the tuple tree is fully processed. Because tuple IDs are random 64-bit numbers, the chance of an ack val accidentally reaching zero is vanishingly small: at 10,000 acks per second, it would take roughly 50 million years before a mistake occurred. And even then, data would only be lost if the tuple also happened to fail in the topology.
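As a tiny illustration of the bookkeeping (plain Java, not the actual acker code; the IDs are arbitrary example values), here is why the ack val reaches zero exactly when every tuple in the tree has been both created and acked:
long ackVal = 0L;
long rootId = 0x4a3b21c9d07ef581L; // the spout tuple's random 64-bit ID
long wordId = 0x19f6e2ab3c44d9e0L; // a tuple anchored to the spout tuple

ackVal ^= rootId;          // the spout emits the root tuple
ackVal ^= rootId ^ wordId; // the root is acked and reports the new child anchored to it
ackVal ^= wordId;          // the child is acked with no new children
System.out.println(ackVal == 0); // prints true: the tree is fully processed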
Now that you have some understanding of the reliability algorithm, let’s take a look at all the failure scenarios and see how Storm avoided missing data in those scenarios:
- A tuple is not acked because its task died: in this case, the spout tuple at the root of the tree will time out and be replayed.
- An acker task dies: in this case, all the spout tuples that acker was tracking will time out and be replayed.
- A spout task dies: in this case, the source the spout reads from is responsible for replaying the messages. For example, queues such as Kestrel and RabbitMQ put all pending messages back on the queue when the client disconnects.
In summary, Storm’s reliability mechanism is completely distributed, scalable and fault tolerant.
Tuning reliability
Acker tasks are lightweight, so you don't need many of them in a topology. You can monitor their performance through the Storm UI (the acker component is shown as "__acker"). If the ackers look like a bottleneck, you may need to add more acker tasks.
If message reliability is not important to you, that is, you don't mind losing tuples when failures occur, you can improve the topology's performance by not tracking tuple trees. Every tuple in a tree normally requires an ack message, so not tracking the tree cuts the number of messages transferred roughly in half. It also means fewer IDs need to be carried in each downstream tuple, reducing bandwidth usage.
There are three ways to remove Storm reliability:
- The first way is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm calls the spout's ack method immediately after the spout emits a tuple, and the tuple tree is not tracked (see the sketch after this list).
- The second way is to remove reliability on a per-message basis: omit the message ID in the SpoutOutputCollector.emit call to turn off tracking for that spout tuple.
- Finally, if you don't care whether a particular subset of downstream tuples fails, you can emit them unanchored. Since they are not anchored to any spout tuple, their failure will not cause any spout tuple to be replayed.
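For the first option, a minimal sketch of the topology configuration looks like the following (the other two options are simply the emit variants shown earlier in this article):
Config conf = new Config();
conf.setNumAckers(0); // equivalent to setting Config.TOPOLOGY_ACKERS to 0
// With no ackers, Storm acks each spout tuple immediately after it is emitted,
// so tuples are never replayed but tracking overhead disappears.
StormSubmitter.submitTopology("word-count", conf, builder.createTopology());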
Illustrated summary
In general (as shown below), when the spout emits a message it sends the msgId to the acker, each bolt sends the relevant IDs to the acker after it finishes processing a tuple, and the acker XORs the IDs it receives. When the value reaches 0, the tuple tree is fully processed. If some error prevents the message from being correctly delivered all the way to the leaves, the acker never receives the IDs that would bring the XOR to 0, and eventually the spout times out and replays the message.
There is a nice blog post that illustrates Storm's tuple acknowledgement mechanism in detail: "Using the magic of XOR to solve Storm's tuple acknowledgement problem".
This blog is just a beginner's self-study record and my understanding is shallow, so if anything is wrong, please kindly point it out. If anything is unclear, feel free to comment and we can learn together!
References
Storm documentation: Guaranteeing Message Processing
Using the magic of XOR to solve Storm's tuple acknowledgement problem