Abstract

LMAX was established to create a very high performance financial exchange. As part of this work we evaluated several approaches to designing such a system, but as we began to measure them we encountered some fundamental limits with conventional approaches.

Many applications depend on queues to exchange data between processing stages. Our performance testing showed that when queues are used in this way, the latency costs are of the same order of magnitude as IO operations to disk (RAID or SSD based disk systems), which is dramatically slow. If there are multiple queues in an end-to-end operation, this adds hundreds of microseconds to the overall latency. There is clearly room for optimisation.

Further investigation, and a focus on the computer science involved, made us realise that the conflation of concerns inherent in conventional approaches (such as queues and processing nodes) leads to contention in multi-threaded implementations, suggesting that there may be a better approach.

Thinking about how modern CPUs work, something we like to call "mechanical sympathy" (Martin Thompson is fond of the phrase; it describes having a good feel for how the underlying hardware works and how best to drive it), and applying good design practices with a strong focus on teasing apart the concerns, we came up with a data structure and a pattern of use that we have called the Disruptor.

Testing has shown that the mean latency for a three-stage pipeline using the Disruptor is three orders of magnitude lower than an equivalent queue-based approach. In addition, the Disruptor handles approximately eight times the throughput for the same configuration.

These performance improvements represent a step change in thinking around concurrent programming. This new pattern is the ideal foundation for any asynchronous event processing architecture that requires high throughput and low latency.

At LMAX we have built an order matching engine, real-time risk management, and a highly available in-memory transaction processing system, all of which use this pattern to great effect. Each of these systems has set new performance standards that, as far as we know, are unmatched.

However, the Disruptor is not a specialist solution that is only relevant to the finance industry. It is a general-purpose mechanism that addresses a complex problem in concurrent programming in a way that maximises performance and is simple to implement. Although some of the concepts may seem unusual, in our experience systems built with this pattern are significantly simpler to implement than comparable mechanisms.

Disruptor has less write contention, lower concurrency overhead, and is more cache-friendly than similar methods, all of which result in greater throughput, less jitter, and lower latency. On processors with moderate clock rates, we see over 25 million messages per second with a latency of less than 50 nanoseconds. This performance is a significant improvement over any other implementation we have seen. This is very close to the theoretical limits of modern processors exchanging data between cores.

1. Overview

The Disruptor is the result of our efforts to build the world's highest performance financial exchange at LMAX. Early designs focused on architectures derived from SEDA (Staged Event Driven Architecture) and Actors, using pipelines for throughput. After profiling various implementations, it became evident that the queuing of events between stages in the pipeline was dominating the costs. We found that queues also introduced latency and high levels of jitter. We expended significant effort developing new queue implementations with better performance. However, it became clear that queues as a fundamental data structure are limited by the conflation of the design concerns of producers, consumers, and their data storage. The Disruptor is the result of our work to build a concurrent structure that cleanly separates these concerns.

2. Complexity of concurrency

In the context of this article, and in computer science generally, concurrency means not only that two or more tasks happen in parallel, but also that they contend for access to resources. The contended resource may be a database, a file, a socket, or even a location in memory.

Concurrent execution of code is about two things: mutual exclusion and visibility of change. Mutual exclusion is about managing contended updates to some resource. Visibility of change is about controlling when such changes are made visible to other threads. It is possible to avoid the need for mutual exclusion if you can eliminate the need for contended updates. If your algorithm can guarantee that any given resource is modified by only one thread, then mutual exclusion is unnecessary. Read and write operations require that all changes are made visible to other threads; however, only contended write operations require the mutual exclusion of those changes.

The most expensive operation in any concurrent environment is contention for write access. Getting multiple threads to write to the same resource requires complex and expensive coordination. This is usually done by using some kind of locking strategy.

2.1 Cost of locking

Locks provide mutual exclusion and ensure that changes occur in an ordered manner. Locks are expensive because they require arbitration when contended. This arbitration is achieved by a context switch to the operating system kernel, which suspends threads waiting on the lock until it is released. During such a context switch, and while control is ceded to the operating system (which may decide to do other housekeeping tasks while it has control), the execution context can lose previously cached data and instructions. This can have a serious performance impact on modern processors. Fast user-mode locks can be employed, but these are only of any real benefit when they are not contended.

We will use a simple demonstration to illustrate the cost of locks. The focus of this experiment is to call a function that increments a 64-bit counter in a loop 500 million times. A single thread written in Java can execute this in 300 milliseconds on a 2.4 GHz Intel Westmere EP. The language does not matter for this experiment; results are similar across all languages with the same basic primitives.
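For illustration, a minimal sketch of this kind of micro-benchmark in Java might look like the following. The class and method names are hypothetical, and this is not the actual test harness used to produce the figures below.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class IncrementBenchmark
{
    private static final long ITERATIONS = 500L * 1000L * 1000L;
    private static final Lock LOCK = new ReentrantLock();
    private static long counter = 0L;

    // Baseline: a single thread incrementing a plain 64-bit counter.
    private static void singleThread()
    {
        for (long i = 0; i < ITERATIONS; i++)
        {
            counter++;
        }
    }

    // The same work, but acquiring and releasing a lock for every increment.
    private static void singleThreadWithLock()
    {
        for (long i = 0; i < ITERATIONS; i++)
        {
            LOCK.lock();
            try
            {
                counter++;
            }
            finally
            {
                LOCK.unlock();
            }
        }
    }

    public static void main(final String[] args)
    {
        long start = System.nanoTime();
        singleThread();
        System.out.println("single thread: " + ((System.nanoTime() - start) / 1000000L) + " ms");

        counter = 0L;
        start = System.nanoTime();
        singleThreadWithLock();
        System.out.println("single thread with lock: " + ((System.nanoTime() - start) / 1000000L) + " ms");
    }
}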

Once a lock is introduced to provide mutual exclusion, the cost goes up significantly even while the lock is not yet contended. The cost increases again, by orders of magnitude, when two or more threads begin to contend. The results of this simple experiment are shown in the table below:

Method                              Time (ms)
Single thread                             300
Single thread with lock                10,000
Two threads with lock                 224,000
Single thread with CAS                  5,700
Two threads with CAS                   30,000
Single thread with volatile write       4,700

2.2 Cost of CAS

A more efficient alternative to the use of locks can be employed for updating memory when the target of the update is a single word. These alternatives are based on the atomic, or interlocked, instructions implemented in modern processors. They are commonly known as CAS (Compare And Swap) operations, for example "lock cmpxchg" on x86. A CAS operation is a special machine-code instruction that allows a word in memory to be conditionally set as an atomic operation. For the increment-counter experiment, each thread can spin in a loop, reading the counter and then trying to atomically set it to its new incremented value. The old and new values are provided as parameters to the instruction. If, when the operation is executed, the value of the counter matches the supplied expected value, the counter is updated with the new value. If, on the other hand, the value is not as expected, the CAS operation fails. The thread attempting the change then re-reads the counter value and retries until the change succeeds. This CAS approach is significantly more efficient than locks because it does not require a context switch to the kernel for arbitration. However, CAS operations are not free of cost: the processor must lock its instruction pipeline to ensure atomicity and employ a memory barrier to make the changes visible to other threads. CAS operations are available in Java via the classes in the java.util.concurrent.atomic package.
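As an illustration of the retry-until-success behaviour described above, a CAS-based increment can be sketched in Java with java.util.concurrent.atomic.AtomicLong (the class name CasCounter is hypothetical):

import java.util.concurrent.atomic.AtomicLong;

public final class CasCounter
{
    private final AtomicLong counter = new AtomicLong(0L);

    // Retry loop: read the current value, then attempt to swap in value + 1.
    // compareAndSet only succeeds if no other thread changed the value in between.
    public long increment()
    {
        long current;
        long next;
        do
        {
            current = counter.get();
            next = current + 1;
        }
        while (!counter.compareAndSet(current, next));

        return next;
    }
}

In practice AtomicLong.incrementAndGet() packages exactly this loop, but the explicit form shows the read, modify, CAS, retry cycle described above.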

If the critical section of the program is more complex than a simple counter increment, it may take a complex state machine using multiple CAS operations to orchestrate the contention. Developing concurrent programs using locks is difficult; developing lock-free algorithms using CAS operations and memory barriers is many times more complex, and it is very difficult to prove them correct.

The ideal algorithm would be one in which a single thread owns all writes to a single resource, with other threads only reading the results. To read the results in a multi-processor environment requires memory barriers to make the changes visible to threads running on other processors.

2.3 Memory Barriers

Modern processors, for performance reasons, execute instructions out of order, and reorder loads and stores of data between memory and the execution units. The processor need only guarantee that the program logic produces the same result regardless of execution order. This is not an issue for single-threaded programs. However, when threads share state, all memory changes must appear in order, at the required points, for the data exchange to be successful. Memory barriers are used by processors to indicate sections of code where the ordering of memory updates is important. They are the means by which hardware ordering and visibility of change between threads are achieved. Compilers can put in place complementary software barriers to ensure the ordering of compiled code, in addition to the hardware barriers used by the processor itself.

Modern CPUs are now much faster than the current generation of memory systems. To bridge this divide, CPUs use complex cache systems that are effectively fast hardware hash tables without chaining. These caches are kept coherent with the caches of other processors via message-passing protocols. In addition, processors have "store buffers" to offload writes to these caches, and "invalidate queues" so that the cache coherency protocols can acknowledge invalidation messages quickly, for efficiency, when a write is about to happen.

What this means for data is that the latest version of any value could, at any stage after being written, be in a register, a store buffer, one of several layers of cache, or in main memory. If threads are to share this value, it needs to be made visible in an ordered fashion, and this is achieved through the coordinated exchange of cache coherency messages. The timely generation of these messages can be controlled by memory barriers.

A read memory barrier orders load instructions on the CPU that executes it, by marking a point in the invalidate queue for changes coming into its cache. This gives it a view consistent with the order of write operations that happened before the read barrier.

A write memory barrier orders store instructions on the CPU that executes it, by marking a point in the store buffer and thus flushing writes out through its cache. This barrier gives the rest of the system an ordered view of the store operations that happen before the write barrier.

A full memory barrier orders both loads and stores, but only on the CPU that executes it.

Some CPUs have more variants in addition to these three primitives, but these three are sufficient to understand the complexities involved. In the Java memory model, the read and write of a volatile field implement the read and write barriers respectively. This was made explicit in the Java memory model as defined with the release of Java 5.
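The sketch below illustrates the point: a volatile write by one thread, followed by a volatile read of the same field in another thread, establishes the ordering needed to publish a plain field safely. The class and field names are illustrative only.

public final class VolatilePublication
{
    private long payload = 0L;              // plain field written before the volatile write
    private volatile boolean ready = false;

    // Writer thread: the volatile write to 'ready' acts as a store barrier,
    // so the write to 'payload' is visible before 'ready' becomes true.
    public void publish(final long value)
    {
        payload = value;
        ready = true;
    }

    // Reader thread: the volatile read of 'ready' acts as a load barrier,
    // guaranteeing that seeing 'true' means 'payload' is also visible.
    public long tryConsume()
    {
        if (ready)
        {
            return payload;
        }
        return -1L; // nothing published yet
    }
}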

2.4 Cache Lines

The way caching is used in modern processors is of immense importance to successful high-performance operation. Such processors are extremely efficient at processing data and instructions that are held in cache, but comparatively very inefficient on a cache miss.

Our hardware doesn’t move memory as bytes or words. For efficiency, caches are organized into cache rows, typically 32-256 bytes in size, with the most common cache behavior being 64 bytes. This is the level of granularity at which the cache consistency protocol operates. This means that if two variables are on the same cache line and they are written by different threads, they will have the same write contention problems as individual variables. This is a concept known as “pseudo-sharing”. Therefore, for high performance, it is important to ensure that independent but concurrent variables do not share the same cache line if you want to minimize contention.

When memory is accessed in a predictable manner, the CPU can hide the latency cost of accessing main memory by anticipating which memory is likely to be accessed next and prefetching it into the cache in the background. This only works if the processor can detect a pattern of access, such as walking memory with a predictable "stride". When iterating over the contents of an array the stride is predictable, so memory is prefetched in cache lines, maximising the efficiency of the access. Strides typically have to be less than 2048 bytes, in either direction, to be noticed by the processor. However, data structures such as linked lists and trees tend to have nodes that are more widely distributed in memory, with no predictable stride of access. The lack of a consistent pattern in memory constrains the ability of the system to prefetch cache lines, resulting in main memory accesses that can be more than two orders of magnitude less efficient.
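The contrast can be sketched as follows: summing a contiguous long[] walks memory with a fixed stride that the prefetcher can follow, while summing a LinkedList chases pointers to nodes scattered across the heap. This is a simple illustration, not a benchmark.

import java.util.LinkedList;

public final class StrideDemo
{
    // Predictable stride: a sequential walk over a contiguous long[],
    // which the hardware prefetcher can detect and stay ahead of.
    static long sumArray(final long[] values)
    {
        long sum = 0L;
        for (int i = 0; i < values.length; i++)
        {
            sum += values[i];
        }
        return sum;
    }

    // Pointer chasing: each node can live anywhere on the heap,
    // so there is no stride for the prefetcher to detect.
    static long sumList(final LinkedList<Long> values)
    {
        long sum = 0L;
        for (final long v : values)
        {
            sum += v;
        }
        return sum;
    }
}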

2.5 Queue problems

Queues typically use either linked lists or arrays for the underlying storage of elements. If an in-memory queue is allowed to be unbounded, then for many classes of problem it can grow unchecked until it exhausts memory, reaching the point of catastrophic failure. This happens when producers outpace the consumers. Unbounded queues can be useful in systems where producers are guaranteed not to outpace consumers and memory is a precious resource, but there is always a risk if this assumption does not hold and the queue grows without limit. To avoid this catastrophic outcome, queues are commonly constrained in size (bounded). Keeping a queue bounded requires that it is either array-backed or that the size of the queue is actively tracked.

Queue implementations tend to have write contention on the head, tail, and size variables. When in use, queues are typically always close to full or close to empty, due to the differences in pace between consumers and producers. They very rarely operate in a balanced middle ground where the rates of production and consumption are evenly matched. This propensity to be always full or always empty results in high levels of contention and/or expensive cache coherence traffic. The problem is that even when the head and tail mechanisms are separated using different concurrent objects, such as locks or CAS variables, they generally occupy the same cache line.

The combined problem of managing the head where producers claim entries, the tail where consumers claim entries, and the storage of the nodes in between makes concurrent implementations very complex to design beyond using a single coarse-grained lock on the entire queue. Coarse-grained locks on the whole queue for put and take operations are simple to implement but represent a significant bottleneck to throughput. If the concurrent concerns are teased apart within the semantics of a queue, then for anything other than a single-producer, single-consumer implementation the design becomes very complex.

There is a further problem with using queues in Java: they are a significant source of garbage. First, objects have to be allocated and placed on the queue. Second, if the queue is backed by a linked list, objects representing the nodes of the list must be allocated. All of these objects, allocated to support the queue implementation, need to be re-claimed when they are no longer referenced.

2.6 Pipelines and Graphs

For many classes of problem it makes sense to wire together several processing stages into pipelines. Such pipelines often have parallel paths and are organised into graph-like topologies. The links between each stage are usually implemented by queues, with each stage having its own thread.

This approach doesn’t come cheap — at every stage, we have to pay the price of joining the queue and leaving the queue unit of work. When paths must fork, the number of targets is multiplied by this cost, resulting in an inevitable race cost when paths must rejoin after such a fork.

It would be ideal if the graph of dependencies could be expressed without incurring the cost of placing queues between stages.

3. The design of the Disruptor

While trying to solve the problems described above, a design emerged that rigorously separates the concerns we saw as being conflated in queues. This approach, combined with a focus on ensuring that any data is owned by only one thread for write access, eliminates write contention. That design became known as the "Disruptor". It was so named because it has elements of similarity, in the way it handles graphs of dependencies, to the concept of "Phasers" introduced in Java 7 to support fork-join.

LMAX Disruptor aims to address all of the above issues by attempting to maximize memory allocation efficiency and operating in a cache-friendly manner in order to achieve optimal performance on modern hardware.

At the core of the Disruptor mechanism sits a pre-allocated bounded data structure in the form of a ring buffer. Data is added to the ring buffer by one or more producers and processed by one or more consumers.

3.1 Memory Allocation

All memory for the ring buffer is pre-allocated at start-up. A ring buffer can store either an array of pointers to entries or an array of structures representing the entries. The limitations of the Java language mean that entries are associated with the ring buffer as pointers to objects. Each of these entries is typically not the data being passed itself, but a container for it. This pre-allocation of entries eliminates issues in languages that support garbage collection, since the entries are reused and live for the duration of the Disruptor instance. The memory for these entries is allocated at the same time, and it is highly likely to be laid out contiguously in main memory, thus supporting cache striding. John Rose has proposed introducing "value types" to the Java language, which would allow arrays of tuples, as in other languages such as C, ensuring that memory is allocated contiguously and avoiding the pointer indirection.

Garbage collection can be problematic when developing low-latency systems in a managed runtime environment like Java. The more memory that is allocated, the greater the burden on the garbage collector. Garbage collectors work best when objects are either very short-lived or effectively immortal. Pre-allocating the entries in the ring buffer means that, as far as the garbage collector is concerned, they are immortal and so place little burden on it.

Under heavy load, queue-based systems can back up, which results in reduced processing rates and causes the allocated objects to survive longer than they should, and thus to be promoted beyond the young generation in generational garbage collectors. This has two implications: first, objects must be copied between generations, which causes latency jitter; second, these objects must eventually be collected from the old generation, which is typically a much more expensive operation and increases the likelihood of "stop the world" pauses when the fragmented memory space requires compaction. In large heaps this can result in pauses whose duration grows with each additional GB of memory.

3.2 Teasing Apart the Concerns

We believe the following concerns are conflated in all queue implementations, to the extent that this collection of distinct behaviours tends to define the interfaces that queues implement:

  • Storage of the items being exchanged

  • Coordination of producers claiming the next sequence for exchange

  • Coordination of consumers being notified that a new item is available

Excessive memory allocation can cause problems when designing a financial exchange in a language that uses garbage collection. So, as we have described, queues backed by linked lists are not a good approach. Garbage collection is minimised if the entire storage for the exchange of data between processing stages can be pre-allocated. Further, if this allocation can be performed in a uniform chunk, then traversal of that data will be done in a manner that is very friendly to the caching strategies employed by modern processors. A data structure that meets this requirement is an array with all the slots pre-filled. On creation of the ring buffer, the Disruptor uses the abstract factory pattern to pre-allocate the entries. When an entry is claimed, a producer can copy its data into the pre-allocated structure.
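The idea can be sketched as a ring of slots pre-filled through a factory. This is an illustration of the principle only, not the Disruptor's actual internals, and the names used here (PreallocatedRing, EntryFactory) are hypothetical.

public final class PreallocatedRing<T>
{
    public interface EntryFactory<T>
    {
        T create();
    }

    private final Object[] entries;

    // Every slot is filled at construction time, so no allocation
    // happens while events are being exchanged.
    public PreallocatedRing(final int size, final EntryFactory<T> factory)
    {
        entries = new Object[size];
        for (int i = 0; i < size; i++)
        {
            entries[i] = factory.create();
        }
    }

    @SuppressWarnings("unchecked")
    public T get(final int index)
    {
        return (T) entries[index];
    }
}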

On most processors there is a very high cost for the remainder calculation on the sequence number, which determines the slot in the ring. That cost can be greatly reduced by making the ring size a power of two. A bit mask of size minus one can then be used to perform the remainder operation efficiently, as illustrated below.
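For example, with a power-of-two ring size the slot index can be computed with a single bitwise AND instead of a remainder instruction:

public final class IndexFromSequence
{
    public static void main(final String[] args)
    {
        final int ringSize = 1024;            // must be a power of two
        final int mask = ringSize - 1;

        // For non-negative sequences this is equivalent to sequence % ringSize,
        // but uses a cheap AND rather than an expensive remainder calculation.
        final long sequence = 1027L;
        final int index = (int) (sequence & mask);

        System.out.println(index);            // prints 3
    }
}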

As we described earlier, bounded queues suffer from contention at the head and tail. The ring buffer data structure is free from this contention and these concurrency primitives, because the concerns have been teased apart into producer and consumer barriers through which the ring buffer must be accessed. The logic of these barriers is described below.

In the most common usages of the Disruptor there is only one producer. Typical producers are file readers or network listeners. In the case of a single producer there is no contention on sequence/entry allocation. In the more unusual case of multiple producers, producers race one another to claim the next entry in the ring buffer. Contention over claiming the next available entry can be managed with a simple CAS operation on the sequence number for that slot.

Once a producer has copied the relevant data into the claimed entry, it can make it public to consumers by committing the sequence. This can be done without CAS, using a simple busy spin until the other producers have reached this sequence in their own commits. The producer can then move the cursor forward, indicating the next entry available for consumption. Producers can avoid wrapping the ring by tracking the sequence of consumers, as a simple read operation, before they write to the ring buffer, as sketched below.
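A rough sketch of that wrap check follows; it is illustrative only and does not reflect the actual Disruptor classes. The producer compares the sequence it has claimed against the sequence of the slowest consumer and spins until writing into the slot can no longer overwrite an unread entry.

public final class WrapGuard
{
    private final int ringSize;
    private volatile long minimumConsumerSequence = -1L;   // updated elsewhere by consumers

    public WrapGuard(final int ringSize)
    {
        this.ringSize = ringSize;
    }

    // Called by a producer after it has claimed 'claimedSequence' and before
    // it writes into the corresponding slot.
    public void ensureCapacity(final long claimedSequence)
    {
        final long wrapPoint = claimedSequence - ringSize;
        while (wrapPoint > minimumConsumerSequence)
        {
            // busy spin until the slowest consumer has moved past the wrap point
        }
    }
}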

A consumer waits for a sequence to become available in the ring buffer before reading the entry. Various strategies can be employed while waiting. If CPU resource is precious, the consumer can wait on a condition variable within a lock that is signalled by the producers. This is obviously a point of contention and should be used only when CPU resource is more important than latency or throughput. Alternatively, the consumer can loop, checking the cursor that represents the currently available sequence in the ring buffer. This can be done with or without a thread yield, trading CPU resource against latency. This approach scales very well because, by not using a lock or condition variable, we have broken the contended dependency between producers and consumers. Lock-free multi-producer, multi-consumer queues do exist, but they require multiple CAS operations on the head, tail, and size counters. The Disruptor does not suffer from this CAS contention.
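The two lock-free waiting styles described above can be sketched as follows. This is illustrative only; the real Disruptor encapsulates this behaviour in its wait strategies.

public final class CursorWait
{
    private volatile long cursor = -1L;    // advanced by the producer after a commit

    // Busy spin: lowest latency, but burns a CPU core while waiting.
    public long waitForBusySpin(final long sequence)
    {
        while (cursor < sequence)
        {
            // spin
        }
        return cursor;
    }

    // Yielding: gives up the CPU between checks, trading latency for CPU use.
    public long waitForYielding(final long sequence)
    {
        while (cursor < sequence)
        {
            Thread.yield();
        }
        return cursor;
    }
}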

3.3 Sequencing

Sequencing is the core concept of how concurrency is managed within the Disruptor. Each producer and consumer works off a strict sequence for how it interacts with the ring buffer. When a producer claims an entry in the ring, it claims the next slot in sequence. The sequence of the next available slot can be a simple counter in the case of a single producer, or an atomic counter updated with CAS operations in the case of multiple producers. Once a sequence value is claimed, that entry in the ring buffer can be written to by the claiming producer. When the producer has finished updating the entry, it can commit the change by updating a separate counter: the cursor on the ring buffer for the latest entry available to consumers. Producers can busy spin on the ring buffer cursor, reading and writing it with memory barriers rather than CAS operations, as shown below.

long expectedSequence = claimedSequence - 1;

while (cursor != expectedSequence)
{
    // busy spin
}

cursor = claimedSequence;

Consumers wait for a given sequence to become available by reading the cursor through a memory barrier. Once the cursor has been updated, the memory barrier ensures that the changes to entries in the ring buffer are visible to the consumers that have been waiting for the cursor to advance.

Each consumer maintains its own sequence, which it updates as it processes entries from the ring buffer. These consumer sequences allow producers to track consumers and prevent the ring from wrapping. Consumer sequences also allow consumers to coordinate work on the same entry in an ordered manner.

In the case of a single producer, no locking or CAS operation is required, regardless of the complexity of the consumer graph. The entire concurrency coordination can be achieved with just memory barriers on the sequences in question.

3.4 Batching Effect

An interesting opportunity arises, which is not possible with queues, when consumers are waiting on an advancing cursor sequence in the ring buffer. If a consumer finds that the ring buffer cursor has advanced a number of steps since it last checked, it can process entries up to that sequence without involving the concurrency mechanisms. This results in lagging consumers quickly regaining pace with the producers when the producers burst ahead, thus balancing the system. This type of batching increases throughput while reducing and smoothing latency. From our observations, this effect gives a near-constant latency regardless of load, until the memory sub-system is saturated, at which point the profile is linear, following Little's Law. This is very different from the "J" curve effect on latency we observed with queues as load increases. A sketch of the batching consumer loop follows.
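The sketch below illustrates the batching loop; the class and method names are hypothetical. The consumer reads the committed cursor once and then processes every entry up to it without touching any concurrency mechanism again.

public final class BatchingConsumerLoop
{
    private volatile long cursor = -1L;      // latest committed sequence, advanced by the producer
    private long nextSequence = 0L;          // next entry this consumer will process

    // One pass of the consumer loop: everything between nextSequence and the
    // observed cursor is handled as a single batch with no further coordination.
    public void poll()
    {
        final long available = cursor;       // single volatile read
        while (nextSequence <= available)
        {
            processEntry(nextSequence);      // hypothetical handler for one slot
            nextSequence++;
        }
    }

    private void processEntry(final long sequence)
    {
        // application-specific work would go here
    }
}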

3.5 Dependency Graphs

A queue represents a simple one-step pipeline dependency between producers and consumers. If the consumers form a chain or a graph-like structure of dependencies, then queues are required between each stage of the graph. This incurs the fixed costs of queues many times within the graph of dependent stages. When designing the LMAX financial exchange, our analysis showed that taking a queue-based approach would result in queuing costs dominating the total execution cost of processing a transaction.

Because producer and consumer concerns are separated in the Disruptor pattern, a complex graph of consumer dependencies can be represented with only a single ring buffer at its core. This results in greatly reduced fixed costs of execution, increasing throughput while reducing latency.

A single ring buffer can be used to store entries with a complex structure representing the whole workflow in a cohesive place. Care must be taken in the design of such a structure so that the state written by independent consumers does not result in false sharing of cache lines.

3.6 Disruptor class diagram

The core relationships within the Disruptor framework are depicted in the class diagram below. The diagram omits the convenience classes that can be used to simplify the programming model. Once the dependency graph has been constructed, the programming model is simple. Producers claim entries in sequence via a ProducerBarrier, write their changes into the claimed entry, then commit that entry back via the ProducerBarrier, making it available for consumption. As a consumer, all one needs to do is provide a BatchHandler implementation that receives callbacks when a new entry is available. The resulting programming model is event-based and has a lot in common with the Actor model.

A more flexible design is achieved by separating the concerns that are typically conflated in queue implementations. The RingBuffer exists at the core of the Disruptor pattern, providing storage for data exchange without contention. The concurrency concerns are teased apart into the producers and consumers that interact with the RingBuffer. The ProducerBarrier manages any concurrency concerns associated with claiming slots in the ring buffer, while tracking dependent consumers to prevent the ring from wrapping. The ConsumerBarrier notifies consumers when new entries are available, and consumers can be constructed into a graph of dependencies representing multiple stages in a processing pipeline.

3.7 Code Examples

The code below is an example of a single producer and a single consumer, using the convenience interface BatchHandler to implement the consumer. The consumer runs on a separate thread, receiving entries as they become available.

// Callback handler which can be implemented by consumers
final BatchHandler<ValueEntry> batchHandler = new BatchHandler<ValueEntry>()
{
    public void onAvailable(final ValueEntry entry) throws Exception
    {
        // process a new entry as it becomes available.
    }

    public void onEndOfBatch() throws Exception
    {
        // useful for flushing results to an IO device if necessary.
    }

    public void onCompletion()
    {
        // do any necessary clean up before shutdown
    }
};

RingBuffer<ValueEntry> ringBuffer =
    new RingBuffer<ValueEntry>(ValueEntry.ENTRY_FACTORY, SIZE,
                               ClaimStrategy.Option.SINGLE_THREADED,
                               WaitStrategy.Option.YIELDING);
ConsumerBarrier<ValueEntry> consumerBarrier = ringBuffer.createConsumerBarrier();
BatchConsumer<ValueEntry> batchConsumer =
    new BatchConsumer<ValueEntry>(consumerBarrier, batchHandler);
ProducerBarrier<ValueEntry> producerBarrier = ringBuffer.createProducerBarrier(batchConsumer);

// Each consumer can run on a separate thread
EXECUTOR.submit(batchConsumer);

// Producers claim entries in sequence
ValueEntry entry = producerBarrier.nextEntry();

// copy data into the entry container

// make the entry available to consumers
producerBarrier.commit(entry);

4. Throughput performance test

We chose Doug Lea's excellent java.util.concurrent.ArrayBlockingQueue as the reference implementation. In our testing, ArrayBlockingQueue had the highest performance of any bounded queue. The tests are conducted in a blocking programming style to match that of the Disruptor. The test cases detailed below are available in the Disruptor open source project.

Figure 1. Unicast: 1P — 1C

Figure 2. Three Step Pipeline: 1P — 3C

Figure 3. Sequencer: 3P — 1C

Figure 4. Multicast: 1P — 3C

Figure 5. Diamond: 1P — 3C

For the above configurations, an ArrayBlockingQueue was applied for each arc of data flow, compared to barrier configurations using the Disruptor. The following table shows the performance results when processing 500 million messages, taking the best of three runs, using a Java 1.6.0_25 64-bit Sun JVM on Windows 7 with an Intel Core i7 860 @ 2.8 GHz without hyper-threading, and on an Intel Core i7-2720QM running Ubuntu 11.04. The results can vary significantly across JVM executions, and the figures below are not the highest we have observed.

5. Latency performance test

To measure latency, we use a three-stage pipeline and generate events at less than the saturation rate. This is done by waiting 1 microsecond after injecting an event before injecting the next, and repeating 50 million times. To time at this level of precision it is necessary to use the CPU's time stamp counter (TSC). We chose CPUs with an invariant TSC, because older processors suffer from a changing TSC frequency due to power saving and sleep states. Intel Nehalem and later processors provide an invariant TSC, which can be accessed by the latest Oracle JVMs running on Ubuntu 11.04. No CPU binding was employed for this test. For comparison, we again use ArrayBlockingQueue. We could have used ConcurrentLinkedQueue, which would likely give better results, but we wanted a bounded queue implementation to ensure that producers do not outpace consumers by creating back pressure. Below are the results when running Java 1.6.0_25 64-bit on Ubuntu 11.04 with a 2.2 GHz Core i7-2720QM. The Disruptor has a mean latency of 52 nanoseconds per hop, compared to 32,757 nanoseconds for ArrayBlockingQueue. Profiling shows that the use of locks and signalling via condition variables are the main causes of latency for ArrayBlockingQueue.
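The pacing of injections can be sketched as below. The injectEvent method is a hypothetical stand-in for publishing into the pipeline under test, and the actual measurement relied on the TSC rather than System.nanoTime for the timing itself.

public final class PacedInjector
{
    private static final long ITERATIONS = 50L * 1000L * 1000L;
    private static final long PAUSE_NANOS = 1000L;   // roughly 1 microsecond between events

    public static void main(final String[] args)
    {
        for (long i = 0; i < ITERATIONS; i++)
        {
            injectEvent(i);                           // hypothetical injection into the pipeline under test

            final long deadline = System.nanoTime() + PAUSE_NANOS;
            while (System.nanoTime() < deadline)
            {
                // busy wait to keep the injection rate below saturation
            }
        }
    }

    private static void injectEvent(final long sequence)
    {
        // placeholder for publishing the event to the three-stage pipeline
    }
}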

6. Conclusion

Disruptor is a significant step forward in improving throughput, reducing latency between concurrent execution contexts, and ensuring predictable latency, which is an important consideration in many applications. Our tests show that it performs better than similar methods when exchanging data between threads. We think this is the highest performance mechanism for this kind of data exchange. By focusing on a clear separation of concerns involved in cross-thread data exchange, by eliminating write contention, minimizing read contention, and ensuring that code works well with the caches used by modern processors, we have created an efficient mechanism for exchanging data between threads in any application.

Batching allows consumers to process entries up to a given threshold without any contention, which introduces a new characteristic in high-performance systems. For most systems, latency increases exponentially as load and contention increase, giving the characteristic "J" curve. As load on the Disruptor increases, latency remains almost constant until the memory sub-system becomes saturated.

We believe the Disruptor establishes a new benchmark for high-performance computing and is well positioned to continue to take advantage of current trends in processor and computer design.