Fastest single-machine MQ — Disruptor
Disruptor is the fastest standalone MQ I know of
Disruptor was developed by a foreign financial and stock exchange. In 2011, Disruptor was awarded the Duke Award for being the fastest single MQ with high performance, lockless CAS and high concurrency
How’s it going? Are you hooked? Here, let me introduce you to today’s Disruptor
Think of Disruptor as an in-memory efficient queue
Disruptor profile
- Lock-free (CAS), high concurrency, using ring Buffer, directly overwriting (without cleaning) old data, reducing GC frequency, realizing event-based producer-consumer model (observer mode)
- Why is it observer mode? Since the consumer is always watching to see if there is a message in the queue, as soon as a new message is generated, the consumer thread consumes it immediately
Ring queue (RingBuffer)
-
RingBuffer has an ordinal sequence that points to the next available element and is implemented in an array with no beginning or end Pointers
- Disruptor requires that you set the length to the NTH power of 2 to facilitate binary computing
First, it is implemented based on an array, which is faster to iterate over than a linked list. Second, it does not need to maintain a pointer to a list, and of course it does not have a pointer to a list
-
When all the positions are filled, when you put another one down, it will cover position zero
At this time there will be a small partner anxious, how can cover it, that my data is not lost?
Disruptor does not make it easy for Disruptor to override data. When it is required to override data, a policy is executed. Disruptor provides a variety of policies, including common ones
- BlockingWaitStrategy, the common and default wait strategy, does not overwrite the queue when it is full, but blocks outside and waits
- SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy SleepingWaitStrategy
- The YieldingWaitStrategy is a loop that waits for the sequence to increase to an appropriate value and calls Thread.yieid () to allow other ready threads to execute
Disruptor development steps
- Define events – the elements in the queue that need to be processed
- Define an Event factory to populate the queue
- Define EventHandler(consumer) to handle elements in the container
Public class LongEvent{private long Value; private String name; @Override public String toString() { return "LongEvent{" + "value=" + value + ", name='" + name + '\'' + '}'; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getValue() { return value; } public void setValue(long value) { this.value = value; }}Copy the code
Public class LongEventFactory implements EventFactory<LongEvent> {@override public LongEvent newInstance() { return new LongEvent(); }}Copy the code
Public class LongEventHandler implements EventHandler<LongEvent> {@override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getName()+"-----"+longEvent.getValue()); }}Copy the code
Public class LongEventProducer {private final RingBuffer<LongEvent> RingBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long val, String name) { long sequence = ringBuffer.next(); try { LongEvent event = ringBuffer.get(sequence); event.setValue(val); event.setName(name); } finally { ringBuffer.publish(sequence); }}}Copy the code
Public static void main(String[] args) {// New LongEventFactory factory = new LongEventFactory(); // Set the SIZE of the ring Buffer int SIZE = 1024; // New Disruptor, which takes the Size of the message (event) factory Disruptor<LongEvent> Longeventtor = new Disruptor<LongEvent>(Factory, size, Executors.defaultThreadFactory()); / / set how consumption producers output message (event) longEventDisruptor. HandleEventsWith (new LongEventHandler ()); Longeventdisruptor.start (); longEventDisruptor (); / / get Disruptor circular Buffer RingBuffer < LongEvent > RingBuffer = longEventDisruptor. GetRingBuffer (); // New message (event) producer LongEventProducer = new LongEventProducer(ringBuffer); For (long l = 0; l<100; //TODO calls producer.onData(l,"MingLog-"+l); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }} / / the message (event) released out longEventDisruptor shutdown (); }Copy the code
Why is Disruptor so fast?
- At the bottom are arrays, which loop faster than linked lists
- There are no leading and trailing Pointers, saving the time needed to maintain two Pointers
- The start() method is called, the Disruptor is initialized, and all events in the available space are initialized. (This method is pre-created and can be used to modify the original object each time without having to create a new object, thus reducing the frequency of GC.) Therefore, when next obtains the next available Event, it does not need to determine whether the Event is initialized, which reduces one step of judgment
- Disruptor’s Size is 2 to the NTH power to facilitate binary operations to determine where a message should be placed in the available space
Here concludes your Disruptor tutorial, so feel free to tweet or comment if you have anything you’d like to learn and I’ll do my best to accommodate you
Like, pay attention to a wave of good, qiu Ligao ~