A Disruptor.

Disruptor is a high-performance asynchronous processing framework.

Disruptor is a key component of LMAX’s online trading platform. LMAX uses the framework to process orders at a speed of 6 million TPS, delivering significant performance improvements across a wide range of applications other than the financial sector. Disruptor is not so much a framework as a design approach to dramatically improve performance (TPS) for applications with “concurrency, buffers, producer-consumer model, and transaction processing” elements.

Practice 2.

NetDiscovery is a crawler framework based on vert. x, RxJava 2 and other frameworks.

The default message Queue of NetDiscovery uses the JDK’s ConcurrentLinkedQueue. Since each component of the crawler framework can be replaced, the following crawler Queue is based on Disruptor.

2.1 Event Encapsulation

Encapsulate the crawler’s request as a RequestEvent, which is transmitted in the Disruptor.

import com.cv4j.netdiscovery.core.domain.Request;
import lombok.Data;

/** * Created by tony on 2018/9/1. */
@Data
public class RequestEvent {

    private Request request;

    public String toString(a) {

        returnrequest.toString(); }}Copy the code

2.2 Release Event

Write event publishing, get the serial number of the next writable event from RingBuffer, set the request to be requested by the crawler to RequestEvent, and finally submit the event to RingBuffer.

import com.cv4j.netdiscovery.core.domain.Request;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.atomic.AtomicInteger;

/** * Created by tony on 2018/9/2. */
public class Producer {

    private final RingBuffer<RequestEvent> ringBuffer;

    private AtomicInteger count = new AtomicInteger(0); / / counter

    public Producer(RingBuffer<RequestEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(Request request){
        long sequence = ringBuffer.next();

        try{
            RequestEvent event = ringBuffer.get(sequence);
            event.setRequest(request);
        }finally{ ringBuffer.publish(sequence); count.incrementAndGet(); }}/** * Number of requests sent to queue *@return* /
    public int getCount(a) {

        returncount.get(); }}Copy the code

2.3 Consumption Events

After RequestEvent sets request, the consumer needs to handle specific events. The following Consumer is simply the thread name that records the Consumer and the request. The actual “consumption” again requires retrieving the request from the poll() of DisruptorQueue and “consuming” it in the Spider.

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

/** * Created by tony on 2018/9/2. */
@Slf4j
public class Consumer implements WorkHandler<RequestEvent> {

    @Override
    public void onEvent(RequestEvent requestEvent) throws Exception {

        log.info("consumer:" + Thread.currentThread().getName() + " requestEvent: value="+ requestEvent.toString()); }}Copy the code

2.4 DisruptorQueue implementation

Disruptor supports single-producer, single-consumer, multi-producer, multi-consumer, and groupings.

Adopt multiple producers, multiple consumers in NetDiscovery.

When RingBuffer is created, ProducerType uses the MULTI type to represent multiple producers. The RingBuffer was created using the YieldingWaitStrategy. The YieldingWaitStrategy is a WaitStrategy, and different Waitstrategies have different performance.

The YieldingWaitStrategy is the best for low-latency systems. This policy is recommended for scenarios that require high performance and the number of event processing lines is smaller than the number of CPU logical cores. For example, the CPU enables hyperthreading.

        ringBuffer = RingBuffer.create(ProducerType.MULTI,
                new EventFactory<RequestEvent>() {
                    @Override
                    public RequestEvent newInstance(a) {
                        return new RequestEvent();
                    }
                },
                ringBufferSize ,
                new YieldingWaitStrategy());
Copy the code

EventProcessor is used to handle events within Disruptor.

The implementation classes of EventProcessor include: BatchEventProcessor for batch processing of events by a single thread and WorkProcessor for multi-thread processing of events.

WorkerPool manages a set of workProcessors. After creating ringBuffer, create workerPool:

        SequenceBarrier barriers = ringBuffer.newBarrier();

        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer();
        }

        workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                        barriers,
                        new EventExceptionHandler(),
                        consumers);
Copy the code

Start the workerPool:

        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(Executors.newFixedThreadPool(threadNum));
Copy the code

Finally, here’s the complete code for DisruptorQueue:

import com.cv4j.netdiscovery.core.domain.Request;
import com.cv4j.netdiscovery.core.queue.AbstractQueue;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** * Created by tony on 2018/9/1. */
@Slf4j
public class DisruptorQueue extends AbstractQueue {

    private RingBuffer<RequestEvent> ringBuffer;

    private Consumer[] consumers = null;
    private Producer producer = null;
    private WorkerPool<RequestEvent> workerPool = null;
    private int ringBufferSize = 1024*1024; // RingBuffer size, must be 2 to the N

    private AtomicInteger consumerCount = new AtomicInteger(0);

    private static final int CONSUME_NUM = 2;
    private static final int THREAD_NUM = 4;

    public DisruptorQueue(a) {

        this(CONSUME_NUM,THREAD_NUM);
    }

    public DisruptorQueue(int consumerNum,int threadNum) {

        consumers = new Consumer[consumerNum];

        / / create the ringBuffer
        ringBuffer = RingBuffer.create(ProducerType.MULTI,
                new EventFactory<RequestEvent>() {
                    @Override
                    public RequestEvent newInstance(a) {
                        return new RequestEvent();
                    }
                },
                ringBufferSize ,
                new YieldingWaitStrategy());

        SequenceBarrier barriers = ringBuffer.newBarrier();

        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer();
        }

        workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                        barriers,
                        new EventExceptionHandler(),
                        consumers);

        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(Executors.newFixedThreadPool(threadNum));

        producer = new Producer(ringBuffer);
    }

    @Override
    protected void pushWhenNoDuplicate(Request request) {

        producer.pushData(request);
        try {
            Thread.sleep(100);
        } catch(InterruptedException e) { e.printStackTrace(); }}@Override
    public Request poll(String spiderName) {

        Request request = ringBuffer.get(ringBuffer.getCursor() - producer.getCount() +1).getRequest();
        ringBuffer.next();
        consumerCount.incrementAndGet();
        return request;
    }

    @Override
    public int getLeftRequests(String spiderName) {

        return producer.getCount()-consumerCount.get();
    }

    public int getTotalRequests(String spiderName) {

        return super.getTotalRequests(spiderName);
    }

    static class EventExceptionHandler implements ExceptionHandler {

        public void handleEventException(Throwable ex, long sequence, Object event) {

            log.debug("handleEventException:" + ex);
        }

        public void handleOnStartException(Throwable ex) {

            log.debug("handleOnStartException:" + ex);
        }

        public void handleOnShutdownException(Throwable ex) {

            log.debug("handleOnShutdownException:"+ ex); }}}Copy the code

Where pushWhenNoDuplicate() sends the request to ringBuffer. Poll () extracts the corresponding request from ringBuffer and is used for crawler to process network request and parsing request, etc.

Conclusion:

Crawler framework Github address: github.com/fengzhizi71…

The code above is a classic Disruptor multi-producer multi-consumer code and can also be used as a sample code.

Finally, the crawler framework is interface oriented programming, so it is convenient to replace any component.


Java and Android technology stack: update and push original technical articles every week, welcome to scan the qr code of the public account below and pay attention to, looking forward to growing and progress with you together.