
Disruptor principle

Disruptor is a non-blocking, multi-producer, multi-consumer, high-performance queue designed for inter-thread communication in high-concurrency scenarios.

At its core, Disruptor is built around the RingBuffer, with producer-side and consumer-side components layered on top of it.

Producer-side components

A producer generates messages and publishes them to the RingBuffer in-memory queue.

  • Event model: Units of data passed from producers to consumers, whose types are entirely user-defined.
@Data
public class SampleEvent {
    private Long id;
    private String sampleDataStr;
}
  • EventFactory: the factory class that creates events (tasks). (The event is created and pre-allocated in memory here, so it can be regarded as an empty task.)
import com.lmax.disruptor.EventFactory;

public class SampleEventFactory implements EventFactory<SampleEvent> {
    @Override
    public SampleEvent newInstance() {
        // Instantiate the data object (empty for now; it is filled in later)
        return new SampleEvent();
    }
}
  • RingBuffer: the RingBuffer is generally considered the main object of Disruptor; since version 3.0 it is only responsible for storing and updating the data (events) that move through the Disruptor.

    • RingBufferSize: the length of the container. (The core container of Disruptor is the ringBuffer, a circular array of fixed, finite length.)
  • ProducerType: the producer type, either a single producer or multiple producers.

    • Sequencer: the Sequencer is the core API of Disruptor. Its two implementation classes (SingleProducerSequencer and MultiProducerSequencer) implement all of the concurrent algorithms for fast, correct data transfer between producers and consumers.
  • WaitStrategy: the wait strategy between consumers and producers when all data in the queue has been consumed; it determines how a consumer waits for the producer to place new events into the Disruptor.

  • RingBuffer: a container for storing data.

@Data
@AllArgsConstructor
public class SampleEventProducer {
    private RingBuffer<SampleEvent> ringBuffer;

    public void sendData(long id) {
        // Get the next available sequence number
        long sequence = ringBuffer.next();
        try {
            // Get the pre-allocated empty event at that slot and fill it in
            SampleEvent sampleEvent = ringBuffer.get(sequence);
            sampleEvent.setId(id);
        } finally {
            // Publish the sequence so consumers can see the event
            ringBuffer.publish(sequence);
        }
    }
}

Consumer-side components

  • Executor: the pool of consumer threads that execute tasks. Each consumer needs a thread from the pool in order to consume events.

  • EventProcessor: the main event loop that handles events from the Disruptor and owns the consumer's Sequence. The concrete implementation, BatchEventProcessor, contains an efficient implementation of the event loop and calls back into the user-supplied EventHandler.

  • EventHandler: the event-handling interface, implemented by the user, that represents a consumer of the Disruptor. The message-processing logic lives entirely in the client's implementation.

public class SampleEventHandler implements EventHandler<SampleEvent> {

    /**
     * Event-driven callback -- the main body of consumer processing.
     */
    @Override
    public void onEvent(SampleEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(event.getSampleDataStr() + " " + Thread.currentThread().getName());
    }
}
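
To see how the producer- and consumer-side pieces fit together, here is a minimal wiring sketch using the Disruptor DSL (assuming Disruptor 3.3+, where the constructor takes an EventFactory, a ring buffer size, a ThreadFactory, a ProducerType, and a WaitStrategy; the SampleMain class name is illustrative):

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class SampleMain {
    public static void main(String[] args) {
        int ringBufferSize = 1024; // must be a power of 2

        Disruptor<SampleEvent> disruptor = new Disruptor<>(
                new SampleEventFactory(),      // pre-allocates the empty events
                ringBufferSize,
                Thread::new,                   // ThreadFactory for consumer threads
                ProducerType.SINGLE,           // single or multi producer
                new BlockingWaitStrategy());   // how consumers wait for new events

        // Register the consumer and start the consumer threads
        disruptor.handleEventsWith(new SampleEventHandler());
        disruptor.start();

        // Publish a few events through the producer wrapper
        RingBuffer<SampleEvent> ringBuffer = disruptor.getRingBuffer();
        SampleEventProducer producer = new SampleEventProducer(ringBuffer);
        for (long i = 0; i < 10; i++) {
            producer.sendData(i);
        }

        disruptor.shutdown();
    }
}

BlockingWaitStrategy is the most conservative wait strategy; strategies such as YieldingWaitStrategy or BusySpinWaitStrategy trade CPU usage for lower latency.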

The algorithm's core: Sequence

  • Sequence: Disruptor uses Sequences to track how far a particular component has progressed.

    • Each consumer (EventProcessor), like the Disruptor itself, maintains a Sequence. Most of the concurrent code relies on the movement of these Sequence values, so Sequence supports many of the same features as AtomicLong.

    • In fact, the only real difference is that Sequence contains additional functionality to prevent false sharing between a Sequence and other values (a sketch of this padded layout follows this list).

  • Sequence Barrier: the SequenceBarrier is produced by the Sequencer and holds a reference to the Sequencer's main published Sequence and to the Sequences of any dependent consumers. It contains the logic that determines whether any events are available for a consumer to process.
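
A rough illustration of that padding, as a simplified sketch modeled on Disruptor 3.x's Sequence (not the exact source; class and field names here are illustrative):

class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; }   // left-hand padding
class Value extends LhsPadding { protected volatile long value; } // the hot counter field
class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; } // right-hand padding

public class PaddedSequence extends RhsPadding {
    public long get() { return value; }
    public void set(long newValue) { value = newValue; }
    // The real Sequence also provides CAS and ordered writes (via Unsafe/VarHandle).
}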

Benefits of Disruptor:

  1. There is no lock if there is no contention between multiple threads.

  2. An implementation in which all visitors record their serial numbers allows multiple producers to share the same data structure with multiple consumers.

  3. The ability to track sequence numbers in each object (ring buffer, claim strategy, producers and consumers), combined with the magic of cache line padding, means no false sharing and no unexpected contention.

Let’s take a brief look at the core RingBuffer implementation to see the details of the queue implementation.

It’s a circular queue, superficially similar to the closed ring in a consistent hashing algorithm, but used for a completely different purpose.

The underlying structure is a fixed-size array. Compared with an ordinary queue, it has only a single index pointer, the cursor. If the number of slots is a power of two (2^n), the index calculation is especially convenient for binary computers. If you have read the HashMap source code, you know HashMap locates an element's slot with the clever expression hash & (length - 1).

RingBuffer locates its slots the same way, with sequence & (length - 1); of course, a modulo operation would also work.

  • In terms of raw speed, the bitwise operation is definitely faster than the modulo operation, and under high concurrency that saves a meaningful amount of CPU overhead.
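
A tiny, self-contained example of this slot calculation (illustrative only; the buffer size and sequence values are arbitrary):

public class IndexMaskDemo {
    public static void main(String[] args) {
        int bufferSize = 8;               // must be a power of two
        int indexMask = bufferSize - 1;   // 0b0111

        for (long sequence = 0; sequence < 20; sequence++) {
            int slotByMask   = (int) (sequence & indexMask);   // bitwise AND
            int slotByModulo = (int) (sequence % bufferSize);  // modulo, same result
            System.out.println(sequence + " -> slot " + slotByMask + " (modulo: " + slotByModulo + ")");
        }
    }
}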

Linear storage is generally implemented in one of two ways:

  • one based on contiguously allocated memory, such as an array (or a hash table's bucket array);
  • one based on nodes allocated at arbitrary memory addresses and reached by following pointers, such as a linked list.

Why does RingBuffer use an array instead of a linked list?

The cache and the principle of locality

  • (Good) An array is allocated in contiguous memory, which matches the memory prefetching strategy: when an address is loaded, a block of adjacent addresses is preloaded into the cache on the assumption that it will be used soon. As the earlier analysis of how the CPU reads data through the cache hierarchy showed, this design avoids loading from main memory over and over again.

  • (Bad) A linked list's nodes are allocated in fragmented, non-contiguous memory, so the CPU must compute the address of each next node and load the corresponding data from memory every time. With a small amount of data the performance difference is invisible, but with a large amount of data these tiny costs add up and drag down overall efficiency.

Because the RingBuffer never needs to modify or maintain storage addresses (slots are simply overwritten in place), choosing an array has a clearly positive impact on performance.
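
A rough way to observe this is the following illustrative micro-benchmark (not from the Disruptor project; the absolute numbers depend on the JVM and hardware), which sums the same values stored in a contiguous array and in a pointer-chasing LinkedList:

import java.util.LinkedList;
import java.util.List;

public class LocalityDemo {
    public static void main(String[] args) {
        int n = 1_000_000;
        long[] array = new long[n];
        List<Long> list = new LinkedList<>();
        for (int i = 0; i < n; i++) {
            array[i] = i;
            list.add((long) i);
        }

        long t0 = System.nanoTime();
        long sumArray = 0;
        for (int i = 0; i < n; i++) { sumArray += array[i]; }   // contiguous, prefetch-friendly
        long t1 = System.nanoTime();

        long sumList = 0;
        for (long v : list) { sumList += v; }                    // pointer chasing, cache-unfriendly
        long t2 = System.nanoTime();

        System.out.println("array: " + (t1 - t0) / 1_000_000 + " ms, sum = " + sumArray);
        System.out.println("list : " + (t2 - t1) / 1_000_000 + " ms, sum = " + sumList);
    }
}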

False sharing

Memory moves through the cache system in units of cache lines. A cache line is 2^n consecutive bytes, typically 32 to 256 bytes, with 64 bytes being the most common size.

False sharing is the term for threads that unintentionally hurt each other's performance by modifying independent variables that happen to share the same cache line. Write contention on cache lines is the biggest limiting factor on scalability for threads executing in parallel on SMP systems. (Definition from Baidu.)

  • First, recall that low-level locking is implemented either by locking the bus (the message bus) or by locking cache lines; and for shared memory, the CPU works in units of cache lines, so variables shared by multiple threads may end up in the same cache line.

  • To scale linearly with the number of threads, we must ensure that no two threads write to the same variable or cache line. With volatile, reads of a shared variable go to main (shared) memory and writes are flushed back to it, which essentially invalidates the corresponding cache line.

A thread running on CPU core A wants to update variable X, while a thread on CPU core B wants to update variable Y.

Both hot variables sit in the same cache line. Each thread competes for ownership of that cache line in order to update it. If core A acquires ownership, the cache-coherence subsystem (MESI and its variants) must invalidate core B's copy of the line, and vice versa, which badly hurts performance. The problem is further exacerbated if the competing cores sit on different sockets, because the invalidation traffic must also cross the socket interconnect.

To sum up: multiple threads may operate on different member variables that happen to be stored in the same cache line. When one processor updates the line and flushes it to main memory, the cache-coherence protocol forces the other processors to mark their copies of that line invalid (the I state), causing cache misses that must be served from memory again. This is the false sharing problem.

  • In particular, when different threads write to the same cache line, an RFO (Request For Ownership) signal must be sent to lock the cache line and guarantee the atomicity of the write. While one core holds ownership, the other threads cannot operate on that cache line, which significantly hurts efficiency.

To avoid false sharing between frequently written variables that sit on the same cache line, the common solution is cache line padding, also called cache line alignment.

The concept of cache line padding

When multiple threads write to a shared cache line at the same time, the cache-coherence protocol causes the false sharing problem. The common solution is to pad the shared variable out to the cache line size, so that when it is loaded into the cache it occupies a cache line of its own and is never stored alongside other shared variables.

The following shows a cache line padding implementation; the padding matters because objects allocated at around the same time tend to end up next to each other in memory.

public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

If different consumers write to different fields, you need to make sure there is no false sharing between those fields.

/**
 * Padding around a volatile long value: the six extra longs in the parent class,
 * together with the value itself, keep the hot field on its own cache line.
 * Note: the padding lives in a parent class because the Java compiler may
 * otherwise optimize away fields that are never used.
 */
class CacheLinePadding {
    // If you don't need padding, just comment out this line
    public volatile long p1, p2, p3, p4, p5, p6;
}
class CacheLinePaddingObject extends CacheLinePadding {
    // The actual value
    public volatile long value = 0L;
}
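
As a rough usage example of the classes above (an illustrative benchmark, not part of Disruptor; timings vary by JVM and CPU), two threads each spin on their own counter object. With the padding fields in CacheLinePadding commented out, the two value fields are likely to land on the same cache line and the run tends to be noticeably slower:

public class FalseSharingDemo {
    public static void main(String[] args) throws InterruptedException {
        final CacheLinePaddingObject a = new CacheLinePaddingObject();
        final CacheLinePaddingObject b = new CacheLinePaddingObject();
        final long iterations = 100_000_000L;

        Thread t1 = new Thread(() -> {
            for (long i = 0; i < iterations; i++) { a.value = i; }  // writes only to a
        });
        Thread t2 = new Thread(() -> {
            for (long i = 0; i < iterations; i++) { b.value = i; }  // writes only to b
        });

        long start = System.nanoTime();
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("took " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}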

RingBuffer likewise uses cache line padding to ensure there is no falsely shared data in the array. Apart from the long cursor index pointer itself, p1 through p7 are pure cache line padding; taken together, the 8 long fields are exactly 64 bytes, which fills a whole cache line, so the cursor never shares a line with the buffer's own data fields.