0x00 Abstract

In this series, we focus on the modules that should be considered when designing a distributed delay queue, interleaved with the implementation approaches of several message queues. In this article, we analyze the source code of Dyno-queues to see the details of designing and implementing a distributed delay queue.

0x01 Dyno-queues Distributed Delay Queue

Dyno-queues is a set of queues implemented by Netflix on top of Dynomite and Redis.

Dynomite is a generic implementation that can be used with many different key-value storage engines. It currently provides support for Redis serialization protocol (RESP) and Memcached write protocol.

1.1 Design Objectives

Specific design objectives vary according to different business systems.

The business background of Dyno-queues is that many business processes run on the Netflix platform, and the tasks of these processes are driven by asynchronous orchestration. A distributed delay queue was therefore implemented with the following characteristics:

  • Distributed;
  • No external locking mechanism;
  • Highly concurrent;
  • At-least-once delivery semantics;
  • No strict FIFO ordering;
  • Delayed queuing (messages are not fetched from the queue until some future time);
  • Priorities within a shard;

1.2 Selection Ideas

Netflix chose Dynomite because:

  • It features high performance, multi-data-center replication, and high availability;
  • Dynomite provides sharding and pluggable data storage engines, allowing vertical and horizontal scaling as data needs grow;

Netflix chose Redis as a storage engine to build queues because:

  • The Redis architecture supports the queue design well: it provides the data structures needed to build queues while offering excellent performance with low latency.
  • Dynomite adds high availability, peer-to-peer replication, and consistency features on top of Redis for building distributed, clustered queues.

0x02 Overall design

2.1 System Assumptions

Query model: based on a key-value model rather than SQL (i.e., the relational model). The stored objects are small.

ACID properties: traditional relational databases that guarantee ACID transactions (A atomicity, C consistency, I isolation, D durability) tend to have poor availability. Dynamo weakens consistency (C) to achieve high availability, does not provide isolation (I), and only allows single-key updates.

2.2 High Availability

In fact, high availability here can be achieved by relying on the high availability of RPC and of storage.

  • First, look at the high availability of RPC. Frameworks such as Meituan’s MTthrift (a Thrift-based RPC framework) and Ali’s Dubbo provide automatic service discovery and load balancing.
  • For the high availability of the message queue itself: by ensuring that the broker’s interfaces for receiving and acknowledging messages are idempotent, and that the consumer machines process messages idempotently, the availability of the message queue is handed off to the RPC framework.

As noted in section 1.2, Netflix chose Dynomite for its high performance, multi-data-center replication, and high availability, and for its sharding and pluggable data storage engines.

So the high availability of Dyno-queues is automatically solved.

2.3 Idempotence

How do you guarantee idempotence? The easiest way is shared storage: if multiple broker machines share a DB or a distributed file/KV system, message processing is naturally idempotent. Even if one node fails, another node can take over immediately.

For queues that do not share storage, such as Kafka with its partition plus leader/follower replica model, it is a little more troublesome: high availability must be ensured within each partition, i.e., each partition needs at least one replica and the data must be kept synchronized.

Dyno-queues uses the Redis cluster managed by Dynomite as shared storage, so the idempotence guarantee comes naturally.
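
As a minimal illustration of why shared storage makes consumption idempotent (this sketch is my own and not part of dyno-queues; the Jedis client, the "processed:" key prefix, and the TTL are assumptions), a consumer can record processed message IDs in the shared Redis so that a retry, or a different consumer node, skips work that has already been done:

import redis.clients.jedis.Jedis;

public class IdempotentConsumer {

    private final Jedis jedis;

    public IdempotentConsumer(Jedis jedis) {
        this.jedis = jedis;
    }

    // Process a message at most once across all consumer nodes.
    public void consume(String messageId, String payload) {
        // SETNX succeeds only for the first node that claims this message id.
        long claimed = jedis.setnx("processed:" + messageId, "1");
        if (claimed == 1) {
            jedis.expire("processed:" + messageId, 24 * 3600); // keep the marker for a day
            handle(payload);                                    // actual business logic
        }
        // claimed == 0: another node already handled this message; do nothing.
    }

    private void handle(String payload) {
        System.out.println("handling: " + payload);
    }
}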

2.4 Handling Message Accumulation

If a message that reaches the server were passed straight to the receiver without any processing, the broker would lose its purpose. To meet peak-shaving, flow-control, and eventual-delivery requirements, it is logical to store messages first and then choose when to deliver them.

This storage can be implemented in many ways: in memory, in a distributed KV store, on disk, in a database, and so on. But in the end there are two main types: persistent and non-persistent.

The persistent form provides greater message reliability (e.g., messages survive a power outage) and can in theory hold a larger backlog, since external storage is much larger than memory.

But not every message needs persistent storage. Many messages value delivery performance over reliability, and they come in large volumes (e.g., logs). For such messages it is fine to keep them in memory, attempt a few failovers, and finally deliver them.

By using the Redis cluster as shared storage, Dyno-queues mitigates the message accumulation problem to some extent.

2.5 Storage Subsystem

Let’s look at the options for storage subsystems when data must be persisted. In theory, in terms of speed: local file system > distributed KV > distributed file system > database, while reliability goes in the opposite order. As always, make the most reasonable choice for the business scenario being supported.

A DB is the best choice if your message queue supports payments, transactions, and similar use cases with high reliability requirements but less demanding performance and volume requirements, and you don’t have the time and energy to invest in a file storage system.

However, a DB is limited by IOPS; if a single broker needs five-digit QPS, file-based storage is a better solution, generally implemented as data files plus index files.

Distributed KV stores (such as MongoDB or HBase), or persistent Redis, are also good choices in scenarios with lower reliability requirements, thanks to their friendly programming interfaces and decent performance.

Since the scenario here has relatively low reliability requirements, Dyno-queues can likewise use the Redis cluster as its storage subsystem.

2.6 Analysis of consumption relationship

The next important thing is to sort out the send/receive relationships so that messages are delivered correctly. However it appears on the surface, the sender/receiver relationship boils down to unicast versus broadcast: unicast is point-to-point, while broadcast is point-to-multipoint.

A common design is to support broadcasting between groups, with different groups registering different subscriptions. If the machines within a group register the same ID, it is unicast; if they register different IDs (such as IP address + port), it is broadcast.

As for maintaining these subscription relationships, message queues are usually clusters, so the relationships are generally kept in common storage such as a config server or ZooKeeper. The work of maintaining them is basically the same:

  • Maintain sending relationships.
  • Send notifications of relationship changes.

How send relationships are maintained will be described later in this series.

2.7 Data Sharding

The logic for data sharding can be implemented either on the client side or in the Proxy layer, depending on how your architecture is designed.

The traditional database middleware implements the sharding logic in the client and accesses different MySQL libraries by rewriting the physical SQL. However, in the computing and storage separation architecture advocated by NewSQL database, sharding logic is usually implemented in the computing layer, namely the Proxy layer, which forwards user requests to the correct storage nodes through stateless computing nodes.

In Dyno-queues, queues are sharded by availability zone. When data is pushed to a queue, the shard is chosen by a round-robin mechanism, which keeps the data across all shards balanced; each shard is represented by a sorted set in Redis. The key of the sorted set is a combination of queueName and availability zone.

public class RoundRobinStrategy implements ShardingStrategy {

    private final AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Get shard based on round robin strategy.
     * @param allShards
     */
    @Override
    public String getNextShard(List<String> allShards, Message message) {
        int index = nextShardIndex.incrementAndGet();
        if (index >= allShards.size()) {
            nextShardIndex.set(0);
            index = 0;
        }
        return allShards.get(index);
    }
}

0x03 Dynomite Features

3.1 Availability Zones and Racks

Dyno-queues is built on top of Dynomite’s Java client, Dyno, which provides connection pooling with persistent connections and can be configured to be topology aware. For details about Dyno, please refer to my previous articles:

Dynomite Distributed storage Engine DynoJedisClient(1)

Dynomite Distributed storage Engine DynoJedisClient(2)

3.1.1 Rack

Dyno provides application-specific local rack affinity (in AWS, a rack corresponds to an availability zone, e.g. us-east-1a, us-east-1b). A client in us-east-1a connects to the Dynomite/Redis nodes in the same zone unless those nodes are unavailable, in which case the client fails over. This property is used to partition queues by availability zone.

3.1.2 Shard

The queue shards data by availability zone. When data is pushed to the queue, the shard is chosen by a round-robin mechanism, which ensures that data across all shards stays balanced; each shard is represented by a sorted set in Redis.

The specific mechanism is as follows:

public class RoundRobinStrategy implements ShardingStrategy {

    private final AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Get shard based on round robin strategy.
     * @param allShards
     */
    @Override
    public String getNextShard(List<String> allShards, Message message) {
        int index = nextShardIndex.incrementAndGet();
        if (index >= allShards.size()) {
            nextShardIndex.set(0);
            index = 0;
        }
        return allShards.get(index);
    }
}

3.2 Quorum

Distributed systems are subject to the CAP theorem, and partition tolerance (P) is practically unavoidable: processing does not happen on a single local machine but across many machines communicating over the network, so network partitions and communication failures cannot be ruled out. We can therefore only try to find a balance between C and A.

For data storage, replicas are used to improve availability. For example, HDFS stores three copies of each block by default; if the machine holding a block goes down, the block is read from a machine holding one of its replicas (which also shows that data is distributed in units of blocks).

The problem, however, is that when data needs to be modified, all replicas must be updated to guarantee consistency, so there is a trade-off between consistency (C) and availability (A).

The Quorum mechanism is such a trade-off: a model that balances reads against writes.

3.2.1 Data Consistency

  • Strong consistency: at any time, the value read from any replica is the same.
  • Weak consistency: sometimes called eventual consistency. At any given moment, values in different replicas may differ because of network delay or device failures, but after some period of time they converge to the same value.

Obviously we would like strong consistency, so how can it be achieved? The simplest and most direct way is WARO, which stands for Write All Read One.

3.2.1.1 The WARO Protocol

WARO is a simple replica-control protocol. When a client writes (updates) data, the write succeeds only if all replicas are updated successfully; otherwise it fails, and a read then only needs to access any single replica. The downside of WARO is low write availability, because a write is considered failed if even one replica fails to update.

3.2.1.2 Quorum mechanism

Quorum is defined as follows: assume there are N replicas. An update operation wi is considered successful once it has succeeded on W replicas; the data written by such a successful update is called “successfully committed data”. A read operation must access at least R replicas. W + R > N guarantees that the read set and the write set overlap; a common choice is W + R = N + 1.

  • N = number of replicas stored;
  • W = number of replicas that must be updated for a write to succeed;
  • R = number of replicas accessed by a read;

The Quorum mechanism considers a write successful once it has reached W replicas (typically a majority). In other words, Quorum removes the need to update every replica while still guaranteeing that reads return valid, committed data, as the small simulation below illustrates.
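
To make the overlap argument concrete, here is a tiny self-contained simulation (my own illustration, not taken from any of the systems discussed here). With N = 3, W = 2, R = 2, any read set of two replicas must intersect the write set of two replicas, so a quorum read always sees the latest committed value even though one replica is stale:

import java.util.Arrays;
import java.util.List;

public class QuorumDemo {

    // N replicas, each holding a (value, version) pair.
    static final int N = 3, W = 2, R = 2;   // W + R > N
    static long[] version = new long[N];
    static String[] value = new String[N];

    // A write succeeds once W replicas have accepted the new value.
    static void write(String v, long ver, List<Integer> reachable) {
        if (reachable.size() < W) throw new IllegalStateException("write failed: quorum not met");
        for (int i = 0; i < W; i++) {
            int r = reachable.get(i);
            version[r] = ver;
            value[r] = v;
        }
    }

    // A read accesses R replicas and returns the value with the highest version.
    static String read(List<Integer> reachable) {
        String best = null;
        long bestVer = -1;
        for (int i = 0; i < R; i++) {
            int r = reachable.get(i);
            if (version[r] > bestVer) {
                bestVer = version[r];
                best = value[r];
            }
        }
        return best;
    }

    public static void main(String[] args) {
        write("v1", 1, Arrays.asList(0, 1));            // replicas 0 and 1 updated, replica 2 stale
        System.out.println(read(Arrays.asList(1, 2)));  // prints "v1": any two replicas include an updated copy
    }
}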

3.2.2 ES quorum

Let’s take Elasticsearch (ES) as an example.

3.2.2.1 Write Consistency

Any index, delete, or update operation can carry a consistency parameter indicating the consistency level we want.

  • one: the write can proceed as long as the primary shard is active.
  • all: the write can proceed only when the primary shard and all replicas are active.
  • quorum (default): a majority (more than half) of the shard copies must be available for the write to proceed.

3.2.2.2 Quorum mechanism

quorum = int((primary + number_of_replicas) / 2) + 1

If the number of available shard copies is less than quorum, the quorum cannot be met and writes cannot proceed. When the quorum is incomplete, ES waits (by default up to 1 minute) in the hope that the number of active shards increases, and eventually times out. For example, with 1 primary shard and 2 replicas, quorum = int((1 + 2) / 2) + 1 = 2, so at least 2 of the 3 shard copies must be active for a write to be accepted.

3.3 DC_QUORUM

3.3.1 Configuration

Dynomite extends eventual consistency to tunable consistency.

Dynomite has the following configuration for QUORUM:

  • DC_ONE: reads and writes are acknowledged by the node that received the request, and the other racks are written asynchronously. Under DC_ONE, read and write behavior is synchronous only within the local availability zone (AZ).
  • DC_QUORUM: writes are performed synchronously on a quorum of nodes in the local data center and asynchronously on the rest. Under DC_QUORUM, operations are synchronous on a specific number of nodes in the local region.
  • DC_SAFE_QUORUM: similar to DC_QUORUM, except that a request succeeds only if reads and writes succeed on the required number of nodes and the data checks agree; otherwise an error is reported.

According to Netflix’s benchmark results, Dynomite can scale from 3 to 6, 12, 24, and all the way up to 48 nodes, with throughput growing linearly in both DC_ONE and DC_QUORUM modes. Meanwhile Dynomite adds very little latency overhead, only a few milliseconds even in DC_QUORUM mode. DC_QUORUM is at a disadvantage in latency and throughput, but gives clients stronger read and write guarantees.

3.3.2 Implementation

For Dyno-queues, this shows up in the implementation. RedisQueues, for example, holds the following member variables:

private final JedisCommands quorumConn;

private final JedisCommands nonQuorumConn;

When constructing RedisQueues, both connections must be supplied.

And we can tell from the Javadoc comments:

  • @param quorumConn: Dyno connection with dc_quorum enabled, i.e., Redis access with quorum semantics;
  • @param nonQuorumConn: Dyno connection to the local Redis;

The constructor of RedisQueues is as follows (note the comments):

/**
 * @param quorumConn    Dyno connection with dc_quorum enabled
 * @param nonQuorumConn Dyno connection to local Redis
 */
public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) {
    
    this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy);
    
}

3.3.3 Usage

When message IDs are peeked from a shard, they are fetched via nonQuorumConn (the local Redis).

The reason for using nonQuorumConn is eventual consistency.

Because of replication lag, the data seen through different connections may differ at a given point in time. Prefetching therefore uses nonQuorumConn, since the data in the local Redis is correct for the local shard.

private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset, final int count) {
    return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}

For example, when processing messages that have not been acked, the message IDs are read from nonQuorumConn while the message content is read from quorumConn.

This is done for consistency reasons, as shown below:

@Override
public void processUnacks() {
    execute("processUnacks", keyName, () -> {

        Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);

        for (Tuple unack : unacks) {
            double score = unack.getScore();
            String member = unack.getElement();
            String payload = quorumConn.hget(messageStoreKey, member);
            long added_back = quorumConn.zadd(localQueueShard, score, member);
        }
    });
}

Another example is the nonQuorumConn used to extract messages locally.

@Override
public Message localGet(String messageId) {
    try {

        return execute("localGet", messageStoreKey, () -> {
            String json = nonQuorumConn.hget(messageStoreKey, messageId);

            Message msg = om.readValue(json, Message.class);
            return msg;
        });

    } catch (Exception e) {
        // exception handling was elided in this excerpt
        throw new RuntimeException(e);
    }
}

For example, popWithMsgIdHelper reads nonQuorumConn first and then reads from quorumConn.

public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {

    try {
        return execute("popWithMsgId", targetShard, () -> {

            String queueShardName = getQueueShardKey(queueName, targetShard);
            double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
            String unackShardName = getUnackKey(queueName, targetShard);

            ZAddParams zParams = ZAddParams.zAddParams().nx();

            Long exists = nonQuorumConn.zrank(queueShardName, messageId);
            // If we get back a null type, then the element doesn't exist.
            if (exists == null) {
                // We only have a 'warnIfNotExists' check for this call since not all messages are present in
                // all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0',
                // we may have hit an inconsistency (because it's in the queue, but other calls have failed),
                // so make sure to log those.
                monitor.misses.increment();
                return null;
            }

            String json = quorumConn.hget(messageStoreKey, messageId);
            if (json == null) {
                monitor.misses.increment();
                return null;
            }

            long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
            if (added == 0) {
                monitor.misses.increment();
                return null;
            }

            long removed = quorumConn.zrem(queueShardName, messageId);
            if (removed == 0) {
                monitor.misses.increment();
                return null;
            }

            Message msg = om.readValue(json, Message.class);
            return msg;
        });
    } catch (Exception e) {
        // exception handling was elided in this excerpt
        throw new RuntimeException(e);
    }
}

0x04 Outer Encapsulation

RedisQueues is the interface exposed to users; its internal mechanisms, such as the sharding strategy, can be seen from its member variables.

public class RedisQueues implements Closeable {

    private final Clock clock;

    private final JedisCommands quorumConn;

    private final JedisCommands nonQuorumConn;

    private final Set<String> allShards;

    private final String shardName;

    private final String redisKeyPrefix;

    private final int unackTime;

    private final int unackHandlerIntervalInMS;

    private final ConcurrentHashMap<String, DynoQueue> queues;

    private final ShardingStrategy shardingStrategy;

    private final boolean singleRingTopology;
}


Users obtain a queue via get, for example: DynoQueue v1Queue = queues.get("simpleQueue");. The implementation of get is as follows:

public DynoQueue get(String queueName) {

    String key = queueName.intern();

    return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
            .withUnackTime(unackTime)
            .withNonQuorumConn(nonQuorumConn)
            .withQuorumConn(quorumConn));
}
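
To show how the pieces above fit together, here is a minimal usage sketch. It is my own illustration rather than code from the dyno-queues documentation: the queue name, the numeric parameters, and the Message constructor and setTimeout/setPriority/push/pop/ack calls are assumptions based on the structures described in this article.

// Imports of the dyno-queues types used below (RedisQueues, DynoQueue, Message,
// ShardSupplier, ShardingStrategy) and of JedisCommands are omitted here because
// their package paths differ between library versions.
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {

    public static void example(JedisCommands quorumConn, JedisCommands nonQuorumConn,
                               ShardSupplier shardSupplier, ShardingStrategy strategy) {
        RedisQueues queues = new RedisQueues(quorumConn, nonQuorumConn,
                "myapp",   // redisKeyPrefix (an assumed value)
                shardSupplier,
                60,        // unackTime
                60000,     // unackHandlerIntervalInMS
                strategy);

        // Obtain (or lazily create) the queue, as shown in the get() method above.
        DynoQueue queue = queues.get("simpleQueue");

        // Build a delayed message; setTimeout/setPriority are assumed setters for the
        // Message fields listed in section 5.1.
        Message msg = new Message("msg-1", "{\"orderId\": 42}");
        msg.setTimeout(5000);   // deliver no earlier than ~5 seconds from now
        msg.setPriority(3);
        queue.push(Collections.singletonList(msg));

        // Later, a consumer pops due messages and acks them after processing.
        List<Message> delivered = queue.pop(1, 1, TimeUnit.SECONDS);
        for (Message m : delivered) {
            // ... process m.getPayload() ...
            queue.ack(m.getId());
        }
    }
}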

0x05 Data structure

Let’s look at several data structures in Dyno-queues.

5.1 Message Structure

A complete message queue design should define the types of messages it delivers, such as transactional messages, non-persistent local messages, and unreliable messages that never land on the server, choosing differently for different business scenarios.

Dyno-queues only handles reliable messages that land on the server. Each delayed message must contain the following attributes:

  • id: unique identifier;
  • payload: the message body, handed to the consumer for processing once the message becomes due;
  • timeout: the delay time;
  • priority: the priority, which together with timeout determines delivery order, i.e., which message is delivered first among messages with the same timeout;
  • shard: the partition the message belongs to;
public class Message {
    private String id;
    private String payload;
    private long timeout;
    private int priority;
    private String shard;
}

5.2 Storage Structure

A Redis hash stores the message content, and a sorted set (zset) implements the queue ordered by due time. Specifically:

  • A hash records the message content:
    • hset stores a message;
    • hget retrieves a message;
  • A Redis zset implements the delay queue, mainly through its score attribute: Redis keeps the members of the set sorted by score in ascending order;
    • zadd key score1 value1 produces a message;
    • zrem consumes a message;

The logic is shown in the figure below, where the two structures are related only logically through the message ID; there is no physical link between them:

        +----------+----------+----------+-----+----------+
        |          |          |          |     |          |
 zset   | msg id 1 | msg id 2 | msg id 3 | ... | msg id n |
        |          |          |          |     |          |
        +---+------+----+-----+----+-----+-----+----+-----+
            |           |          |                |
            |           |          |                |
            v           v          v                v
        +---+---+   +---+---+   +--+----+        +--+--+
hash    | msg 1 |   | msg 2 |   | msg 3 |        |msg n|
        +-------+   +-------+   +-------+        +-----+

In code:

  • The Message id is the key, and the whole Message is serialized to a JSON string as the value: quorumConn.hset(messageStoreKey, message.getId(), json);
  • The zset score is built from the Message timeout, priority, and the current timestamp: double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority; For example, if clock.millis() is 1,600,000,000,000, the timeout is 5,000 ms, and the priority is 3 (scaled to 0.03), the score is 1,600,000,005,000.03: messages that are due earlier sort first, and priority breaks ties within the same millisecond.

See the following for details:

for (Message message : messages) {
    String json = om.writeValueAsString(message);
    quorumConn.hset(messageStoreKey, message.getId(), json);
    double priority = message.getPriority() / 100.0;
    double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
    String shard = shardingStrategy.getNextShard(allShards, message);
    String queueShard = getQueueShardKey(queueName, shard);
    quorumConn.zadd(queueShard, score, message.getId());
}

0x06 Queue

RedisDynoQueue is the main implementation of Dyno-queues.

6.1 Redis related

From the Redis perspective, three sets of Redis data structures are maintained for each queue:

  • a sorted set (zset) containing the queued message IDs and their scores;
  • a hash containing the message content, keyed by message ID;
  • another sorted set containing the messages that a client has consumed but not yet acknowledged (the un-ack zset);

These three groups of Redis data structures have no corresponding member variables inside RedisDynoQueue; to RedisDynoQueue they are purely logical concepts. They actually live in Redis storage, with Dynomite taking care of high availability and related concerns.

Details are as follows:

                  message list



zset  +----------+----------+----------+-----+----------+
      |          |          |          |     |          |
      | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |
      |          |          |          |     |          |
      +---+------+----+-----+----+-----+-----+----+-----+
          |           |          |                |
          |           |          |                |
          v           v          v                v
hash  +---+---+   +---+---+   +--+----+        +--+--+
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|
      +-------+   +-------+   +-------+        +-----+



                  unack list
       +------------+-------------+--------------+
zset   |            |             |              |
       |  msg id 11 |   msg id 12 |   msg id 13  |
       |            |             |              |
       +------------+-------------+--------------+

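To make the role of the un-ack set concrete, the sketch below shows what acknowledging a message could look like against these three structures. It is a simplified illustration written for this article (the class wrapper and key-name parameters are assumptions), not the library’s exact code:

import redis.clients.jedis.JedisCommands;

public class AckSketch {

    /**
     * Simplified ack: remove the message id from the un-ack zset, then delete the
     * message body from the hash. If the id is no longer in the un-ack set, the
     * periodic unack processor has already moved it back to the queue zset, so the
     * ack is reported as failed.
     */
    public boolean ack(JedisCommands conn, String unackShardKey,
                       String messageStoreKey, String messageId) {
        Long removed = conn.zrem(unackShardKey, messageId);
        if (removed != null && removed > 0) {
            conn.hdel(messageStoreKey, messageId);
            return true;
        }
        return false;
    }
}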

6.2 Member Variables

The member variables of RedisDynoQueue can be classified as follows:

6.2.1 general

  • String queueName: the queue name;
  • String shardName: the name of the local shard;

6.2.2 Redis connection related

  • JedisCommands quorumConn: quorum connection;
  • JedisCommands nonQuorumConn: non-quorum connection;

6.2.3 Related to Redis operation

  • ObjectMapper om: used to serialize messages to Redis;

  • Clock clock: used to generate the timestamp for a score;

  • String redisKeyPrefix: the key prefix used by this queue;

  • String messageStoreKey: the key of the Redis hash that stores this queue’s messages, for example:

    this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
    
    quorumConn.hget(messageStoreKey, messageId)
  • List<String> allShards: all shards;

  • String localQueueShard: the local shard;

  • ShardingStrategy shardingStrategy: the sharding strategy;

  • ConcurrentLinkedQueue<String> prefetchedIds: message IDs prefetched from the local shard;

  • Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap: message IDs prefetched from all shards, keyed by queue shard key:

    this.unsafePrefetchedIdsAllShardsMap = new HashMap<>();
    
    for (String shard : allShards) {
        unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>());
    }
  • int retryCount = 2: the number of retries;

6.2.4 Ack Related

  • int unackTime = 60: used to compute the score for entries in the un-ack zset:

    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
    
    long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
  • ScheduledExecutorService schedulerForUnacksProcessing: schedules the thread that periodically processes un-acked messages:

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
  • boolean singleRingTopology: whether the cluster topology is a single ring;

6.2.5 Monitoring and Statistics

QueueMonitor monitor: Monitoring and statistics;

6.2.6 Specific definitions

The specific code is as follows:

public class RedisDynoQueue implements DynoQueue {
    private final Clock clock;

    private final String queueName;

    private final List<String> allShards;

    private final String shardName;

    private final String redisKeyPrefix;

    private final String messageStoreKey;

    private final String localQueueShard;

    private volatile int unackTime = 60;

    private final QueueMonitor monitor;

    private final ObjectMapper om;

    private volatile JedisCommands quorumConn;

    private volatile JedisCommands nonQuorumConn;

    private final ConcurrentLinkedQueue<String> prefetchedIds;

    private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;

    private final ScheduledExecutorService schedulerForUnacksProcessing;

    private final int retryCount = 2;

    private final ShardingStrategy shardingStrategy;

    private final boolean singleRingTopology;
}

So far we have covered the basic structure of Dyno-queues. In the next article we will look at how messages are produced and consumed.

0xEE Personal information

★★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want timely notifications of new articles, or want to see the technical materials I recommend, please follow the account.

0xFF References

How to design a message queue from scratch

Understanding message queues: a comparison of several common message queues (distributed middleware)

Message queue design essentials

Youzan delay queue design

Distributed delay queue based on Dynomite

blog.mikebabineau.com/2013/02/09/…

stackoverflow.com/questions/1…

activemq.apache.org/delay-and-s…

Dynomite Distributed storage Engine DynoJedisClient(1)

Dynomite Distributed storage Engine DynoJedisClient(2)

Original Amazon Dynamo architecture

Netflix Dynomite performance benchmark, based on AWS and Redis

Why do distributed tasks have to be delayed?