What is the Disruptor

Disruptor is a high-performance queue developed by UK forex trading company LMAX to address latency issues with in-memory queues (which, in performance testing, have been found to be on the same order of magnitude as I/O operations). Disruptor’s single-threaded system, capable of supporting 6 million orders per second, gained industry attention after its 2010 QCon presentation. In 2011, Enterprise application software expert Martin Fowler wrote a lengthy introduction. It also won Oracle’s official Duke Award that year. Structurally, Disruptor is a circular queue that supports a producer -> consumer pattern. Parallel consumption can be carried out under the condition of no lock, and consumption order can also be carried out according to the dependence between consumers. This article demonstrates how some classic scenarios can be implemented via Disruptor.

Add the dependent

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
Copy the code

Single producer single consumer model

Start by creating an OrderEvent class, which will be placed in the ring queue as the message content.

@Data
public class OrderEvent {
    private String id;
}
Copy the code

Create the OrderEventProducer class, which will be used as a producer.

public class OrderEventProducer {
    private final RingBuffer<OrderEvent> ringBuffer;
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(String orderId) {
        long sequence = ringBuffer.next();
        try {
            OrderEvent orderEvent = ringBuffer.get(sequence);
            orderEvent.setId(orderId);
        } finally{ ringBuffer.publish(sequence); }}}Copy the code

Create the OrderEventHandler class and implement the EventHandler

and WorkHandler

interfaces as consumers.

@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.info("event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch);
    }
    @Override
    public void onEvent(OrderEvent event) {
        log.info("event: {}", event); }}Copy the code

By creating these three classes, we have the event class, producer, and consumer. Let’s demonstrate this sequence through a main method.

@Slf4j
public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new.1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy()
        );
        disruptor.handleEventsWith(new OrderEventHandler());
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = newOrderEventProducer(ringBuffer); eventProducer.onData(UUID.randomUUID().toString()); }}Copy the code

One producer, many consumers

If you have multiple consumers, you simply pass in multiple consumers when you call the handleEventsWith method. The following code passes two consumers.

- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
Copy the code

Incoming above two consumer consumption will repeat each message, if you want to implement a message in the case of have more than one consumer, will only be a consumer, you need to call handleEventsWithWorkerPool method.

- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
Copy the code

More producers, more consumers

In real development, multiple producers send messages and multiple consumers process messages is the norm. Disruptor supports this, too. The multi-producer, multi-consumer code is as follows:

@Slf4j
public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new.1024 * 1024,
                Executors.defaultThreadFactory(),
                // Change the enumeration here to multiple producers
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        // Create a thread pool to simulate multiple producers
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; i++) { fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString())); }}}Copy the code

Consumer priority

Disruptor can do much more than that. In a real world scenario, we usually form a consumption chain based on business logic. For example, A message must be consumed by consumer A -> consumer B -> consumer C. When configuring consumers, this can be done through the.then method. As follows:

disruptor.handleEventsWith(new OrderEventHandler())
         .then(new OrderEventHandler())
         .then(new OrderEventHandler());
Copy the code

HandleEventsWith and handleEventsWithWorkerPool are, of course, support. Then, they can be used in combination. For example, consumer A -> (consumer B – C) -> Consumer D

disruptor.handleEventsWith(new OrderEventHandler())
         .thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
         .then(new OrderEventHandler());
Copy the code

conclusion

This is the common approach to Disruptor high-performance queues. In fact, the producer -> consumer pattern is quite common and can be easily achieved with some message queues. The difference is that Disruptor is implemented in-memory as a queue and is unlocked. This is why Disruptor is effective.

Refer to the link

  • lmax-exchange.github.io/disruptor/
  • Tech.meituan.com/2016/11/18/…
  • www.cnblogs.com/pku-liuqian…
  • ifeve.com/disruptor/