Disruptor is an open-source framework that addresses lock contention on queues under high concurrency. Originally developed and used by LMAX, it enables concurrent queue operations without locking and is claimed to process 6 million orders per second on a single thread.

Official site: lmax-exchange.github.io/disruptor

Disruptor is currently used by a number of high-profile projects, including Apache Storm, Camel, and Log4j2, to achieve high performance.

Why the Disruptor framework was created

“How Java's built-in queues currently achieve thread safety:”

ArrayBlockingQueue: an array-based queue that uses a lock to keep its data safe under multithreading.

LinkedBlockingQueue: a queue based on a linked list, which also uses locks to keep its data safe under multithreading.

ConcurrentLinkedQueue: a queue based on a linked list that achieves thread safety through CAS instead of locks.

Locking often hurts performance badly, so it is worth going lock-free where possible. This is where Disruptor, a high-concurrency framework, comes in.
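As a rough illustration of the built-in queues listed above, here is a minimal sketch that uses only JDK classes. The class name `BuiltInQueueDemo` and its helper methods are made up for this example and do not come from Disruptor.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of Disruptor or of the original example code.
final class BuiltInQueueDemo {

    // ArrayBlockingQueue is bounded and guards its operations with a lock.
    // offer() returns false instead of blocking when the queue is full.
    static boolean offerToFullArrayQueue() {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(2);
        queue.offer(1L);
        queue.offer(2L);
        return queue.offer(3L); // capacity already reached
    }

    // ConcurrentLinkedQueue is unbounded and lock-free (CAS on its node links),
    // so offer() always succeeds, at the cost of allocating one node per element.
    static int fillLinkedQueue(int count) {
        Queue<Long> queue = new ConcurrentLinkedQueue<>();
        for (long i = 0; i < count; i++) {
            queue.offer(i);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("offer to full ArrayBlockingQueue: " + offerToFullArrayQueue());
        System.out.println("ConcurrentLinkedQueue size: " + fillLinkedQueue(1000));
    }
}
```

The locked queue is bounded but serializes its callers. The CAS-based queue avoids the lock but is unbounded and allocates a node per element.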

Basic concepts

Reference address: github.com/LMAX-Exchan…

RingBuffer: the underlying data structure of Disruptor and its core class; the conduit through which threads exchange data.

Sequencer: the sequence manager that implements producer-side synchronization. It manages and coordinates producer and consumer sequences and sequence barriers. It comes in two modes, single-producer and multi-producer, each with its own synchronization algorithm.

Sequence: a sequence number that tracks the progress of events in the RingBuffer and of each consumer. Most concurrent code in Disruptor synchronizes through changes to Sequence values rather than through locks, which is a major reason for its high performance.

SequenceBarrier: the sequence barrier. It coordinates the producer's cursor sequence with the sequences of the individual consumers, ensuring that producers do not overwrite messages that consumers have not yet processed and that dependent consumers run in the correct order.

EventProcessor: the event processor. It listens for events in the RingBuffer and consumes those that are available, waiting on the next sequence number until the corresponding event is ready. Events read from the RingBuffer are handed to the actual consumer implementation (the EventHandler).

EventHandler: the business processor. This is the consumer-side interface that carries the concrete business logic and is implemented by the user; it represents the consumer.

Producer: the producer role, played by a user thread that writes events to the RingBuffer.

WaitStrategy: determines how a consumer waits for a producer to place an event into the Disruptor.

 

Wait strategies

Source code address: github.com/LMAX-Exchan…

“BlockingWaitStrategy”

BlockingWaitStrategy is the default strategy in Disruptor. Internally it uses a lock and a condition variable to control thread wake-up. It is the least efficient strategy, but it uses the least CPU and gives more consistent performance across different deployment environments.

“SleepingWaitStrategy”

SleepingWaitStrategy performs similarly to BlockingWaitStrategy and consumes a similar amount of CPU, but it has minimal impact on producer threads because it waits in a loop using LockSupport.parkNanos(1).

“YieldingWaitStrategy”

YieldingWaitStrategy is one of the strategies suitable for low-latency systems. It spins while waiting for the sequence to increase to the required value, and inside the loop it calls Thread.yield() so that other queued threads can run. This strategy is recommended when high performance is required and the number of event-processing threads is smaller than the number of logical CPU cores, for example when hyper-threading is enabled.

“BusySpinWaitStrategy”

BusySpinWaitStrategy gives the best performance and suits low-latency systems, at the cost of keeping a CPU core fully busy. It is recommended only when high performance is required and the number of event-processing threads is smaller than the number of physical CPU cores, for example with hyper-threading disabled.

“PhasedBackoffWaitStrategy”

PhasedBackoffWaitStrategy combines spinning, yielding, and a custom fallback strategy. It suits scenarios where CPU resources are scarce and throughput and latency are not critical.
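To make the trade-offs between the strategies more concrete, here is a simplified sketch of three ways a consumer can wait for a sequence to become available. It uses only JDK classes; `WaitSketch`, `Mode`, and `waitFor` are names invented for this illustration and are not Disruptor's actual implementation (the real SleepingWaitStrategy, for example, has additional phases, and BlockingWaitStrategy uses a lock and condition that are not shown here).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration; these are not Disruptor's classes.
final class WaitSketch {

    enum Mode { BUSY_SPIN, YIELD, SLEEP }

    // Waits until cursor >= target, then returns the cursor value that was observed.
    static long waitFor(long target, AtomicLong cursor, Mode mode) {
        long available;
        while ((available = cursor.get()) < target) {
            switch (mode) {
                case BUSY_SPIN:
                    break; // keep polling: lowest latency, one core fully busy
                case YIELD:
                    Thread.yield(); // let other runnable threads use the core
                    break;
                case SLEEP:
                    LockSupport.parkNanos(1L); // park briefly: low CPU use, higher latency
                    break;
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        for (Mode mode : Mode.values()) {
            AtomicLong cursor = new AtomicLong(-1L);
            // The "producer" advances the cursor from another thread.
            Thread producer = new Thread(() -> {
                for (long seq = 0; seq <= 1000; seq++) {
                    cursor.set(seq);
                }
            });
            producer.start();
            long seen = waitFor(1000L, cursor, mode);
            producer.join();
            System.out.println(mode + " observed sequence " + seen);
        }
    }
}
```

The only difference between the modes is what the consumer does while the sequence is not yet available, which is exactly the latency-versus-CPU trade-off described above.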

Usage example

Reference address: github.com/LMAX-Exchan…

<dependency>
      <groupId>com.lmax</groupId>
      <artifactId>disruptor</artifactId>
      <version>3.3.4</version>
   </dependency>

// Define the event: the type of data exchanged through the Disruptor.
public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}
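
import com.lmax.disruptor.EventFactory;

// Factory the Disruptor uses to pre-allocate the events in the RingBuffer.
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}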

import com.lmax.disruptor.EventHandler;

// Consumer: receives each published event and runs the business logic on it.
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("consumer: " + event.getValue());
    }
}

import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

// Producer: claims a slot, fills in the event, and publishes it.
public class LongEventProducer {
    public final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 1. Claim the next sequence number in the RingBuffer.
        long sequence = ringBuffer.next();
        try {
            // 2. Get the pre-allocated event stored at that sequence.
            LongEvent longEvent = ringBuffer.get(sequence);
            // 3. Fill the event with the business data.
            long data = byteBuffer.getLong(0);
            longEvent.setValue(data);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            System.out.println("producer ready to send data");
            // 4. Publish the event; done in finally so a claimed slot is always released.
            ringBuffer.publish(sequence);
        }
    }
}
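
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorMain {
    public static void main(String[] args) {
        // 1. Create a cached thread pool that supplies the threads for the consumers' event handling.
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2. Create the event factory.
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // 3. Set the RingBuffer size; it must be a power of 2.
        int ringBufferSize = 1024 * 1024;
        // 4. Create the Disruptor.
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
                ProducerType.SINGLE, new YieldingWaitStrategy());
        // 5. Register the consumer.
        disruptor.handleEventsWith(new LongEventHandler());
        // 6. Start the Disruptor.
        disruptor.start();
        // 7. Get the RingBuffer that holds the events.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 8. Create the producer.
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 9. Produce 100 events, reusing one 8-byte buffer.
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 1; i <= 100; i++) {
            byteBuffer.putLong(0, i);
            producer.onData(byteBuffer);
        }
        // 10. Shut down the Disruptor and the executor.
        disruptor.shutdown();
        executor.shutdown();
    }
}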

Core Design Principles

Disruptor addresses slow queues through the following designs:

“Ring array structure:”

To avoid garbage collection, Disruptor uses an array instead of a linked list. An array is also friendlier to the processor's cache.

Reason: the CPU cache is made up of many cache lines. Each cache line is usually 64 bytes and maps to a contiguous block of addresses in main memory. A Java variable of type long is 8 bytes, so one cache line can hold up to 8 long values. Whenever the CPU pulls data from main memory, the adjacent data in the same cache line comes along with it. When you access a long array and one of its values is loaded into the cache, up to seven neighboring values are loaded at the same time, so the array can be traversed very quickly.
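The arithmetic above can be written down as a small sketch. The 64-byte line size is the typical value quoted above, not something Java can query, and `CacheLineSketch` with its methods is a name invented for this example. The sketch does not measure time; it only shows the two access patterns being contrasted.

```java
// Hypothetical illustration of cache-friendly versus cache-unfriendly array access.
final class CacheLineSketch {

    // Assumption: 64-byte cache lines, as is common on current hardware.
    static final int CACHE_LINE_BYTES = 64;

    // 64 bytes / 8 bytes per long = 8 longs per cache line.
    static int longsPerCacheLine() {
        return CACHE_LINE_BYTES / Long.BYTES;
    }

    // Sequential scan: consecutive elements share a cache line,
    // so most reads are served from a line that is already loaded.
    static long sumSequential(long[] data) {
        long sum = 0;
        for (int i = 0; i < data.length; i++) {
            sum += data[i];
        }
        return sum;
    }

    // Strided scan: visits the same elements, but each step jumps `stride`
    // elements ahead, so neighbors in the same cache line are not used together.
    static long sumStrided(long[] data, int stride) {
        long sum = 0;
        for (int start = 0; start < stride; start++) {
            for (int i = start; i < data.length; i += stride) {
                sum += data[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] data = new long[1 << 20];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println("longs per cache line: " + longsPerCacheLine());
        System.out.println("same result: "
                + (sumSequential(data) == sumStrided(data, longsPerCacheLine())));
    }
}
```

Both methods compute the same sum. On most hardware the sequential version tends to be faster for large arrays, though the actual difference depends on the CPU and the JIT and should be measured rather than assumed.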

“Element location:”

The array length is 2^n, so an element's position can be computed with a bit operation, which speeds up positioning. The index simply keeps incrementing. There is no need to worry about index overflow: the index is of type long, and even at 1 million QPS it would take roughly 300,000 years to run out.
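Both claims are easy to check with a few lines of Java. This is a minimal sketch; `RingIndex` and its method names are made up for illustration and are not Disruptor API.

```java
// Hypothetical helper showing slot positioning and the overflow estimate.
final class RingIndex {

    // True when n is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    // For a power-of-two bufferSize, (sequence & (bufferSize - 1))
    // gives the same slot as (sequence % bufferSize) for non-negative sequences.
    static int indexOf(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        return (int) (sequence & (bufferSize - 1));
    }

    // Whole years until a long sequence starting at 0 reaches Long.MAX_VALUE.
    static long yearsUntilOverflow(long eventsPerSecond) {
        long secondsPerYear = 365L * 24 * 60 * 60;
        return Long.MAX_VALUE / eventsPerSecond / secondsPerYear;
    }

    public static void main(String[] args) {
        System.out.println("slot for sequence 9 in a buffer of 8: " + indexOf(9, 8));
        System.out.println("years at 1,000,000 events/s: " + yearsUntilOverflow(1_000_000L));
    }
}
```

At 1 million events per second the estimate comes out at about 292,000 years, which matches the "roughly 300,000 years" figure.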

“Lock free design:”

Each producer or consumer thread first requests the position of an element in the array that it is allowed to operate on, and then writes to or reads from that position directly. The whole process relies on CAS operations on atomic variables to stay thread-safe.
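The idea of requesting a position with CAS can be sketched with a JDK AtomicLong. This is a simplified illustration under stated assumptions: `CasClaim` and its methods are hypothetical names, and the sketch ignores the capacity check that a real multi-producer sequencer also has to perform.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of claiming sequence numbers without a lock.
final class CasClaim {

    private final AtomicLong cursor = new AtomicLong(-1L);

    // Claims the next sequence. If another thread changed the cursor in the
    // meantime, compareAndSet fails and the loop retries with the fresh value.
    long claimNext() {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + 1;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    // Runs several threads that claim concurrently and counts the distinct
    // sequences handed out; every claim should be unique.
    static int countDistinctClaims(int threads, int claimsPerThread) {
        CasClaim claim = new CasClaim();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < claimsPerThread; i++) {
                    seen.add(claim.claimNext());
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct claims: " + countDistinctClaims(4, 10_000));
    }
}
```

No thread ever blocks on a lock here. A thread that loses the race simply retries, and each claimed sequence belongs to exactly one thread, which can then write to its slot directly.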

Data structure

The framework uses RingBuffer, a custom ring array, as the queue's data structure.

In addition to the array, there is a sequence that points to the next available element, for use by producers and consumers.

The schematic diagram is as follows:

 

Sequence

Disruptor manages the data (events) exchanged through it by means of an ascending sequence number, and the data (events) are always processed in ascending sequence order.

“What are the advantages of array + sequence number design?”

As with HashMap, storing or retrieving an element in an array takes only O(1) time when the index is known, and the index can be computed as the sequence number modulo the array length: index = sequence % table.length. For this to be done with a fast bit operation, table.length must be a power of 2.
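A toy version of the array plus sequence design might look like the following. It is a minimal sketch, not Disruptor's RingBuffer: `SimpleRing` is a hypothetical class, it has no producer or consumer coordination, and it only shows pre-allocated slots being looked up in O(1) and reused once the sequence wraps around.

```java
import java.util.function.Supplier;

// Hypothetical toy ring array: pre-allocated entries addressed by sequence number.
final class SimpleRing<E> {

    private final Object[] entries;
    private final int mask;

    SimpleRing(int size, Supplier<E> factory) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        this.entries = new Object[size];
        this.mask = size - 1;
        // Fill every slot up front so that no entry is allocated while running.
        for (int i = 0; i < size; i++) {
            entries[i] = factory.get();
        }
    }

    // O(1) lookup: the sequence keeps growing, the slot index wraps around.
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & mask)];
    }

    public static void main(String[] args) {
        SimpleRing<long[]> ring = new SimpleRing<>(8, () -> new long[1]);
        ring.get(3)[0] = 42L;
        // Sequence 11 maps to the same slot as sequence 3 in a ring of 8.
        System.out.println("same slot reused: " + (ring.get(3) == ring.get(11)));
    }
}
```

Because the entries are created once and then reused, a running ring produces no garbage for the slots themselves, which ties back to the point about avoiding garbage collection.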

Data writing process

With a single producer thread, data is written as follows:

  1. The producer requests permission to write m elements;
  2. If m elements can be written, the highest claimed sequence number is returned. The deciding factor here is whether unread elements would be overwritten;
  3. If the request succeeds, the producer starts writing the elements.
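The three steps above can be sketched as follows. This is a single-threaded illustration under stated assumptions: `SingleProducerClaim`, `tryNext`, and `markConsumed` are hypothetical names, there is only one consumer, and a real implementation would read the consumer's sequence from a shared, safely published variable.

```java
// Hypothetical sketch of a single producer claiming m slots in a ring.
final class SingleProducerClaim {

    private final int bufferSize;
    private long nextValue = -1L;        // highest sequence handed to the producer so far
    private long consumerSequence = -1L; // highest sequence the consumer has finished

    SingleProducerClaim(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Steps 1 and 2: request m slots. Returns the highest claimed sequence,
    // or -1 if claiming them would overwrite elements that are still unread.
    long tryNext(int m) {
        if (m < 1 || m > bufferSize) {
            throw new IllegalArgumentException("m must be between 1 and bufferSize");
        }
        long highest = nextValue + m;
        // The slot of `highest` last held sequence (highest - bufferSize);
        // that older sequence must already have been consumed.
        long wrapPoint = highest - bufferSize;
        if (wrapPoint > consumerSequence) {
            return -1L;
        }
        nextValue = highest;
        return highest;
    }

    // Called on behalf of the consumer once it has finished processing `sequence`.
    void markConsumed(long sequence) {
        consumerSequence = sequence;
    }

    public static void main(String[] args) {
        SingleProducerClaim claim = new SingleProducerClaim(4);
        System.out.println("claim 4 slots: " + claim.tryNext(4));
        System.out.println("claim 1 more while nothing is consumed: " + claim.tryNext(1));
        claim.markConsumed(0L);
        System.out.println("claim 1 more after sequence 0 is consumed: " + claim.tryNext(1));
    }
}
```

Step 3, the actual write, happens only after `tryNext` returns a valid sequence: the producer fills the slots up to that sequence and then publishes them.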

 

Usage scenarios

Disruptor has been tested to provide significantly better latency and throughput than ArrayBlockingQueue, so if you’re experiencing performance bottlenecks using ArrayBlockingQueue, consider using Disruptor instead.

Reference: github.com/LMAX-Exchan…

 

 

Of course, a performance gain from Disruptor is not guaranteed, so test it in your own environment before adopting it.

The most common use of Disruptor is the producer-consumer scenario, specifically the “one producer, many consumers” scenario in which events must be processed in order.

For example, suppose we read data sequentially from MySQL's binlog file and write it to ElasticSearch. In this scenario each binlog file is read by a single producer. The writes to ElasticSearch must be strictly ordered, otherwise problems arise, so the usual approach of multiple consumer threads cannot solve this, and adding a lock would hurt performance.