1. Disruptor
Disruptor is a high-performance asynchronous processing framework.
Disruptor is a key component of LMAX's online trading platform, where LMAX uses it to process orders at 6 million TPS. The same approach can deliver significant performance gains in many applications outside the financial sector. Disruptor is less a framework than a design approach for dramatically improving throughput (TPS) in applications that involve "concurrency, buffers, the producer-consumer model, and transaction processing".
2. Practice
NetDiscovery is a crawler framework built on Vert.x, RxJava 2 and other frameworks.
The default message queue of NetDiscovery is the JDK's ConcurrentLinkedQueue. Since every component of the crawler framework can be replaced, the crawler queue below is implemented on top of Disruptor.
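To make the "replaceable component" idea concrete, here is a minimal sketch using only the JDK. The names RequestQueue, LinkedRequestQueue and SpiderSketch are hypothetical, and plain URL strings stand in for NetDiscovery's Request; the framework's real queue abstraction is richer than this.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical queue contract for this sketch (not NetDiscovery's API).
interface RequestQueue {
    void push(String url);

    String poll(); // returns null when the queue is empty
}

// Default implementation backed by the JDK's ConcurrentLinkedQueue.
final class LinkedRequestQueue implements RequestQueue {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void push(String url) {
        queue.offer(url);
    }

    @Override
    public String poll() {
        return queue.poll();
    }
}

// The spider depends only on the interface, so another implementation
// (for example a Disruptor-backed one) can be passed in without changes here.
final class SpiderSketch {
    private final RequestQueue queue;

    SpiderSketch(RequestQueue queue) {
        this.queue = queue;
    }

    // Drains the queue and returns how many requests were taken from it.
    int run() {
        int crawled = 0;
        while (queue.poll() != null) {
            crawled++;
        }
        return crawled;
    }

    public static void main(String[] args) {
        RequestQueue queue = new LinkedRequestQueue();
        queue.push("https://example.com/a");
        queue.push("https://example.com/b");
        System.out.println("crawled " + new SpiderSketch(queue).run() + " requests");
    }
}
```

Swapping the queue then only means constructing SpiderSketch with a different RequestQueue implementation.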
2.1 Event Encapsulation
Wrap the crawler's request in a RequestEvent, which is the object passed through the Disruptor.
    import com.cv4j.netdiscovery.core.domain.Request;
    import lombok.Data;

    /**
     * Created by tony on 2018/9/1.
     */
    @Data
    public class RequestEvent {

        private Request request;

        public String toString() {
            return request.toString();
        }
    }
2.2 Publishing Events
To publish an event, claim the next writable sequence number from the RingBuffer, set the crawler's request on the RequestEvent stored at that sequence, and finally publish the sequence to the RingBuffer.
    import com.cv4j.netdiscovery.core.domain.Request;
    import com.lmax.disruptor.RingBuffer;

    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Created by tony on 2018/9/2.
     */
    public class Producer {

        private final RingBuffer<RequestEvent> ringBuffer;

        // Number of events published so far
        private AtomicInteger count = new AtomicInteger(0);

        public Producer(RingBuffer<RequestEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        public void pushData(Request request) {
            // Claim the next writable sequence
            long sequence = ringBuffer.next();
            try {
                // Fill the preallocated event stored at that sequence
                RequestEvent event = ringBuffer.get(sequence);
                event.setRequest(request);
            } finally {
                // A claimed sequence must always be published
                ringBuffer.publish(sequence);
                count.incrementAndGet();
            }
        }

        public int getCount() {
            return count.get();
        }
    }
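The claim, write, publish sequence may be easier to see on a toy ring buffer written with the JDK only. This is a simplified illustration for a single producer, not Disruptor's implementation: ToyRing and Slot are made-up names, and unlike the real RingBuffer this sketch does not stop the producer from overwriting slots that have not been consumed yet.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy single-producer ring buffer, for illustration only.
final class ToyRing {
    static final class Slot {
        String value;
    }

    private final Slot[] slots;
    private final int mask;                               // size - 1, valid because size is a power of two
    private long nextToClaim = 0;                         // touched only by the single producer
    private final AtomicLong cursor = new AtomicLong(-1); // highest published sequence

    ToyRing(int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Slot[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new Slot(); // preallocate every slot once and reuse it
        }
        mask = size - 1;
    }

    // Step 1: claim the next sequence number.
    long next() {
        return nextToClaim++;
    }

    // Step 2: map the sequence to its slot with a bit mask.
    Slot get(long sequence) {
        return slots[(int) (sequence & mask)];
    }

    // Step 3: publish, which makes the slot visible to readers of the cursor.
    void publish(long sequence) {
        cursor.set(sequence);
    }

    long cursor() {
        return cursor.get();
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(8);
        long sequence = ring.next();
        ring.get(sequence).value = "https://example.com";
        ring.publish(sequence);
        System.out.println("published up to sequence " + ring.cursor());
    }
}
```

Because the slots are created once and reused, publishing does not allocate a new object per request; only the slot's field changes.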
2.3 Consuming Events
Once the request has been set on the RequestEvent, a consumer has to handle the event. The Consumer below simply logs the consumer's thread name and the request. The actual "consumption" still requires retrieving the request through DisruptorQueue's poll() and processing it in the Spider.
    import com.lmax.disruptor.WorkHandler;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Created by tony on 2018/9/2.
     */
    @Slf4j
    public class Consumer implements WorkHandler<RequestEvent> {

        @Override
        public void onEvent(RequestEvent requestEvent) throws Exception {
            log.info("consumer:" + Thread.currentThread().getName() + " requestEvent: value=" + requestEvent.toString());
        }
    }
2.4 DisruptorQueue implementation
Disruptor supports a single producer or multiple producers, a single consumer or multiple consumers, and consumer groups.
NetDiscovery uses multiple producers and multiple consumers.
When the RingBuffer is created, ProducerType.MULTI indicates that there are multiple producers. The RingBuffer is created with a YieldingWaitStrategy, which is one of several WaitStrategy implementations, each with different performance characteristics.
YieldingWaitStrategy is a good fit for low-latency systems. It is recommended when very high performance is required and the number of event-processing threads is smaller than the number of logical CPU cores, for example when hyper-threading is enabled.
    ringBuffer = RingBuffer.create(ProducerType.MULTI,
            new EventFactory<RequestEvent>() {
                @Override
                public RequestEvent newInstance() {
                    return new RequestEvent();
                }
            },
            ringBufferSize,
            new YieldingWaitStrategy());
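As a rough picture of what a yielding wait strategy does, the JDK-only sketch below spins for a bounded number of attempts and then calls Thread.yield() until the awaited sequence has been published. It is a simplification under stated assumptions: YieldingWaitSketch, waitFor and the spin budget are names and values chosen for this example, and the library's own strategy also handles dependent sequences and alerts from the barrier.

```java
import java.util.concurrent.atomic.AtomicLong;

final class YieldingWaitSketch {
    // Spin budget assumed for this sketch.
    private static final int SPIN_TRIES = 100;

    // Waits until cursor >= wanted and returns the cursor value that was observed.
    static long waitFor(long wanted, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < wanted) {
            if (counter > 0) {
                counter--;      // busy-spin phase: lowest latency, burns CPU
            } else {
                Thread.yield(); // afterwards, let other runnable threads make progress
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);
        Thread publisher = new Thread(() -> {
            for (long i = 0; i <= 5; i++) {
                cursor.set(i); // simulate publishing sequences 0..5
            }
        });
        publisher.start();
        long seen = waitFor(5, cursor);
        publisher.join();
        System.out.println("observed cursor " + seen);
    }
}
```

The waiting thread never parks, which keeps latency low but keeps a core busy; that is why this kind of strategy is usually paired with fewer handler threads than logical cores.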
EventProcessor is used to handle events within Disruptor.
Implementations of EventProcessor include BatchEventProcessor, which processes events in batches on a single thread, and WorkProcessor, which lets multiple threads share the events.
WorkerPool manages a group of WorkProcessors. After creating the ringBuffer, create the workerPool:
    SequenceBarrier barriers = ringBuffer.newBarrier();

    for (int i = 0; i < consumers.length; i++) {
        consumers[i] = new Consumer();
    }

    workerPool = new WorkerPool<RequestEvent>(ringBuffer,
            barriers,
            new EventExceptionHandler(),
            consumers);
Start the workerPool:
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(Executors.newFixedThreadPool(threadNum));
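With a WorkerPool, each event is handled by only one of the workers rather than by all of them. The JDK-only sketch below imitates that work-sharing idea with a shared claim counter: every worker thread claims the next unprocessed sequence, so no event is handled twice. WorkSharingSketch and process are hypothetical names, and the real WorkProcessor additionally coordinates with the SequenceBarrier and the ring buffer's gating sequences.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

final class WorkSharingSketch {
    // Runs `workers` threads over `eventCount` events and returns,
    // for each event, how many times it was handled.
    static int[] process(int eventCount, int workers) {
        AtomicLong workSequence = new AtomicLong(0); // shared claim counter
        AtomicIntegerArray timesHandled = new AtomicIntegerArray(eventCount);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                long sequence;
                // getAndIncrement hands each sequence to exactly one worker
                while ((sequence = workSequence.getAndIncrement()) < eventCount) {
                    timesHandled.incrementAndGet((int) sequence);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int[] result = new int[eventCount];
        for (int i = 0; i < eventCount; i++) {
            result[i] = timesHandled.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counts = process(1000, 4);
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        System.out.println("handled " + total + " events with 4 workers");
    }
}
```

This is the behavior a crawler queue wants: a request should be fetched once, by whichever worker gets to it first.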
Finally, here’s the complete code for DisruptorQueue:
    import com.cv4j.netdiscovery.core.domain.Request;
    import com.cv4j.netdiscovery.core.queue.AbstractQueue;
    import com.lmax.disruptor.*;
    import com.lmax.disruptor.dsl.ProducerType;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Created by tony on 2018/9/1.
     */
    @Slf4j
    public class DisruptorQueue extends AbstractQueue {

        private RingBuffer<RequestEvent> ringBuffer;
        private Consumer[] consumers = null;
        private Producer producer = null;
        private WorkerPool<RequestEvent> workerPool = null;
        private int ringBufferSize = 1024 * 1024; // RingBuffer size must be a power of two
        private AtomicInteger consumerCount = new AtomicInteger(0);

        private static final int CONSUME_NUM = 2;
        private static final int THREAD_NUM = 4;

        public DisruptorQueue() {
            this(CONSUME_NUM, THREAD_NUM);
        }

        public DisruptorQueue(int consumerNum, int threadNum) {
            consumers = new Consumer[consumerNum];

            // Create the RingBuffer for multiple producers
            ringBuffer = RingBuffer.create(ProducerType.MULTI,
                    new EventFactory<RequestEvent>() {
                        @Override
                        public RequestEvent newInstance() {
                            return new RequestEvent();
                        }
                    },
                    ringBufferSize,
                    new YieldingWaitStrategy());

            SequenceBarrier barriers = ringBuffer.newBarrier();

            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new Consumer();
            }

            workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                    barriers,
                    new EventExceptionHandler(),
                    consumers);

            // Let the RingBuffer track the workers' progress, then start them
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
            workerPool.start(Executors.newFixedThreadPool(threadNum));

            producer = new Producer(ringBuffer);
        }

        @Override
        protected void pushWhenNoDuplicate(Request request) {
            producer.pushData(request);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public Request poll(String spiderName) {
            // Read the request at an offset computed from the cursor and the published count
            Request request = ringBuffer.get(ringBuffer.getCursor() - producer.getCount() + 1).getRequest();
            ringBuffer.next();
            consumerCount.incrementAndGet();
            return request;
        }

        @Override
        public int getLeftRequests(String spiderName) {
            return producer.getCount() - consumerCount.get();
        }

        public int getTotalRequests(String spiderName) {
            return super.getTotalRequests(spiderName);
        }

        static class EventExceptionHandler implements ExceptionHandler {

            public void handleEventException(Throwable ex, long sequence, Object event) {
                log.debug("handleEventException: " + ex);
            }

            public void handleOnStartException(Throwable ex) {
                log.debug("handleOnStartException: " + ex);
            }

            public void handleOnShutdownException(Throwable ex) {
                log.debug("handleOnShutdownException: " + ex);
            }
        }
    }
Here, pushWhenNoDuplicate() sends the request to the ringBuffer, and poll() retrieves the corresponding request from the ringBuffer so that the crawler can issue the network request, parse the response, and so on.
Conclusion:
Crawler framework GitHub address: github.com/fengzhizi71…
The code above is a typical Disruptor multi-producer, multi-consumer setup and can also serve as sample code.
Finally, the crawler framework is programmed against interfaces, so any component is easy to replace.
Related articles in the series:
From API to DSL: using Kotlin features to further encapsulate the crawler framework
Using Kotlin Coroutines to simply transform the original crawler framework
Building a Selenium module and a DSL module (Kotlin implementation) for the crawler framework
Building a general crawler framework based on Vert.x and RxJava 2