Preface
This article was first published on the WeChat official account Yuebanfeiyu and then synced to the author's personal website: xiaoflyfish.cn/
Disruptor is an open-source framework developed to solve the problem of queue locking under high concurrency. Originally developed and used by LMAX, Disruptor implements concurrent queue operations without locks and is claimed to process 6 million orders per second on a single thread.
Official site: lmax-exchange.github.io/disruptor
Several well-known projects, including Apache Storm, Camel, and Log4j2, currently use Disruptor to achieve high performance.
Why the Disruptor framework was created
“How Java's built-in queues currently achieve thread safety:”
ArrayBlockingQueue: an array-based queue that uses a lock to protect data in multithreaded situations.
LinkedBlockingQueue: a linked-list-based queue that also uses locks to protect data in multithreaded situations.
ConcurrentLinkedQueue: a linked-list-based queue that achieves thread safety through CAS rather than locks.
Locking often hurts performance badly, so it is worth going lock-free wherever possible. This is the problem that Disruptor, a high-concurrency framework, was built to solve.
Basic concepts
Reference address: github.com/LMAX-Exchan…
RingBuffer: the underlying data structure of Disruptor and its core class; it is the conduit through which threads exchange data.
Sequencer: the sequence number manager and the component that implements producer synchronization. It manages and coordinates producer and consumer sequence numbers and sequence barriers. It comes in single-producer and multi-producer variants, each implementing its own synchronization algorithm.
Sequence: a sequence number used to track the progress of tasks in the RingBuffer and of each consumer. Most concurrent code in Disruptor synchronizes through changes to Sequence values rather than through locks, which is a major reason for its high performance.
SequenceBarrier: the sequence barrier. It manages and coordinates the producer's cursor sequence and the sequences of individual consumers, ensuring that producers do not overwrite messages consumers have not yet processed and that dependent consumers process events in the correct order.
EventProcessor: listens for events in the RingBuffer and consumes those that are available. Events read from the RingBuffer are handed to the actual consumer implementation (the EventHandler) for processing. It keeps waiting on the next available sequence number until the event for that sequence number is ready.
EventHandler: the business handler. It is the interface of the actual consumer, carries the concrete business logic, and is implemented by the user; it represents the consumer.
Producer: the producer side. A user thread acts as the producer and writes events to the RingBuffer.
Wait Strategy: determines how a consumer waits for a producer to place an event into the Disruptor.
Wait strategies
Source code address: github.com/LMAX-Exchan…
“BlockingWaitStrategy”
BlockingWaitStrategy is the default wait strategy for Disruptor. Internally it uses a lock and a condition variable to control thread wake-up. It is the slowest strategy, but it uses the least CPU and gives the most consistent behavior across deployment environments.
“SleepingWaitStrategy”
SleepingWaitStrategy performs similarly to BlockingWaitStrategy and uses a similar amount of CPU, but it has minimal impact on producer threads because it waits in a loop that calls LockSupport.parkNanos(1).
“YieldingWaitStrategy”
YieldingWaitStrategy is one of the strategies suitable for low-latency systems. It spins while waiting for the sequence to reach the required value, calling Thread.yield() inside the loop so that other queued threads can run. It is recommended when very high performance is required and the number of event-handling threads is smaller than the number of logical CPU cores, for example when hyper-threading is enabled.
“BusySpinWaitStrategy”
The best-performing strategy, suited to low-latency systems. It is recommended only when very high performance is required and the number of event-handling threads is smaller than the number of physical CPU cores, for example when hyper-threading is disabled.
“PhasedBackoffWaitStrategy”
A combination of spinning, yielding, and a custom fallback strategy. It suits scenarios where CPU resources are scarce and throughput and latency are not critical.
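To make the trade-offs between these strategies more concrete, here is a minimal JDK-only sketch of a consumer wait loop that spins first, then yields, then parks. The class name BackoffWait, the method waitFor, and the retry counts are assumptions made for illustration; this is not the library's WaitStrategy code.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical back-off wait loop; a simplified illustration, not Disruptor's own code.
final class BackoffWait {
    private BackoffWait() {}

    /** Waits until the cursor reaches at least `wanted` and returns the value observed. */
    static long waitFor(long wanted, AtomicLong cursor) {
        int retries = 200; // assumed budget: 100 busy spins, then 100 yields
        long available;
        while ((available = cursor.get()) < wanted) {
            if (retries > 100) {
                retries--;                  // phase 1: busy spin (lowest latency, burns CPU)
            } else if (retries > 0) {
                retries--;
                Thread.yield();             // phase 2: let other runnable threads proceed
            } else {
                LockSupport.parkNanos(1L);  // phase 3: sleep briefly to save CPU
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            LockSupport.parkNanos(1_000_000L); // simulate some work before publishing
            cursor.set(0);                     // "publish" sequence 0
        });
        producer.start();
        System.out.println("consumer saw sequence " + waitFor(0, cursor));
        producer.join();
    }
}
```

Staying in phase 1 forever corresponds roughly to busy spinning, stopping at phase 2 to yielding, and going straight to phase 3 to sleeping; the real strategies differ in detail.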
Usage example
Reference address: github.com/LMAX-Exchan…
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>We do</version>
</dependency>
// Define the event: the type of data exchanged through the Disruptor.
public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}
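import com.lmax.disruptor.EventFactory;

// Define the event factory, which the Disruptor uses to pre-allocate events.
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}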
import com.lmax.disruptor.EventHandler;

// Define the event consumer.
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Consumer: " + event.getValue());
    }
}
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

// Define the producer.
public class LongEventProducer {
    public final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 1. Claim the next slot in the RingBuffer
        long sequence = ringBuffer.next();
        Long data = null;
        try {
            // 2. Fetch the pre-allocated (empty) event for that slot
            LongEvent longEvent = ringBuffer.get(sequence);
            data = byteBuffer.getLong(0);
            // 3. Fill the event with the data to pass along
            longEvent.setValue(data);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                e.printStackTrace();
            }
        } finally {
            System.out.println("Producer is ready to send data.");
            // 4. Publish the event; always done in finally so the claimed slot is never left unpublished
            ringBuffer.publish(sequence);
        }
    }
}
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorMain {
    public static void main(String[] args) {
        // 1. Create a cached thread pool that supplies the threads for consumer event processing
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2. Create the event factory
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // 3. Choose the RingBuffer size
        int ringBufferSize = 1024 * 1024; // ringBufferSize must be a power of 2
        // 4. Create the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
                ProducerType.SINGLE, new YieldingWaitStrategy());
        // 5. Register the consumer handler
        disruptor.handleEventsWith(new LongEventHandler());
        // 6. Start the Disruptor
        disruptor.start();
        // 7. Get the RingBuffer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 8. Create the producer
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 9. Allocate an 8-byte buffer, enough to hold one long
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 1; i <= 100; i++) {
            byteBuffer.putLong(0, i);
            producer.onData(byteBuffer);
        }
        // 10. Shut down the Disruptor and the executor
        disruptor.shutdown();
        executor.shutdown();
    }
}
Core Design Principles
Disruptor addresses slow queues through the following designs:
“Ring array structure:”
Arrays are used instead of linked lists to avoid garbage collection. Arrays also work better with the processor's cache.
❝
Reason: the CPU cache is made up of many cache lines. Each cache line is usually 64 bytes and maps to a block of addresses in main memory. A Java long is 8 bytes, so one cache line can hold up to 8 long values. Whenever the CPU pulls data from main memory, the adjacent data comes along into the same cache line. So when you access a long array and one of its values is loaded into the cache, the other seven neighbors are loaded with it, and you can traverse the array very quickly.
❞
“Element location:”
The array length is 2^n, so locating an element can be sped up with bit operations. Indexes simply increase monotonically. There is no need to worry about index overflow: the index is of type long, and even at 1 million QPS it would take about 300,000 years to exhaust.
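The bit-operation trick and the overflow estimate can be checked with a few lines of plain Java. This is only a sketch: the class RingIndex and its methods are hypothetical names chosen for illustration, not part of the Disruptor API.

```java
// Illustrative helper with hypothetical names; not part of the Disruptor API.
final class RingIndex {
    private RingIndex() {}

    /** Maps an ever-increasing sequence to a slot, assuming bufferSize is a power of two. */
    static int indexOf(long sequence, int bufferSize) {
        if (bufferSize <= 0 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of two");
        }
        // For a power-of-two size, (sequence & (size - 1)) equals (sequence % size)
        // for non-negative sequences, without needing a division.
        return (int) (sequence & (bufferSize - 1));
    }

    /** Rough number of years before a long sequence overflows at the given event rate. */
    static long yearsUntilOverflow(long eventsPerSecond) {
        long secondsPerYear = 365L * 24 * 60 * 60;
        return Long.MAX_VALUE / eventsPerSecond / secondsPerYear;
    }

    public static void main(String[] args) {
        int size = 8;
        for (long seq = 5; seq < 12; seq++) {
            System.out.println("sequence " + seq + " -> slot " + indexOf(seq, size));
        }
        System.out.println("years at 1M events/s: " + yearsUntilOverflow(1_000_000L));
    }
}
```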
“Lock free design:”
Each producer or consumer thread first claims the position of an element in the array that it is allowed to operate on, and then writes to or reads from that position directly. The whole process relies on CAS operations on atomic variables to keep it thread-safe.
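As a rough illustration of claiming a position with CAS instead of a lock, the sketch below uses java.util.concurrent.atomic.AtomicLong. The class CasClaimer is a hypothetical stand-in and leaves out everything else a real sequencer has to do (capacity checks, publication, consumer tracking).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of lock-free slot claiming; not Disruptor's actual Sequencer.
final class CasClaimer {
    private final AtomicLong cursor = new AtomicLong(-1); // last claimed sequence

    /** Claims the next sequence with a compare-and-set retry loop instead of a lock. */
    long claimNext() {
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            if (cursor.compareAndSet(current, next)) {
                return next; // this thread now owns sequence `next`
            }
            // Another thread claimed it first; re-read the cursor and retry.
        }
    }

    long lastClaimed() {
        return cursor.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CasClaimer claimer = new CasClaimer();
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                claimer.claimNext();
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        a.join();
        b.join();
        // Every claim handed out a unique sequence, so no claims were lost.
        System.out.println("last claimed sequence: " + claimer.lastClaimed());
    }
}
```

A failed compareAndSet never blocks the thread; it just retries, which is why contention costs far less than parking on a lock.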
Data structure
The framework uses RingBuffer, a custom ring-shaped array, as the queue's data structure.
In addition to the array, there is a sequence number that points to the next available element, for use by producers and consumers.
[Figure: schematic of the RingBuffer array and its sequence number]
Sequence
Disruptor manages the data (events) exchanged through it by means of an increasing sequence number, and the data (events) are always processed in ascending sequence order.
“What are the advantages of the array + sequence number design?”
Recall HashMap: when the index is known, storing or retrieving an element in an array is O(1). Here the index is computed from the sequence number modulo the array length: index = sequence % table.length. Because table.length must be a power of 2, this is equivalent to sequence & (table.length - 1).
Data writing process
The process of writing data from a single thread:
- Request to write m elements;
- If m elements can be written, the highest claimed sequence number is returned. The main check here is whether unread elements would be overwritten;
- If the claim succeeds, the producer starts writing the elements.
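The three steps above can be sketched as a tiny single-threaded ring. Everything here (the class SingleProducerRing, its methods, and the -1 failure value) is a hypothetical simplification for illustration; a real implementation also needs memory-ordering guarantees between the producer and consumer threads, which this sketch omits.

```java
// Hypothetical single-producer ring; a simplified sketch, not Disruptor's implementation.
final class SingleProducerRing {
    private final long[] slots;
    private final int mask;
    private long producerCursor = -1; // last sequence written
    private long consumerCursor = -1; // last sequence read

    SingleProducerRing(int sizePowerOfTwo) {
        if (sizePowerOfTwo <= 0 || Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new long[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
    }

    /** Steps 1 and 2: request m slots; returns the highest claimed sequence, or -1 if refused. */
    long tryClaim(int m) {
        if (m < 1) {
            throw new IllegalArgumentException("m must be at least 1");
        }
        long highest = producerCursor + m;
        // The slot for `highest` previously held sequence (highest - size);
        // it may be reused only if that older sequence has already been read.
        if (highest - slots.length > consumerCursor) {
            return -1;
        }
        return highest;
    }

    /** Step 3: write the values into the claimed range, then advance the producer cursor. */
    boolean write(long... values) {
        long highest = tryClaim(values.length);
        if (highest < 0) {
            return false;
        }
        long sequence = producerCursor + 1;
        for (long value : values) {
            slots[(int) (sequence++ & mask)] = value;
        }
        producerCursor = highest;
        return true;
    }

    /** Reads the next unread element. */
    long read() {
        if (consumerCursor >= producerCursor) {
            throw new IllegalStateException("nothing to read");
        }
        consumerCursor++;
        return slots[(int) (consumerCursor & mask)];
    }

    public static void main(String[] args) {
        SingleProducerRing ring = new SingleProducerRing(4);
        System.out.println("write 3 elements: " + ring.write(1, 2, 3));
        System.out.println("write 2 more:     " + ring.write(4, 5));
        System.out.println("read one:         " + ring.read());
    }
}
```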
Usage scenarios
Benchmarks show that Disruptor delivers much better latency and throughput than ArrayBlockingQueue, so if ArrayBlockingQueue has become a performance bottleneck for you, consider replacing it with Disruptor.
Reference: github.com/LMAX-Exchan…
Of course, Disruptor's performance advantage is not guaranteed in every case, so test before adopting it.
The most common scenario for Disruptor is the producer-consumer scenario, specifically "one producer, multiple consumers" with a requirement for in-order processing.
For example, suppose we read data sequentially from MySQL's binlog files and write it to ElasticSearch. In this scenario each binlog file must be read by a single producer, so there is exactly one producer. Writes to ElasticSearch must be strictly ordered or problems will follow, so ordinary multi-threaded consumers cannot solve this, and adding locks would hurt performance.
Reference:
Tech.meituan.com/2016/11/18/…
Github.com/LMAX-Exchan…