1. What is a queue

Everyone is familiar with queues; they are everywhere in real life. At the supermarket checkout you will see people standing in line, waiting to pay. Why do they line up? Not because everyone is unusually well-mannered: if everyone rushed the checkout at once, the supermarket would grind to a halt and all sorts of stampedes could happen. Such things really do happen.

In the computer world, a queue is a data structure that follows FIFO (first in, first out): new elements (waiting to enter the queue) are always inserted at the tail, and reads always happen at the head. Queues are commonly used for queuing (such as thread pool task queues and lock wait queues), decoupling (the producer-consumer pattern), asynchrony, and so on.
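To make FIFO concrete, here is a minimal sketch using the JDK's java.util.ArrayDeque (one of the thread-unsafe Queue implementations mentioned below): offer() inserts at the tail and poll() removes from the head. The class name FifoDemo is only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FifoDemo {
    // Enqueue every item at the tail, then drain the queue from the head.
    static List<String> drainInOrder(String... items) {
        Queue<String> queue = new ArrayDeque<>();
        for (String item : items) {
            queue.offer(item); // insert at the tail
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll()); // remove from the head
        }
        return out;
    }

    public static void main(String[] args) {
        // Customers leave the checkout line in the order they joined it.
        System.out.println(drainInOrder("alice", "bob", "carol")); // prints [alice, bob, carol]
    }
}
```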

2. Queues in JDK

Queues in the JDK implement the java.util.Queue interface and fall into two categories: thread-unsafe ones such as ArrayDeque and LinkedList, and thread-safe ones in the java.util.concurrent package. Our machines are all multithreaded, and when multiple threads operate on the same queue, a thread-unsafe queue can overwrite data, lose data, and cause other unpredictable problems, so in that situation we must choose a thread-safe queue. The following are some of the thread-safe queues provided in the JDK:

We can see that the lock-free queues are unbounded, while the locked queues are bounded. This raises a problem: in a real production environment, an unbounded queue can have a large impact on the system and may even cause a memory overflow directly, so we should first rule out unbounded queues. That is not to say unbounded queues are useless; they just have to be ruled out in certain scenarios. That leaves ArrayBlockingQueue and LinkedBlockingQueue, both of which achieve thread safety with ReentrantLock. The difference between them is array versus linked list. A queue may fetch several elements at once, and an array occupies contiguous memory that the CPU cache can exploit (cache lines are described below), so access is faster. We therefore tend to choose ArrayBlockingQueue, and indeed many third-party frameworks, such as the early asynchronous logging in Log4j, chose ArrayBlockingQueue.
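A quick sketch of why a bounded queue protects memory: an ArrayBlockingQueue created with a fixed capacity rejects offer() once it is full (offer() returns false) instead of growing without limit; put() would instead block the producer until space frees up. The class name and the capacity of 2 are arbitrary, chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    // Try to enqueue `count` items into a queue with the given capacity
    // and report how many were accepted.
    static int acceptedOffers(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            // offer() never blocks: it returns false when the queue is full,
            // so memory use stays bounded by the capacity.
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOffers(2, 5)); // prints 2
    }
}
```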

ArrayBlockingQueue still has a drawback: its performance is relatively low. Why else would the JDK add lock-free queues? What we really want is a queue that is both lock-free and bounded, which sounds too good to be true, but someone actually built one.

3. Disruptor

Disruptor is a high-performance queue developed by LMAX. It is an open source concurrency framework that won a 2011 Duke's Choice Award for framework innovation. Disruptor implements concurrent queue operations without locks and supports 6 million orders per second on a single thread.

3.1 Why is it so awesome?

Disruptor owes its performance to three key techniques:

  • CAS

  • Eliminating false sharing

  • RingBuffer

These three killer techniques are what make Disruptor so fast.

3.1.1 locks and CAS

One reason we set ArrayBlockingQueue aside is that it uses a heavyweight lock: locking may suspend the thread and unlocking resumes it, which has some overhead. And once a thread fails to get the lock, it has to wait and cannot do anything else.

CAS (compare and swap), as its name implies, first compares and then swaps: it checks whether the current value still equals the expected old value and, only if it does, sets the new value. Anyone familiar with optimistic locking knows that CAS can be used to implement it. CAS involves no thread context switch, which avoids unnecessary overhead. Here I used JMH with two threads to test on my own machine; the code is as follows:

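import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.openjdk.jmh.annotations.*;

@BenchmarkMode({Mode.SampleTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3, time = 5, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 1, batchSize = 100000000)
@Threads(2)
@Fork(1)
@State(Scope.Benchmark)
public class Myclass {
    Lock lock = new ReentrantLock();
    long i = 0;
    AtomicLong atomicLong = new AtomicLong(0);

    // Increment protected by a ReentrantLock
    @Benchmark
    public void measureLock() {
        lock.lock();
        try {
            i++;
        } finally {
            lock.unlock();
        }
    }

    // Increment via CAS
    @Benchmark
    public void measureCAS() {
        atomicLong.incrementAndGet();
    }

    // Unsynchronized increment: not thread safe, used only as a baseline
    @Benchmark
    public void measureNoLock() {
        i++;
    }
}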

The test results are as follows:

As you can see, the Lock result is a five-digit number, CAS a four-digit number, and no lock a three-digit number, so in terms of cost: Lock > CAS > no lock.

Disruptor uses CAS to set the sequence positions (indices) in the queue, which reduces lock contention and improves performance.

CAS is also used by the JDK's other lock-free queues, as well as by its atomic classes.
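As a sketch of how CAS supports optimistic locking, the hypothetical CasCounter below reads the current value, computes the new one, and retries whenever AtomicLong.compareAndSet() reports that another thread changed the value in between. AtomicLong.incrementAndGet() from the benchmark gives the same result without a hand-written loop; the explicit loop just makes the retry visible.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasCounter {
    private final AtomicLong value = new AtomicLong();

    // Optimistic increment: no lock is taken; on conflict we simply retry.
    long increment() {
        while (true) {
            long current = value.get();   // read the expected old value
            long next = current + 1;      // compute the new value
            if (value.compareAndSet(current, next)) {
                return next;              // CAS succeeded: nobody interfered
            }
            // CAS failed: another thread updated the value first, so retry
        }
    }

    long get() {
        return value.get();
    }

    // Runs `threads` threads that each call increment() `perThread` times.
    static long countWithThreads(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(2, 100_000)); // prints 200000: no update is lost
    }
}
```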

3.1.2 False sharing

When it comes to false sharing, we have to talk about the CPU cache. Cache size is one of the important indicators of a CPU, and the structure and size of the cache have a very large impact on CPU speed. The CPU cache runs at a very high frequency, generally the same as the processor, and is far faster than system memory or the hard disk. In practice, the CPU often needs to read the same block of data repeatedly, and a larger cache greatly improves the hit ratio, so the CPU does not have to look for the data in memory or on disk, which improves system performance. However, because of chip area and cost, caches are small.

Martin and Mike's QCon presentation gave the access time for each cache level:

Cache line

In the CPU's multilevel cache, data is not stored as separate items. Much like a page cache strategy, it is stored in units of cache lines, and a cache line is usually 64 bytes. In Java a long is 8 bytes, so one cache line can hold 8 longs. For example, when you access one long variable, the 7 neighbouring longs in the same line are loaded along with it. This is also why we choose an array rather than a linked list: array elements are contiguous, so cache lines give fast access.
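A common way to see cache lines at work is to sum a 2D long array row by row versus column by column. In Java a 2D array is an array of row arrays, and each row is contiguous, so the row-major loop uses every long in each 64-byte line it loads, while the column-major loop jumps to a different row array on every step. Both return the same sum; on most machines the row-major loop is noticeably faster, but timings depend on hardware and the JIT, so treat this as an illustrative sketch rather than a benchmark (JMH, as above, is the right tool for real numbers). The class name and matrix size are arbitrary.

```java
class CacheLineDemo {
    // Row-major: walks each row's contiguous memory, so neighbouring longs
    // come from a cache line that is already loaded.
    static long sumRowMajor(long[][] m) {
        long sum = 0;
        for (int r = 0; r < m.length; r++) {
            for (int c = 0; c < m[r].length; c++) {
                sum += m[r][c];
            }
        }
        return sum;
    }

    // Column-major: every step moves to a different row array, so each
    // access is likely to touch a different cache line. Assumes equal-length rows.
    static long sumColumnMajor(long[][] m) {
        long sum = 0;
        int cols = m[0].length;
        for (int c = 0; c < cols; c++) {
            for (int r = 0; r < m.length; r++) {
                sum += m[r][c];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        int n = 1024;
        long[][] m = new long[n][n];
        for (int r = 0; r < n; r++) {
            for (int c = 0; c < n; c++) {
                m[r][c] = r + c;
            }
        }
        long t0 = System.nanoTime();
        long a = sumRowMajor(m);
        long t1 = System.nanoTime();
        long b = sumColumnMajor(m);
        long t2 = System.nanoTime();
        System.out.printf("row-major %d us, column-major %d us, same sum: %b%n",
                (t1 - t0) / 1_000, (t2 - t1) / 1_000, a == b);
    }
}
```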

Is the cache line omnipotent? No, it also introduces a drawback, which I will illustrate with an example. Imagine an ArrayQueue whose data structure looks like this:

class ArrayQueue{
    long maxSize;
    long currentIndex;
}

maxSize is the array size we defined at the beginning, and currentIndex is the current position in the queue, which changes very frequently. When you access maxSize, currentIndex is loaded into the same cache line along with it. If another thread then updates currentIndex, the whole cache line in this CPU's cache is invalidated. Note that this is enforced by the CPU: it is not just currentIndex that becomes invalid, so if this thread keeps accessing maxSize, it has to read it from memory again. maxSize never changes after initialization and should have been served from the cache, but it is affected by currentIndex, which we modify frequently. This is false sharing.

The magic of padding

To solve this problem, Disruptor uses padding:

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

value is surrounded by otherwise useless long fields, 7 on each side (56 bytes). So when value is modified, it does not invalidate the cache line of any other variable, and vice versa.
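To see padding pay off, the sketch below runs two threads that each increment only their own counter: first with plain objects (small enough that two of them allocated back to back very likely share a cache line), then with padded ones built using the same inheritance trick as above. The class names are hypothetical. The trick relies on HotSpot laying out superclass fields before subclass fields, which is a JVM implementation detail rather than a language guarantee, and timings vary by machine.

```java
// Hypothetical classes mirroring the padding pattern above: `value` ends up
// with 7 longs (56 bytes) of padding on each side.
class LeftPad { protected long p1, p2, p3, p4, p5, p6, p7; }
class PaddedValue extends LeftPad { protected volatile long value; }
class PaddedLong extends PaddedValue { protected long p9, p10, p11, p12, p13, p14, p15; }

// Unpadded counterpart for comparison.
class PlainLong { protected volatile long value; }

class PaddingDemo {
    // Each counter has a single writer thread, so a volatile ++ is safe here.
    static long[] runPadded(long iterations) {
        PaddedLong a = new PaddedLong();
        PaddedLong b = new PaddedLong();
        runBoth(() -> { for (long i = 0; i < iterations; i++) a.value++; },
                () -> { for (long i = 0; i < iterations; i++) b.value++; });
        return new long[] {a.value, b.value};
    }

    static long[] runPlain(long iterations) {
        PlainLong a = new PlainLong();
        PlainLong b = new PlainLong();
        runBoth(() -> { for (long i = 0; i < iterations; i++) a.value++; },
                () -> { for (long i = 0; i < iterations; i++) b.value++; });
        return new long[] {a.value, b.value};
    }

    // Starts both tasks on their own threads and waits for them to finish.
    static void runBoth(Runnable r1, Runnable r2) {
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long n = 100_000_000L;
        long s0 = System.nanoTime();
        runPlain(n);
        long s1 = System.nanoTime();
        runPadded(n);
        long s2 = System.nanoTime();
        System.out.println("plain: " + (s1 - s0) / 1_000_000 + " ms, padded: " + (s2 - s1) / 1_000_000 + " ms");
    }
}
```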

As a final note, JDK 8 provides the @Contended annotation (sun.misc.Contended), which is generally only honoured inside the JDK. To use it in your own code, you have to pass the JVM option -XX:-RestrictContended, which lifts that restriction. ConcurrentHashMap uses it: the map's size is counted across an array of CounterCell counters that change from moment to moment, and CounterCell is annotated with @Contended so that each counter gets its own cache line, which improves performance.
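The same striped-counter idea is available without any JVM flags through java.util.concurrent.atomic.LongAdder (JDK 8+): like ConcurrentHashMap's CounterCell, it spreads updates across an array of cells (whose cell class is annotated with @Contended inside the JDK) and sums them on read. A minimal sketch of using it as a heavily contended counter; the class name and the thread and iteration counts are arbitrary.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderDemo {
    // Several threads bump one shared counter. Under contention LongAdder
    // routes threads to different cells instead of fighting over one value.
    static long count(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    adder.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return adder.sum(); // base value plus every cell
    }

    public static void main(String[] args) {
        System.out.println(count(4, 1_000_000)); // prints 4000000
    }
}
```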

3.1.3 RingBuffer

Disruptor uses an array to store our data; as described above, arrays make good use of the cache. Disruptor goes further and uses the array as a RingBuffer, which is accessed by taking the remainder (modulo). For example, if the array size is 10, sequence 0 accesses index 0, and so do sequences 10, 20, and so on.

In practice, these frameworks don't use %; they use a bitwise &. This requires the size to be a power of 2, which in binary is 10, 100, 1000, and so on. Subtracting 1 gives 1, 11, 111, so index & (size - 1) yields the same slot as index % size, only faster. If the Disruptor buffer size is not a power of 2, it throws an exception saying that bufferSize must be a power of 2.
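The index arithmetic can be sketched as follows. RingIndex and its methods are hypothetical names, not Disruptor's code; the power-of-two check mirrors the rule described above by rejecting other sizes with an IllegalArgumentException.

```java
class RingIndex {
    private final int mask;

    RingIndex(int bufferSize) {
        // A power of two has exactly one bit set (binary 10, 100, 1000, ...).
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.mask = bufferSize - 1; // binary 1, 11, 111, ...
    }

    // Maps an ever-increasing sequence onto an array slot. For non-negative
    // sequences and power-of-two sizes this equals sequence % bufferSize.
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    public static void main(String[] args) {
        RingIndex ring = new RingIndex(8);
        for (long seq : new long[] {0, 7, 8, 10, 16}) {
            // prints 0 -> 0, 7 -> 7, 8 -> 0, 10 -> 2, 16 -> 0
            System.out.println(seq + " -> " + ring.indexOf(seq) + " (mod gives " + seq % 8 + ")");
        }
    }
}
```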

The RingBuffer not only makes array access fast, it also avoids reallocating memory and reduces garbage collection: sequences 0, 10, 20, and so on all reuse the same memory slot, so no new memory has to be allocated and the JVM garbage collector does not have to reclaim objects frequently.
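To illustrate the "no reallocation" point, here is a toy single-threaded ring that creates every slot object once up front (as Disruptor's EventFactory does at initialization) and afterwards only overwrites their fields. PreallocatedRing and Slot are hypothetical names, and there is no concurrency control at all, so this shows only the memory-reuse idea.

```java
import java.util.function.Supplier;

class PreallocatedRing<T> {
    private final Object[] slots;
    private final int mask;

    // Every slot object is created once here; later writes never allocate.
    PreallocatedRing(int size, Supplier<T> factory) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
    }

    // Sequences 0, size, 2 * size, ... all return the same reused object.
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence & mask)];
    }
}

class Slot {
    long value; // mutable payload, overwritten in place
}

class PreallocatedRingDemo {
    public static void main(String[] args) {
        PreallocatedRing<Slot> ring = new PreallocatedRing<>(4, Slot::new);
        for (long seq = 0; seq < 10; seq++) {
            ring.get(seq).value = seq * 100; // overwrite, no new Slot
        }
        System.out.println(ring.get(1) == ring.get(9)); // prints true
        System.out.println(ring.get(9).value);           // prints 900
    }
}
```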

These three techniques lay the foundation for Disruptor's high performance. Next, let's look at how to use Disruptor and how it works.

3.2 How to use Disruptor

Here’s a simple example:

import java.util.concurrent.ThreadFactory;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import sun.misc.Contended; // JDK 8 only; needs -XX:-RestrictContended to take effect

public class Main {
    public static void main(String[] args) throws Exception {
        // Queue element
        class Element {
            @Contended
            private String value;

            public String getValue() { return value; }

            public void setValue(String value) { this.value = value; }
        }

        // Names the consumer threads simpleThread0, simpleThread1, ...
        ThreadFactory threadFactory = new ThreadFactory() {
            int i = 0;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread" + String.valueOf(i++));
            }
        };

        // Creates the elements that pre-fill the RingBuffer
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // Consumer: prints every event it receives
        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
                System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
                // Thread.sleep(10000000);
            }
        };

        // Blocking wait strategy
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();
        // Buffer size, must be a power of 2
        int bufferSize = 8;
        // Create the disruptor with a single producer
        Disruptor<Element> disruptor = new Disruptor<>(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
        // Set the EventHandler
        disruptor.handleEventsWith(handler);
        // Start the disruptor's consumer threads
        disruptor.start();
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((element, sequence) -> {
                System.out.println("previous data: " + element.getValue() + ", current sequence: " + sequence);
                element.setValue("I am element number " + sequence);
            });
        }
    }
}

Disruptor has a few key components:

ThreadFactory: a thread factory that creates the threads Disruptor's consumers run on.

EventFactory: the factory that creates our queue elements. Disruptor uses it to fill the entire RingBuffer once, at initialization.

EventHandler: the handler that processes events. An EventHandler can be viewed as a consumer, and multiple EventHandlers each consume the queue independently.

WorkHandler: also a handler that processes events; the difference is that multiple WorkHandlers share the same queue.

WaitStrategy: the wait strategy, which determines what consumers do when there is no data available to consume. Disruptor offers several; here is a brief list of some of them:

  • BlockingWaitStrategy: the thread blocks until the producer wakes it up, then loops to check whether the sequences it depends on have caught up.

  • BusySpinWaitStrategy: the thread busy-spins while waiting, which can consume a lot of CPU.

  • LiteBlockingWaitStrategy: the thread blocks until the producer wakes it up. Compared with BlockingWaitStrategy, the difference is signalNeeded.getAndSet: when two threads access it at the same time, one in waitFor and the other in signalAll, the number of lock acquisitions can be reduced.

  • LiteTimeoutBlockingWaitStrategy: compared with LiteBlockingWaitStrategy, it sets a blocking timeout and throws an exception once the timeout expires.

  • YieldingWaitStrategy: spins 100 times, then calls Thread.yield() to give up the CPU.

EventTranslator: implementing this interface translates our other data structures into events that flow through Disruptor.
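To make the spin-then-yield idea of YieldingWaitStrategy concrete, here is a hypothetical, simplified wait loop: it spins a fixed number of times (100, as described above) checking whether the producer's published sequence has reached the one we need, then falls back to Thread.yield(). The class and method are invented for illustration and do not match Disruptor's WaitStrategy interface.

```java
import java.util.concurrent.atomic.AtomicLong;

class YieldingWait {
    private static final int SPIN_TRIES = 100;

    // Waits until `published` reaches at least `wanted`, then returns the
    // highest published sequence (which may be larger than `wanted`).
    static long waitFor(long wanted, AtomicLong published) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = published.get()) < wanted) {
            if (counter > 0) {
                counter--;          // busy-spin first: lowest latency
            } else {
                Thread.yield();     // then let other threads use the CPU
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong published = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            for (long seq = 0; seq <= 5; seq++) {
                published.set(seq); // "publish" sequences 0..5
            }
        });
        producer.start();
        long got = waitFor(5, published);
        producer.join();
        System.out.println("available up to " + got); // prints "available up to 5"
    }
}
```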

3.3 Working Principles

3.3.1 producers

Producers come in two kinds, single producer and multiple producers, selected with ProducerType.SINGLE and ProducerType.MULTI.

disruptor.publishEvent() and disruptor.publishEvents() are used to publish a single event and a batch of events, respectively.

Posting an event to the disruptor queue requires the following steps:

  1. First, obtain the next position in the RingBuffer that can be published to. Such a position falls into one of two categories:
  • A position that has never been written.

  • A position that all consumers have already read, i.e. it has been consumed.

If no position is available to publish to, the producer has to keep looping. Disruptor does something clever here: rather than occupying the CPU for a long time, it briefly suspends the thread with LockSupport.parkNanos(), so the CPU is not burned on an empty loop and other threads are not starved of CPU time slices.

Once a position is obtained, it is claimed with CAS; with a single producer (a single thread), the CAS is not needed.

  2. Next, the EventTranslator described above is called, and it overwrites the Event stored at the RingBuffer position obtained in step 1.

  3. Disruptor keeps an additional array, the availableBuffer, that records which sequence each RingBuffer position currently holds (more precisely, which lap around the ring). For example, with a size of 10, sequences 0, 10 and 20 all map to index 0; after sequence 10 is written, the entry for index 0 records that it now holds sequence 10. We will come back to this later. Publishing updates the availableBuffer and wakes up all blocked consumers.

Take bufferSize = 8. Here is the process of publishing 3 more events after 8 events have already been published:
1. Call next(3). We are at position 7, so the next three sequences are 8, 9 and 10, which modulo 8 are 0, 1 and 2.
2. Overwrite the data in memory slots 0, 1 and 2.
3. Write the availableBuffer.

By the way, does this process look familiar? It is similar to 2PC (two-phase commit): the RingBuffer positions are locked (claimed) first, then the data is committed and consumers are notified.
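A heavily simplified, single-threaded sketch of the publishing steps above, under stated assumptions: one producer (so no CAS), one consumer whose progress is a plain field set by hand, and an availableBuffer that stores the "lap" (sequence / bufferSize) for each slot. In Disruptor the availableBuffer belongs to the multi-producer sequencer; it is kept here only to mirror the steps. All names are hypothetical, and a real producer would park instead of throwing when the ring is full.

```java
import java.util.Arrays;

class ToyProducer {
    final int bufferSize;          // assumed to be a power of 2
    final String[] ring;
    final int[] availableBuffer;   // lap number last published into each slot
    long cursor = -1;              // last sequence claimed by the producer
    long consumerSequence = -1;    // last sequence consumed (set manually here)

    ToyProducer(int bufferSize) {
        this.bufferSize = bufferSize;
        this.ring = new String[bufferSize];
        this.availableBuffer = new int[bufferSize];
        Arrays.fill(availableBuffer, -1); // nothing published yet
    }

    // Step 1: claim the next n sequences without lapping the consumer.
    long next(int n) {
        long nextSequence = cursor + n;
        long wrapPoint = nextSequence - bufferSize;
        if (wrapPoint > consumerSequence) {
            throw new IllegalStateException("ring full"); // real code would wait here
        }
        cursor = nextSequence;
        return nextSequence; // highest claimed sequence
    }

    // Steps 2 and 3: overwrite the slot, then mark it available for this lap.
    void publish(long sequence, String data) {
        int index = (int) (sequence & (bufferSize - 1));
        ring[index] = data;
        availableBuffer[index] = (int) (sequence / bufferSize);
    }

    public static void main(String[] args) {
        ToyProducer p = new ToyProducer(8);
        long hi = p.next(8);                         // sequences 0..7
        for (long s = 0; s <= hi; s++) p.publish(s, "event" + s);
        p.consumerSequence = 7;                      // pretend all 8 were consumed
        hi = p.next(3);                              // 8, 9, 10 -> slots 0, 1, 2
        for (long s = hi - 2; s <= hi; s++) p.publish(s, "event" + s);
        // prints [event8, event9, event10, event3, event4, event5, event6, event7]
        System.out.println(Arrays.toString(p.ring));
        // prints [1, 1, 1, 0, 0, 0, 0, 0]
        System.out.println(Arrays.toString(p.availableBuffer));
    }
}
```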

3.3.2 Consumers

As introduced above, there are two kinds of consumption: multiple consumers each consuming independently, and multiple consumers sharing the same queue. Here I describe the more complex case, multiple consumers sharing one queue; once you understand it, independent consumption is easy to follow. The disruptor.start() method starts the consumer threads, which consume in the background. There are two progress sequences to pay attention to: one shared by all consumers, and one per consumer that records its own progress.
1. Use CAS to claim the next sequence (next) on the shared progress sequence, and record the progress reached in this consumer's own sequence.
2. Request the next readable RingBuffer position for yourself. The request is not limited to next; it may return a larger range than next:

  • Get the position the producer last wrote to in the RingBuffer.

  • Check whether it is smaller than the position I want to read.

  • If it is greater, that position has already been written and is returned.

  • If it is smaller, the position has not been written yet; the consumer blocks according to the wait strategy and is woken up during the producer's commit phase.

3. Check that the returned positions are actually readable, because they may not be contiguous. For example, the producer was at 7 when we requested to read; with multiple producers, sequences 8 and 10 may already be written while 9 is still being written. Step 2 would return 10, but 9 cannot be read yet, so the range has to shrink to 8.

4. If after shrinking the result is smaller than the current next, loop and request again.

5. Hand the events to handler.onEvent().

Another example: suppose we want to claim next = 8.
1. First claim progress 8 on the shared sequence, and write progress 7 to this consumer's own sequence.
2. Get the maximum readable position, which is 8.
3. Hand the event to the handler.
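The "shrink" in step 3 can be sketched as a scan over the availableBuffer: starting from the lowest sequence we want, walk forward and stop at the first sequence whose slot does not yet hold the expected lap. This follows the same idea as Disruptor's getHighestPublishedSequence, but the code below is a simplified, hypothetical stand-alone version that reuses the lap convention (sequence / bufferSize) assumed in the producer sketch.

```java
class ToyConsumer {
    // Returns the highest sequence in [lowerBound, upperBound] such that every
    // sequence from lowerBound up to it has been published. A result of
    // lowerBound - 1 means nothing is readable yet, so the caller must retry.
    static long highestPublished(int[] availableBuffer, long lowerBound, long upperBound) {
        int bufferSize = availableBuffer.length; // assumed to be a power of 2
        for (long seq = lowerBound; seq <= upperBound; seq++) {
            int index = (int) (seq & (bufferSize - 1));
            int expectedLap = (int) (seq / bufferSize);
            if (availableBuffer[index] != expectedLap) {
                return seq - 1; // gap found: shrink to just before it
            }
        }
        return upperBound;
    }

    public static void main(String[] args) {
        // bufferSize 8: sequences 0..7 were published in lap 0, then 8 and 10
        // in lap 1, while 9 (slot 1) is still being written and holds lap 0.
        int[] available = {1, 0, 1, 0, 0, 0, 0, 0};
        System.out.println(highestPublished(available, 8, 10)); // prints 8
    }
}
```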

4. Disruptor in Log4j

The following figure compares the throughput of Log4j using Disruptor, using ArrayBlockingQueue, and logging synchronously. Disruptor clearly outperforms the others. Many more frameworks use Disruptor, but I won't cover them here.

Finally

This article introduced the drawbacks of traditional blocking queues, then focused on Disruptor: why it is so fast and how it works.

If you were asked to design an efficient lock-free queue, how would you do it? I believe you can distill the answer from this article.

More articles: Java Learning Garden