Disruptor Principle Analysis

Once the event handlers have been registered with the Disruptor, disruptor.start() is called. As the start() method shows, this is where the consumer threads are started.

Start the Disruptor

start() -> starts the Disruptor and runs the event handlers.

public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    // The handlers registered earlier through handleEventsWith() were added to consumerRepository;
    // loop over consumerRepository to start the consumer threads
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        // Take a thread from the thread pool to run the event handler.
        // (The consumer starts listening and can consume as soon as a producer publishes.)
        // The object run on that thread is a BatchEventProcessor instance
        consumerInfo.start(executor);
    }
    return ringBuffer;
}

Registering event handlers

handleEventsWith() -> calls the core method createEventProcessors() to create the event processors.

@SafeVarargs
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
        return createEventProcessors(new Sequence[0], handlers);
}

Storing event handlers

Each EventHandler is bound to a BatchEventProcessor, which is stored in consumerRepository and executes the handler on its behalf.

EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    ...
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    // Create a sequence barrier
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
        ...
        // Add the consumer to consumerRepository
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
    ...
}
  • In the call chain that starts at handleEventsWith(), you can see that a BatchEventProcessor object (which implements the Runnable interface) is built; it is an instance of this class that start() launches.

  • BatchEventProcessor implements EventProcessor, a core Disruptor interface. An implementation polls the RingBuffer for events and applies the wait strategy when no event is available to handle.

  • An implementation of this interface must be bound to a thread in order to run. Usually we do not need to implement it ourselves.

BatchEventProcessor class

BatchEventProcessor: the main event loop that processes events from the Disruptor; it owns the consumer's Sequence.

Core private member variables
  • Sequence: tracks the sequence number the consumer has consumed up to.

  • SequenceBarrier: a sequence barrier that coordinates the consumers' sequence numbers. Its main job is to obtain the sequence numbers available to the consumer and to run the wait strategy.

  • EventHandler<? super T>: the consumer's consumption logic (the business logic we implement).

  • DataProvider: supplies the event objects to consume. RingBuffer implements this interface and mainly provides the business objects.

Core method
  • processEvents(): because BatchEventProcessor implements Runnable, its run() method executes once it has been started as described earlier, and run() calls this method.
private void processEvents()
{
    T event = null;
    // Take the sequence number the consumer currently maintains and add 1: the next sequence to consume
    long nextSequence = sequence.get() + 1L;
    while (true)
    {
        try
        {
            // Get the highest sequence number that can be consumed; if there is none, wait inside waitFor()
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null && availableSequence >= nextSequence)
            {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
            // Keep fetching and consuming the event at each position until availableSequence has been consumed
            while (nextSequence <= availableSequence)
            {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
            sequence.set(availableSequence);
        }
        ...
    }
}
  • This is the core of the consumer's event processor. Inside sequenceBarrier.waitFor(nextSequence), the consumer's next sequence number is compared with the available sequence number:

    • When the available sequence number has reached nextSequence, the highest published sequence number currently available is returned (waitFor() internally calls sequencer.getHighestPublishedSequence(sequence, availableSequence)), and the consumer then consumes events in a loop up to that number.
    • The available sequence number is maintained in the ProcessingSequenceBarrier, which is created through ringBuffer.newBarrier().

As the figure shows, the SequenceBarrier acts as the coordinator between the EventProcessor and the RingBuffer when the available sequence numbers are obtained.

Setups with one consumer and setups with several consumers differ somewhat in how dependentSequence is handled. See how ProcessingSequenceBarrier assigns dependentSequence and how its get() method works (Util.getMinimumSequence(sequences)).
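The batch loop described above can be tried out without the library. What follows is only a minimal single-threaded sketch, not Disruptor's code: BatchLoopSketch, drain() and the nested Handler interface are hypothetical names, and the blocking waitFor() call is replaced by a plain read of a published cursor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class BatchLoopSketch {
    // Hypothetical stand-in for EventHandler.onEvent(event, sequence, endOfBatch).
    interface Handler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    // Consumes every event from consumerSequence + 1 up to the published cursor,
    // then advances consumerSequence once for the whole batch.
    // Returns the number of events handled.
    static <T> int drain(T[] ring, AtomicLong cursor, AtomicLong consumerSequence, Handler<T> handler) {
        long nextSequence = consumerSequence.get() + 1L;
        // In Disruptor this value comes from sequenceBarrier.waitFor(nextSequence);
        // this sketch only reads the cursor and never blocks.
        long availableSequence = cursor.get();
        int handled = 0;
        while (nextSequence <= availableSequence) {
            T event = ring[(int) (nextSequence % ring.length)];
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
            handled++;
        }
        if (handled > 0) {
            consumerSequence.set(availableSequence);
        }
        return handled;
    }

    public static void main(String[] args) {
        String[] ring = {"a", "b", "c", "d"};
        AtomicLong cursor = new AtomicLong(2);            // sequences 0..2 are published
        AtomicLong consumerSequence = new AtomicLong(-1); // nothing consumed yet
        List<String> seen = new ArrayList<>();
        int handled = drain(ring, cursor, consumerSequence,
                (event, seq, end) -> seen.add(event + "@" + seq + (end ? "!" : "")));
        System.out.println(handled + " " + seen + " " + consumerSequence.get());
        // prints: 3 [a@0, b@1, c@2!] 2
    }
}
```

The point to notice is that the consumer's own sequence is written once per batch rather than once per event, which keeps writes to the shared counter rare.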

Analysis of the producer's publishing process

First, ringBuffer.next() is called to obtain an available sequence number. Then the empty event object that the EventFactory created for that sequence number is fetched. After values have been assigned to the empty event object, publish() is called to publish the event, and the consumer can then obtain the event and consume it.
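The claim, fill, publish sequence just described can be sketched on a toy ring. This is a simplified illustration under stated assumptions, not the library's RingBuffer: PublishSketch, ToyRing and LongEvent are hypothetical names, there is a single producer, and the wrap check against consumers is left out.

```java
import java.util.function.Supplier;

final class PublishSketch {
    // Hypothetical mutable event, pre-allocated once per slot like objects from an EventFactory.
    static final class LongEvent {
        long value;
    }

    // Toy single-producer ring; NOT the library's RingBuffer.
    static final class ToyRing<T> {
        private final Object[] slots;
        private final int mask;
        private long claimed = -1;          // last sequence handed out by next()
        private volatile long cursor = -1;  // last sequence visible to consumers

        ToyRing(Supplier<T> factory, int size) {
            if (Integer.bitCount(size) != 1) {
                throw new IllegalArgumentException("size must be a power of 2");
            }
            slots = new Object[size];
            mask = size - 1;
            for (int i = 0; i < size; i++) {
                slots[i] = factory.get();   // fill every slot with an empty event up front
            }
        }

        long next() { return ++claimed; }   // no wrap check in this sketch

        @SuppressWarnings("unchecked")
        T get(long sequence) { return (T) slots[(int) (sequence & mask)]; }

        void publish(long sequence) { cursor = sequence; }

        long cursor() { return cursor; }
    }

    public static void main(String[] args) {
        ToyRing<LongEvent> ring = new ToyRing<>(LongEvent::new, 8);
        long sequence = ring.next();                // 1. claim a sequence number
        try {
            LongEvent event = ring.get(sequence);   // 2. fetch the pre-allocated event
            event.value = 42L;                      // 3. assign values to it
        } finally {
            ring.publish(sequence);                 // 4. make it visible to consumers
        }
        System.out.println(ring.cursor() + " " + ring.get(0).value); // prints: 0 42
    }
}
```

Publishing in a finally block means a claimed sequence is always released, even if filling the event throws. Otherwise consumers waiting on that sequence would stall.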

The core code for producers is as follows. Here I have excerpted the code for multi-producer mode:

public long next(int n)
{
    if (n < 1 || n > bufferSize)
    {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
    }
    long current;
    long next;
    do
    {
        // cursor is the sequence maintained by the producer; read the position published up to so far
        current = cursor.get();
        // Adding n gives the next position to publish to
        next = current + n;
        long wrapPoint = next - bufferSize;
        // Purpose: fast reads. gatingSequenceCache occupies a cache line of its own
        long cachedGatingSequence = gatingSequenceCache.get();
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
            // Get the minimum sequence number among the consumers
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
            if (wrapPoint > gatingSequence)
            {
                // If the condition is not met, park the thread for 1 ns (park() has no deadlock problem)
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
            gatingSequenceCache.set(gatingSequence);
        }
        // With more than one producer, stay thread-safe (the updated cursor is also the cursor argument
        // of waitFor() in the wait strategy, so once the update succeeds the wait strategy lets the
        // consumer through, meaning there are new events to consume)
        else if (cursor.compareAndSet(current, next))
        {
            break;
        }
    }
    while (true);
    return next;
}

Two things deserve a closer look here: the cursor object and the Util.getMinimumSequence(gatingSequences, current) method. The cursor is the sequence number maintained by the producer. It records where the producer has published up to and therefore where it will publish next. It is an instance of the Sequence class.

  • As you can see, Sequence inherits, directly and indirectly, from the RhsPadding and LhsPadding classes, each of which defines 7 long member variables.

  • The get() method of Sequence returns a value of type long. This is the cache-line padding introduced in the previous article, which eliminates false sharing.

  • On a 64-bit machine a cache line is generally 64 bytes. When the CPU fetches a value from memory, it also pulls in neighbouring data to fill the whole cache line. If any of that neighbouring data is updated by another core, the cached line is invalidated, and the next time the value is needed it has to be fetched from memory again.

  • Disruptor pads the value with 7 longs on the left and 7 on the right, which ensures that the value has a cache line to itself whenever it is fetched. No update to other data can invalidate it, so false sharing is avoided (the JDK's java.util.concurrent package also contains designs that eliminate false sharing).
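The padding arithmetic is 7 × 8 = 56 bytes on each side of an 8-byte value, so whichever 64-byte line the value lands on holds only padding besides it. Below is a minimal sketch of the idea with hypothetical class names (LeftPadding, PaddedValue, RightPadding, PaddedSequence). It assumes, as HotSpot typically does, that superclass fields are laid out before subclass fields, which is why the padding is spread over a class hierarchy instead of being declared in one class.

```java
final class PaddingSketch {
    public static void main(String[] args) {
        PaddedSequence cursor = new PaddedSequence(-1L);
        cursor.set(cursor.get() + 1);
        System.out.println(cursor.get()); // prints: 0
    }
}

// Hypothetical names; the hierarchy mirrors the left padding / value / right padding idea.
class LeftPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;       // 7 * 8 = 56 bytes before the value
}

class PaddedValue extends LeftPadding {
    protected volatile long value;                   // the only field that is actually used
}

class RightPadding extends PaddedValue {
    protected long p9, p10, p11, p12, p13, p14, p15; // 56 bytes after the value
}

final class PaddedSequence extends RightPadding {
    PaddedSequence(long initial) { value = initial; }

    long get() { return value; }

    void set(long newValue) { value = newValue; }
}
```

The padding fields are never read or written. Their only job is to occupy space, so the object is larger in exchange for fewer cache-line invalidations.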


RingBuffer: a ring-shaped container whose head and tail are joined, used to pass data between threads. Many of the parameters used to create the Disruptor in the first figure are really used to create the RingBuffer, such as the producer type (single or multiple), the event factory, the container length, the wait strategy, and so on.

Consider a simple case: several producers publish to the RingBuffer at the same time, and assume two producers have just filled it. Because each producer claims its sequence number by advancing the shared cursor (and the current thread is parked if the claim condition is not met), production stays thread-safe. Only one Sequence is needed on the producer side.
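To see why one shared counter is enough, here is a small sketch of the claim step on its own: several threads race on a compareAndSet loop and every sequence number is handed out exactly once. CasClaimSketch, claim() and run() are hypothetical names, an AtomicLong stands in for the cursor Sequence, and the gating check against consumers is omitted.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class CasClaimSketch {
    // Claims the next sequence number by retrying compareAndSet until this thread wins.
    static long claim(AtomicLong cursor) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + 1;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    // Starts `producers` threads that each claim `perProducer` sequences; returns the distinct claims.
    static Set<Long> run(AtomicLong cursor, int producers, int perProducer) {
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[producers];
        for (int t = 0; t < producers; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    claimed.add(claim(cursor));
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(-1);
        Set<Long> claimed = run(cursor, 2, 10_000);
        // 20000 distinct sequence numbers were claimed and the cursor ends at 19999
        System.out.println(claimed.size() + " " + cursor.get());
    }
}
```

If two threads read the same current value, only one compareAndSet succeeds; the loser re-reads the cursor and tries again, so no sequence number is ever given to two producers.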

When several consumers consume, they do so at different speeds. For example, consumer 1 consumes positions 0 and 1, consumer 2 consumes positions 2 and 4, and consumer 3 consumes position 3.

Suppose consumer 1 has finished position 0, consumer 2 has finished position 2, and consumer 3 has finished position 3. When the producer publishes to the queue again, the positions after these have not been consumed yet, so the data goes to position 0. When the producer wants to publish once more, position 2 (consumer 2) and position 3 (consumer 3) are vacant, but consumer 1 is still consuming position 1, so the producer cannot continue. The check is made against the smallest of the sequence numbers that the consumers maintain themselves.

Util.getMinimumSequence(gatingSequences, current) returns the minimum sequence number among the consumers. The producer then checks whether the position it wants to claim would wrap around past that minimum. If so, it cannot publish and has to park the current thread (LockSupport.parkNanos(1)).
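The check can be worked through with small numbers. The sketch below uses plain long values in place of Sequence objects; GatingSketch, minimumSequence() and canClaim() are hypothetical names that mirror the wrapPoint comparison in next(n) above, not library methods.

```java
final class GatingSketch {
    // Smallest value among the consumer sequences, capped by `minimum`
    // (the role Util.getMinimumSequence(gatingSequences, current) plays in next(n)).
    static long minimumSequence(long[] consumerSequences, long minimum) {
        long min = minimum;
        for (long sequence : consumerSequences) {
            min = Math.min(min, sequence);
        }
        return min;
    }

    // True when claiming n more slots after `current` would not overwrite an unconsumed slot.
    static boolean canClaim(long current, int n, int bufferSize, long[] consumerSequences) {
        long next = current + n;
        long wrapPoint = next - bufferSize;
        return wrapPoint <= minimumSequence(consumerSequences, current);
    }

    public static void main(String[] args) {
        int bufferSize = 4;
        long current = 3;                 // sequences 0..3 are published: the ring is full
        long[] consumers = {0, 2, 3};     // last sequence each consumer has finished
        // Sequence 4 reuses the slot of sequence 0, which the slowest consumer has finished.
        System.out.println(canClaim(current, 1, bufferSize, consumers)); // prints: true
        // Sequence 5 would reuse the slot of sequence 1, which is still unconsumed.
        System.out.println(canClaim(current, 2, bufferSize, consumers)); // prints: false
    }
}
```

In the second call wrapPoint is 5 - 4 = 1, which is greater than the minimum consumer sequence 0, so a real producer would park and retry at this point.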

What happens when the consumer consumes faster than the producer produces and the producer has not yet written to the queue, or when the producer produces faster than the consumer consumes? It has also been mentioned several times above that a consumer waits when the condition for consuming is not met. Next, let's talk about the consumers' wait strategies.

Strategies I commonly use:

  • BlockingWaitStrategy: uses a lock; an inefficient strategy.

  • SleepingWaitStrategy: has minimal impact on producer threads and suits scenarios such as asynchronous logging (no locking involved).

  • YieldingWaitStrategy: spins a fixed number of times (SPIN_TRIES, 100) and then calls Thread.yield() on every further pass, as its waitFor() code shows:

@Override
public long waitFor(
    final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    int counter = SPIN_TRIES; // 100
    while ((availableSequence = dependentSequence.get()) < sequence)
    {
        counter = applyWaitMethod(barrier, counter);
    }
    return availableSequence;
}

private int applyWaitMethod(final SequenceBarrier barrier, int counter)
    throws AlertException
{
    barrier.checkAlert();
    if (0 == counter)
    {
        Thread.yield();
    }
    else
    {
        --counter;
    }
    return counter;
}
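To watch the spin-then-yield pattern run against a producer thread, here is a standalone sketch. It is an assumption-laden simplification: YieldWaitSketch is a hypothetical name, an AtomicLong replaces the dependent Sequence, and the barrier alert check is dropped.

```java
import java.util.concurrent.atomic.AtomicLong;

final class YieldWaitSketch {
    static final int SPIN_TRIES = 100;

    // Busy-spins SPIN_TRIES times, then yields on every further pass,
    // until `published` has reached `sequence`. Returns the value observed.
    static long waitFor(long sequence, AtomicLong published) {
        long available;
        int counter = SPIN_TRIES;
        while ((available = published.get()) < sequence) {
            if (counter == 0) {
                Thread.yield();
            } else {
                --counter;
            }
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong published = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            for (long s = 0; s <= 5; s++) {
                published.set(s);       // "publish" sequences 0..5
            }
        });
        producer.start();
        long available = waitFor(3, published); // returns once sequence 3 or later is visible
        System.out.println(available >= 3);     // prints: true
    }
}
```

The returned value can be larger than the requested sequence, which is what lets the consumer process a whole batch after one wait.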

The Java 8 @Contended annotation

  • In Java 8 you can put the @Contended annotation on a class to get cache-line padding, which addresses false sharing between threads. Outside JDK classes the annotation only takes effect when the JVM is started with -XX:-RestrictContended.

  • @Contended can be applied at class level as well as at field level. At field level, an annotated field is isolated from the other fields and placed on a separate cache line. At field level, @Contended also supports a "contention group" attribute (not supported at class level): fields in the same group are laid out contiguously in memory (within 64 bytes) but are isolated from all other fields.

The @Contended annotation behaves as follows.

Applying @Contended to a class:

@Contended
public static class ContendedTest2 {
    private Object plainField1;
    private Object plainField2;
    private Object plainField3;
    private Object plainField4;
}

The whole block of fields is padded at both ends (the output below was produced with -XX:+PrintFieldLayout):

TestContended$ContendedTest2: field layout
    Entire class is marked contended
     @140 --- instance fields start ---
     @140 "plainField1" Ljava.lang.Object;
     @144 "plainField2" Ljava.lang.Object;
     @148 "plainField3" Ljava.lang.Object;
     @152 "plainField4" Ljava.lang.Object;
     @288 --- instance fields end ---
     @288 --- instance ends ---

Note that the padding is 128 bytes, twice the size of most hardware cache lines (typically 64 bytes), to avoid false sharing caused by adjacent-sector prefetching.

Applying @Contended to a field:

public static class ContendedTest1 {
    @Contended
    private Object contendedField1;
    private Object plainField1;
    private Object plainField2;
    private Object plainField3;
    private Object plainField4;
}

This separates the annotated field from the contiguous block of plain fields and pads it:

TestContended$ContendedTest1: field layout
     @ 12 --- instance fields start ---
     @ 12 "plainField1" Ljava.lang.Object;
     @ 16 "plainField2" Ljava.lang.Object;
     @ 20 "plainField3" Ljava.lang.Object;
     @ 24 "plainField4" Ljava.lang.Object;
     @156 "contendedField1" Ljava.lang.Object; (contended, group = 0)
     @288 --- instance fields end ---
     @288 --- instance ends ---

Annotating several fields pads each of them separately:

public static class ContendedTest4 {
    @Contended
    private Object contendedField1;
    @Contended
    private Object contendedField2;
    private Object plainField3;
    private Object plainField4;
}

The two annotated fields are padded independently:

TestContended$ContendedTest4: field layout
     @ 12 --- instance fields start ---
     @ 12 "plainField3" Ljava.lang.Object;
     @ 16 "plainField4" Ljava.lang.Object;
     @148 "contendedField1" Ljava.lang.Object; (contended, group = 0)
     @280 "contendedField2" Ljava.lang.Object; (contended, group = 0)
     @416 --- instance fields end ---
     @416 --- instance ends ---

In some cases you will want to group fields: fields in a group are isolated from all other fields, but not from each other. This suits the common case of code that updates two fields at the same time.

public static class ContendedTest5 {
    @Contended("updater1")
    private Object contendedField1;

    @Contended("updater1")
    private Object contendedField2;

    @Contended("updater2")
    private Object contendedField3;

    private Object plainField5;
    private Object plainField6;
}

The memory layout is:

TestContended$ContendedTest5: field layout
     @ 12 --- instance fields start ---
     @ 12 "plainField5" Ljava.lang.Object;
     @ 16 "plainField6" Ljava.lang.Object;
     @148 "contendedField1" Ljava.lang.Object; (contended, group = 12)
     @152 "contendedField2" Ljava.lang.Object; (contended, group = 12)
     @284 "contendedField3" Ljava.lang.Object; (contended, group = 15)
     @416 --- instance fields end ---
     @416 --- instance ends ---

Next, let's test whether @Contended with grouping at field level actually resolves false sharing.

import sun.misc.Contended;
public class VolatileLong {
    @Contended("group0")
    public volatile long value1 = 0L;
    @Contended("group0")
    public volatile long value2 = 0L;
    @Contended("group1")
    public volatile long value3 = 0L;  
    @Contended("group1")
    public volatile long value4 = 0L;  
}

Two threads are used to modify the fields:

  • Test 1: thread 0 modifies value1 and value2; thread 1 modifies value3 and value4. Each thread's fields are in the same group.

  • Test 2: thread 0 modifies value1 and value3; thread 1 modifies value2 and value4. Each thread's fields are in different groups.

Test 1

public final class FalseSharing implements Runnable {
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private static VolatileLong volatileLong;
    private String groupId;

    public FalseSharing(String groupId) {
        this.groupId = groupId;
    }

    public static void main(final String[] args) throws Exception {
        // Thread.sleep(10000);
        System.out.println("starting....");
        volatileLong = new VolatileLong();
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    }

    private static void runTest() throws InterruptedException {
        Thread t0 = new Thread(new FalseSharing("t0"));
        Thread t1 = new Thread(new FalseSharing("t1"));
        t0.start();
        t1.start();
        t0.join();
        t1.join();
    }

    // Test 1: each thread writes two fields that belong to the same group
    public void run() {
        long i = ITERATIONS + 1;
        if (groupId.equals("t0")) {
            while (0 != --i) {
                volatileLong.value1 = i;
                volatileLong.value2 = i;
            }
        } else if (groupId.equals("t1")) {
            while (0 != --i) {
                volatileLong.value3 = i;
                volatileLong.value4 = i;
            }
        }
    }
}

Test 2

    // Test 2: each thread writes one field from each group
    public void run() {
        long i = ITERATIONS + 1;
        if (groupId.equals("t0")) {
            while (0 != --i) {
                volatileLong.value1 = i;
                volatileLong.value3 = i;
            }
        } else if (groupId.equals("t1")) {
            while (0 != --i) {
                volatileLong.value2 = i;
                volatileLong.value4 = i;
            }
        }
    }