Naiad is a distributed system that executes cyclic parallel data streaming programs, providing high throughput batch processing, low latency streaming, iterative computing, and incremental computing.

The overview

Many data processing tasks require low latency interoperable access to the results, iterative subcomputations, and consistent intermediate output. The following figure illustrates these requirements: the application iterates over a live data stream and interactively queries the latest and consistent results on the results.

In order to meet the above requirements, the author designs a calculation model of sequential data flow:

  • Structured loops allow for feedback in the data flow
  • Stateful nodes generate and consume records without requiring global coordination
  • Notifies a node when it receives input for a specified turn or a record of a loop iteration

Sequential data flow

The sequential data flow is a directed acyclic graph, where each node is stateful and sends and receives time-stamped messages from the directed edge.

The graph structure

The sequential data flow diagram contains input nodes and output nodes for receiving input and giving output. The external message producer labels each message with an epoch and sends a “close” message when the data flow ends.

Sequential data streams support cyclic contexts, a cyclic context containing the entry node (I), exit node (E), and feedback node (F). To support the loop context, the timestamp of the message is designed as follows:


Where E is epoch,Is the counter corresponding to the k-layer loop, and the timestamp change in a loop is as follows

node Input timestamp Output timestamp
Enter the node
Exit node
Feedback node

The order of the timestamp is defined as phi for phiand.If and only ifand.

Node calculation

Each node needs to implement two callback functions:

v.OnRecv(e: Edge, m: Message, t: Timestamp)
v.OnNotify(t: Timestamp).
Copy the code

Two functions can be called within a callback function:

this.SendBy(e: Edge, m: Message, t: Timestamp)
this.NotifyAt(t: Timestamp)
Copy the code

All calls are queued,v.OnNotify(t)In allthev.OnRecv(e,m,t')Execute only after the execution is complete. Also, callv.OnNotify(t')andv.OnRecv(e,m,t')Parameter needs.

Here is an example program that takes input and outputs a unique element to output1 and a count to output2.

class DistinctCount<S,T> : Vertex<T>
{
    Dictionary<T, Dictionary<S,int>> counts;
    
    void OnRecv(Edge e, S msg, T time) {
        if(! counts.ContainsKey(time)) { counts[time] =new Dictionary<S,int> ();this.NotifyAt(time);
        }
        if(! counts[time].ContainsKey(msg)) { counts[time][msg] =0;
            this.SendBy(output1, msg, time);
        }
        counts[time][msg]++;
    }
    
    void OnNotify(T time) {
        foreach (var pair in counts[time])
            this.SendBy(output2, pair, time); counts.Remove(time); }}Copy the code

Implement sequential data flow

Sending notifications requires determining that no future messages with a given timestamp will appear. Timestamps bound to future messages and unprocessed events (messages and notifications), as well as the graph structure, allow a lower bound on the time of each message to be calculated based on the fact that messages cannot be delivered backwards in time.

Each event corresponds to a timestamp and a location that can be formed into a dot stamp


We say thatCan lead toIf and only if there is a path, the resulting dot stampmeet. If can find outtoShortest path and check whether it meets the requirementTo learn thatDoes it lead to.

The dispatcher maintains a collection of active dots, each corresponding to at least one unfinished event. Each dot stamp contains an occurrence count (including the number of times the dot was generated) and a lead count (the number of times the dot was generated). When the node generates and consumes events, the timestamp update mode is as follows:

operation Update rules
v.SendBy(e,m,t) OC[(t, e)] ← OC[(t, e)] + 1
v.OnRecv(e,m,t) OC[(t, e)] ← OC[(t, e)] − 1
v.NotifyAt(t) OC[(t, v)] ← OC[(t, v)] + 1
v.OnNotify(t) OC[(t, v)] ← OC[(t, v)] − 1

When the dot occurrence count becomes zero, the lead count of subsequent dot occurrences can be reduced, and when the lead count of dot occurrences is zero, notifications prior to this dot can be safely sent.

Distributed implementation

The data flow graph is distributed to different working nodes, and edges can use partitioning functions to pass messages to different nodes, or if there is no partitioning function the message is passed to the next node on the machine.

The work node is responsible for receiving and sending notifications for its part of the message. In order to trigger notifications correctly in a distributed environment, the global presence count is maintained through the presence count of broadcast dots.

Naiad uses a simple fault-tolerant approach: Each node implements the CHECKPOINT and RESTORE interfaces, which are called by the system to hold globally consistent checkpoints for failure recovery.

Because packet loss, garbage collection and other reasons can cause some work to slow down, the author uses a variety of methods to try to avoid this situation:

  • Network: There are short bursts of traffic between Naiad working nodes, and the authors make some optimizations for the TCP stack: for example, disable Nagle algorithm, reduce acknowledgement timeout, reduce retransmission time, and consider using RDMA to speed up communication.
  • Data race: Naiad reduces the granularity of wait time after a race is detected to reduce latency.
  • Garbage collection: Naiad use. Net implementation, uses various methods to minimize garbage collection, such as using buffer pools to reuse memory.

Write programs using Naiad

All Naiad programs have the following patterns: First, define a data flow diagram with input, calculation, and output phases; The data is then sent to the input phase. For example, a typical MapReduce program looks like this:

// 1a. Define input stages for the dataflow.
var input = controller.NewInput<string> ();// 1b. Define the timely dataflow graph.
// Here, we use LINQ to implement MapReduce.
var result = input.SelectMany(y => map(y))
                  .GroupBy(y => key(y), 
                        (k, vs) => reduce(k, vs));

// 1c. Define output callbacks for each epoch
result.Subscribe(result => { ... });

// 2. Supply input data to the query.
input.OnNext(/* 1st epoch data */);
input.OnNext(/* 2nd epoch data */);
input.OnNext(/* 3rd epoch data */);
input.OnCompleted();
Copy the code

The authors package some high-level programming models into libraries for developers to use, hoping that most of the application scenarios can be implemented using libraries based on the graph building interface provided by Naiad.

reference

  1. Murray, Derek G., et al. “Naiad: a timely dataflow system.” Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. ACM, 2013.