0x00 Concept review

Core roles on the consumption side of Disruptor:

  • Event: the unit of data transmitted within the Disruptor.
  • RingBuffer: the container that stores and updates events.
  • EventHandler: a user-implemented interface that contains the consumption logic; it represents a consumer in the Disruptor.
  • EventProcessor: extends the Runnable interface and contains the main loop that processes Disruptor events.
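To make these roles concrete, here is a minimal sketch of an event type and its factory. The LongEvent class and its field are my own illustration, not part of the Disruptor source; EventFactory is the Disruptor interface the RingBuffer uses to pre-allocate its slots.

// Hypothetical event type used in the sketches throughout this article.
import com.lmax.disruptor.EventFactory;

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    public long get()
    {
        return value;
    }

    // The RingBuffer calls newInstance() once per slot to pre-fill the buffer,
    // so the same event objects are reused for every publish.
    public static final EventFactory<LongEvent> FACTORY = new EventFactory<LongEvent>()
    {
        @Override
        public LongEvent newInstance()
        {
            return new LongEvent();
        }
    };
}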

Event multicast: the biggest behavioral difference between a queue and the Disruptor. An event in a queue can only be consumed by one consumer, whereas an event in the Disruptor is published to all consumers. This is especially suitable for independent, parallel processing of the same data.

Consumer dependency graph (consumption chain): when the same event needs to be consumed by multiple consumers, there may be dependencies between them. For example, with consumers A, B, and C, B and C may require A to execute first, while B and C themselves can consume in parallel.

0x01 EventProcessor Interface Overview

Now we officially start the source code walkthrough of Disruptor consumers. Consumers in Disruptor rely on the EventProcessor loop to process available events. As the names suggest, the processor does the machine-level processing while the handler carries the user's logic (both "handle" and "process" translate roughly to "processing", but process emphasizes mechanical processing whereas handle emphasizes the user-facing side, so the handler represents the processing of user logic). The EventProcessor interface has two main implementation classes, BatchEventProcessor and WorkProcessor, whose corresponding user-logic handlers are EventHandler and WorkHandler, respectively. Below are the interface definitions for EventHandler and EventProcessor.

/**
 * Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
 *
 * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
 * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
 */
public interface EventHandler<T>
{
    /**
     * Called when a publisher has published an event to the {@link RingBuffer}
     *
     * @param event      published to the {@link RingBuffer}
     * @param sequence   of the event being processed
     * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
     * @throws Exception if the EventHandler would like the exception handled further up the chain.
     */
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

/**
 * EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
 * <p>
 * An EventProcessor will generally be associated with a Thread for execution.
 */
public interface EventProcessor extends Runnable
{
    /**
     * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
     *
     * @return reference to the {@link Sequence} for this {@link EventProcessor}
     */
    Sequence getSequence();

    /**
     * Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
     * It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
     */
    void halt();

    boolean isRunning();
}
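As a minimal illustration of the EventHandler contract, here is a sketch of a handler for the hypothetical LongEvent above (the class name and the logging are my own, not from the source):

// A minimal EventHandler sketch for the hypothetical LongEvent above.
import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    @Override
    public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        // endOfBatch is true for the last event of the batch returned by one waitFor call,
        // which is a natural point to flush any buffered work.
        System.out.println("sequence=" + sequence + ", value=" + event.get() + ", endOfBatch=" + endOfBatch);
    }
}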

The EventProcessor interface extends the Runnable interface and has two main implementations: BatchEventProcessor and WorkProcessor. When building consumers with the Disruptor helper class, multiple EventHandlers can be passed to the handleEventsWith method; internally, one BatchEventProcessor is created per handler and each is bound to its own thread. This is similar to the publish-subscribe model in JMS: the same event is consumed by multiple consumers in parallel, so one event can trigger several independent operations. Alternatively, multiple WorkHandlers can be passed to the Disruptor's handleEventsWithWorkerPool method; internally, multiple WorkProcessors are created, each running on its own thread. This is similar to the point-to-point pattern in JMS: each event is consumed by exactly one of a set of consumers, which is suitable for scaling out the consumers' processing capacity.

0x02 Consumer technology implementation

Let’s review two features of Disruptor consumers: the consumer dependency graph (the “consumption chain” below) and event multicast. Suppose we have four consumers A, B, C, and D; what forms can their consumption chain take? From the many possible permutations and combinations, I have selected four representative forms of the consumption chain.

  • In group 1, B, C and D can consume at the same time after consumer A’s consumption is completed;
  • In group 2, consumers A, B, C, and D consume in order;
  • In group 3, after consumers A and B consume sequentially, C and D consume simultaneously;
  • In group 4, after consumer A finishes consumption, B and C can consume at the same time, but D can only consume after both consumptions are completed.

The consumption chains labeled 1, 3, and 4 all use event multicast, so event multicast can be seen as a building block of the consumption chain. Notice that in each of the four combinations, every horizontal level runs in parallel and forms one consumer group. These are relatively simple shapes of the consumption chain; in practice it can be more complex. So how is the consumption chain implemented inside Disruptor? Let’s think about it for a moment. To form a consumption chain out of independent consumers, a consumer (group) further down the chain must know how far the consumer (group) in front of it has progressed; otherwise sequential consumption is impossible. At the same time, a consumer also needs to know the position of the producer in order to determine whether any events are available. When we analyzed the producer code earlier, we noted that, in order not to overwrite events that have not yet been consumed, the producer must know the progress of the slowest consumer. Only with all of this in place can consumers be coordinated into a consumption chain. Let’s look at the implementation in Disruptor.
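Before diving into the internals, here is a sketch of how such a chain is declared with the Disruptor DSL, taking group 4 as an example. LongEvent/LongEventHandler are the hypothetical types sketched earlier, and the buffer size and thread factory are arbitrary choices.

// Sketch of consumption chain group 4: A first, then B and C in parallel, then D after both.
import java.util.concurrent.Executors;
import com.lmax.disruptor.dsl.Disruptor;

public class ChainExample
{
    public static void main(String[] args)
    {
        LongEventHandler a = new LongEventHandler();
        LongEventHandler b = new LongEventHandler();
        LongEventHandler c = new LongEventHandler();
        LongEventHandler d = new LongEventHandler();

        Disruptor<LongEvent> disruptor =
            new Disruptor<>(LongEvent.FACTORY, 1024, Executors.defaultThreadFactory());

        disruptor.handleEventsWith(a)   // consumer group 1: A
                 .then(b, c)            // consumer group 2: B and C consume in parallel after A
                 .then(d);              // consumer group 3: D consumes only after both B and C
        disruptor.start();
    }
}

Each call to then() starts a new consumer group whose barrier is built from the sequences of the previous group, which is exactly the mechanism examined below.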

0x02.1 Single-threaded batch event processing with BatchEventProcessor

When using BatchEventProcessor, the Disruptor#handleEventsWith method returns an EventHandlerGroup; with the EventHandlerGroup’s and and then methods, a complex consumer chain can be constructed. An EventHandlerGroup represents one group of event consumers. It internally holds a reference to the Disruptor helper class, and most of its methods are implemented by delegating back to that Disruptor instance.

// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
    return handleEventsWith(handlers);
}

public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return disruptor.createEventProcessors(sequences, handlers);
}

// Disruptor.java
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return createEventProcessors(new Sequence[0], handlers);
}

// When called from EventHandlerGroup, barrierSequences holds the sequences of that EventHandlerGroup instance,
// i.e. the sequences of the previous consumer group, used as the gating sequences for the current event processors.
// When handleEventsWith is called for the first time, barrierSequences is an empty array.
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    // Sequences of the event processors created for this group
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    // One SequenceBarrier is shared by the whole group
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    // After adding this group of event handlers, update the gating sequences for the next group in the chain.
    // ("Gating" means that a later group in the consumption chain cannot overtake the group before it.)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}

// barrierSequences are the sequences of the previous group of event handlers (an empty array for the first group);
// this group's consumption cannot exceed them.
// processorSequences are the sequences of the event handlers added in this group.
private void updateGatingSequencesForNextInChain(
    final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        // Add this group's sequences to gatingSequences in the Sequencer
        ringBuffer.addGatingSequences(processorSequences);
        // gatingSequences in the Sequencer only keeps the sequences of the consumers at the end of the chain
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        // Un-mark the previous group of consumers as the end of the consumer chain
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

As you can see, the logic for building the consumer chain with BatchEventProcessor lives in the createEventProcessors method. ConsumerRepository is the class that stores the consumer relationships, for example mapping an EventHandler reference to its EventProcessorInfo and a Sequence to its ConsumerInfo. Because object references are used as keys, the underlying data structure is an IdentityHashMap; the biggest difference between IdentityHashMap and HashMap is that it compares keys with == instead of equals. The createEventProcessors method takes two arguments. The first, barrierSequences, is the array of barrier sequences for the current consumer group: an empty array if this is the first consumer group, otherwise the sequence array of the previous consumer group. The second, eventHandlers, is the array of EventHandlers carrying the event consumption logic. Disruptor creates a corresponding BatchEventProcessor for each EventHandler implementation. Building a BatchEventProcessor requires three constructor arguments: dataProvider, the data storage structure (the RingBuffer); sequenceBarrier, used to track the producer cursor and coordinate data processing; and eventHandler, the user-implemented event handler, i.e. the actual consumer. Note that Disruptor does not create a new SequenceBarrier for each BatchEventProcessor; instead, all consumers in a group share one SequenceBarrier. The BatchEventProcessor is defined as follows. As for why it is called BatchEventProcessor: you can see in the run() method that each waitFor call returns the maximum available sequence, and the processor then loops through and processes all events up to it. In this way, if the consumer experiences a momentary hiccup and temporarily falls behind the producer, it can catch up by processing all of the lagging events as one batch in the next loop iteration.

/**
 * Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
 * and delegating the available events to an {@link EventHandler}.
 * <p>
 * If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
 * is started and just before the thread is shutdown.
 *
 * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
 *
 * Each EventHandler corresponds to one EventProcessor executor. On every big loop iteration, BatchEventProcessor
 * obtains the highest available sequence number and hands the events up to it to the EventHandler.
 */
public final class BatchEventProcessor<T> implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;           // the data storage structure, i.e. the RingBuffer
    private final SequenceBarrier sequenceBarrier;        // by default a ProcessingSequenceBarrier
    private final EventHandler<? super T> eventHandler;   // the user-defined EventHandler implementation for this EventProcessor
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);  // this processor's consumption progress
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;        // notified with the batch size after each batch of available events is obtained

    /**
     * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
     * the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
     *
     * @param dataProvider    to which events are published.
     * @param sequenceBarrier on which it is waiting.
     * @param eventHandler    is the delegate to which events are dispatched.
     */
    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        batchStartAware =
            (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
            (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }

    // ...

    /**
     * It is ok to have another thread rerun this method after a halt().
     *
     * @throws IllegalStateException if this object instance is already running in a thread
     */
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    // availableSequence is the maximum sequence currently available
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }

                    // Process all available events as one batch
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    // After the eventHandler has processed the batch, update this processor's sequence
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }
}
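The DSL hides all of this wiring, but to show what it boils down to, here is a sketch of assembling a single BatchEventProcessor by hand, again using the hypothetical LongEvent/LongEventHandler; the single-producer ring buffer and the buffer size are arbitrary choices.

// Sketch: wiring a BatchEventProcessor without the Disruptor DSL.
RingBuffer<LongEvent> ringBuffer =
    RingBuffer.createSingleProducer(LongEvent.FACTORY, 1024);

// A barrier with no dependent sequences: this processor only waits on the producer cursor.
SequenceBarrier barrier = ringBuffer.newBarrier();

BatchEventProcessor<LongEvent> processor =
    new BatchEventProcessor<>(ringBuffer, barrier, new LongEventHandler());

// The producer must not wrap past this consumer, so its sequence gates the ring buffer.
ringBuffer.addGatingSequences(processor.getSequence());

// EventProcessor extends Runnable; give it its own thread.
new Thread(processor).start();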

0x02.2 SequenceBarrier - the barrier that tells consumers what is available

Let’s focus on the SequenceBarrier. The main job of a SequenceBarrier is to coordinate obtaining the maximum sequence number that a consumer may process; it holds the producer’s sequence and the sequences of the consumers it depends on. Its interface is defined as follows.

public interface SequenceBarrier
{
    /**
     * Wait for the given sequence to be available for consumption.
     *
     * @param sequence to wait for
     * @return the sequence up to which is available
     * @throws AlertException       if a status change has occurred for the Disruptor
     * @throws InterruptedException if the thread needs awaking on a condition variable.
     * @throws TimeoutException
     */
    long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;

    /**
     * Get the current cursor value that can be read.
     *
     * @return value of the cursor for entries that have been published.
     */
    long getCursor();

    /**
     * The current alert status for the barrier.
     *
     * @return true if in alert otherwise false.
     */
    boolean isAlerted();

    /**
     * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.
     * This method is called when EventProcessor#halt() is called.
     */
    void alert();

    /**
     * Clear the current alert status.
     */
    void clearAlert();

    /**
     * Check if an alert has been raised and throw an {@link AlertException} if it has.
     *
     * @throws AlertException if alert has been raised.
     */
    void checkAlert() throws AlertException;
}

A SequenceBarrier instance is held by each EventProcessor and used, mainly through its waitFor method, to wait for and obtain available events to consume. To do this, three things are required:

  1. Know the position of the producer.
  2. Because Disruptor supports consumption chains across consumer groups, ensure that a subsequent consumer group cannot process an event until all consumers in the previous group have processed it.
  3. When there are no events to consume, apply some kind of wait strategy while waiting for events to become available.

Let’s look at how the SequenceBarrier implementation class ProcessingSequenceBarrier implements the waitFor method.

final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy;      // the wait strategy
    private final Sequence dependentSequence;     // the sequences of the previous (dependent) consumer group; if this is the
                                                  // first group it is cursorSequence, otherwise a FixedSequenceGroup wrapping
                                                  // the previous group's sequences
    private volatile boolean alerted = false;     // set to true when halt is triggered
    private final Sequence cursorSequence;        // reference to the cursor in AbstractSequencer, i.e. the latest position
                                                  // published by the producer
    private final Sequencer sequencer;            // MultiProducerSequencer or SingleProducerSequencer

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)   // the dependent sequence array is empty for the first group
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        // Check whether the processor has been halted
        checkAlert();

        // The requested sequence is the consumer's current sequence + 1;
        // delegate to the wait strategy with the producer cursor and the dependent sequences
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        // Return the highest published sequence, checking availability for each sequence number
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    // ...
}

0x02.3 How to wait for available events - WaitStrategy

As we can see, the actual waiting is delegated to WaitStrategy#waitFor.
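Which WaitStrategy implementation is used is chosen when the Disruptor (and hence its Sequencer and barriers) is constructed. A sketch, again with the hypothetical LongEvent; the strategy classes named in the comment are the standard ones shipped with Disruptor, and the other constructor arguments are arbitrary choices:

// Sketch: choosing a wait strategy at construction time.
// BlockingWaitStrategy trades some latency for low CPU usage; YieldingWaitStrategy
// spins/yields and is typically used in low-latency setups at the cost of CPU.
Disruptor<LongEvent> disruptor = new Disruptor<>(
    LongEvent.FACTORY,
    1024,
    Executors.defaultThreadFactory(),
    ProducerType.SINGLE,
    new YieldingWaitStrategy());   // or BlockingWaitStrategy, SleepingWaitStrategy, ...

Now let’s look at the WaitStrategy interface itself and one of its implementations.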

// WaitStrategy.java
/**
 * Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.
 */
public interface WaitStrategy
{
    /**
     * Wait for the given sequence to be available. It is possible for this method to return a value
     * less than the sequence number supplied depending on the implementation of the WaitStrategy. A common
     * use for this is to signal a timeout. Any EventProcessor that is using a WaitStrategy to get notifications
     * about message becoming available should remember to handle this case. The {@link BatchEventProcessor} explicitly
     * handles this case and will signal a timeout if required.
     *
     * @param sequence          to be waited on.
     * @param cursor            the main sequence from ringbuffer. Wait/notify strategies will
     *                          need this as it's the only sequence that is also notified upon update.
     *                          (This is the producer cursor.)
     * @param dependentSequence on which to wait. Typically a FixedSequenceGroup wrapping the previous consumer
     *                          group's sequences; if the consumer is in the first group, it is the cursor itself.
     * @param barrier           the processor is waiting on.
     * @return the sequence that is available which may be greater than the requested sequence.
     * @throws AlertException       if the status of the Disruptor has changed.
     * @throws InterruptedException if the thread is interrupted.
     * @throws TimeoutException
     */
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;

    /**
     * Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced.
     * This is only meaningful for strategies that use a locking mechanism.
     */
    void signalAllWhenBlocking();
}

Among the various wait strategies, let’s pick the blocking strategy to study.

public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)   // the producer cursor is below the requested sequence, i.e. no new event yet
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence)   // wait until the producer cursor reaches the requested sequence
                {
                    // Loop and wait; woken up by the Sequencer on publish, and also periodically while waiting.
                    // The loop exists so that the alert status is checked; otherwise halting the Disruptor would not work.
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        // Wait while the requested sequence is greater than the slowest consumer of the previous group
        // (or, for the first group, greater than the producer cursor). This prevents consuming an event that
        // the previous consumer group has not finished with.
        // Why is there no lock here? Think about the scenario: by the time the code gets here, the producer is
        // guaranteed to have published a new event, and entering this loop means only that the previous group of
        // consumers has not finished. Consumers usually finish their work quickly, so a busy spin is used to wait
        // for the previous group to catch up.
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }

    @Override
    public String toString()
    {
        return "BlockingWaitStrategy{" +
            "processorNotifyCondition=" + processorNotifyCondition +
            '}';
    }
}

The blocking wait strategy uses Lock + Condition to wait for the producer to publish available events, and a busy spin to wait for anything the previous consumer group may not yet have consumed. A tip here: when building low-latency systems, try not to use locks because of their performance cost; if you must, keep the lock granularity as small as possible. Also note that while waiting for available events, a consumer repeatedly calls barrier.checkAlert() before waiting on the lock’s condition. There are three places where a waiting consumer thread is woken up. Two of them are in the Sequencer implementation classes: first, when an event is published, the consumer thread is notified so it can continue consuming; second, when next() is called to claim a RingBuffer slot and the RingBuffer is found to be full (the producer is faster than the consumers, leaving no free slot to publish into), the consumer threads are also woken up. This “re-signal any waiting threads when trying to publish to a full ring buffer” behavior was added in version 3.3.5. At first I couldn’t figure out why the consumer threads should be woken up when the buffer is full, until I found the related issue: a deadlock occurred when using Disruptor inside log4j2. To guard against consumers missing a notification when an event is published, consumers are notified again whenever a producer tries to publish into a full buffer. The bug was eventually attributed to log4j2 rather than Disruptor; the extra notification in Disruptor is just additional insurance.

// SingleProducerSequencer.java - code in next(n)
// Because the consumers are slow there is no free slot; the loop can only be exited once the
// consumers make progress. The outer check uses the cached minimum consumer sequence,
// whereas here the real consumer sequences are read.
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
    // Wake up waiting consumers. Under normal circumstances this is meaningless; it only exists to cover
    // rare cases where, for some unknown reason, the locking mechanism fails to notify the consumers.
    waitStrategy.signalAllWhenBlocking();
    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}

The third wake-up happens when the Disruptor is shut down. Before the Disruptor closes, the consumer finishes the current batch of data (not everything in the RingBuffer, but all unprocessed events up to the maximum available sequence fetched in this loop iteration), and if the consumer thread is currently waiting it is woken up and terminates. So much for BatchEventProcessor.
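From user code, the shutdown that triggers this third wake-up looks roughly like the sketch below; shutdown() and halt() are the Disruptor DSL methods, and the timeout value is an arbitrary choice.

// Sketch: shutting down the disruptor from user code.
// shutdown() waits until all events already published have been processed, then halts the processors;
// halt() stops the processors at the next clean break without draining the backlog.
disruptor.shutdown();                          // graceful: drain, then stop
// disruptor.shutdown(10, TimeUnit.SECONDS);   // variant that gives up after a timeout
// disruptor.halt();                           // stop without waiting for remaining events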

0x02.4 Multi-threaded event handling with WorkProcessor

Now let’s talk about WorkHandler + WorkProcessor. As mentioned above, using EventHandler + BatchEventProcessor is similar to JMS publish-subscribe: the same event is consumed in parallel by EventHandlers on different threads. But what if a single thread does not have enough capacity and you want different events on the same topic to be processed by multiple threads? That corresponds to the JMS point-to-point model, where multiple consumers listen on the same queue and whoever grabs a message first handles it. In Disruptor this is done with WorkHandler + WorkProcessor. When you need this model, set up the consumers on the Disruptor with handleEventsWithWorkerPool and chain them with thenHandleEventsWithWorkerPool.

disruptor
    .handleEventsWithWorkerPool(
      new WorkHandler[]{
          journalHandler,
          journalHandler,
          journalHandler
      }
    )
    .thenHandleEventsWithWorkerPool(resultHandler);
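For comparison with EventHandler, a WorkHandler receives only the event itself (no sequence or endOfBatch), since each event is handled by exactly one worker in the pool. A minimal sketch for the hypothetical LongEvent (the class name and logging are my own):

// Minimal WorkHandler sketch: each event is processed by exactly one worker in the pool.
import com.lmax.disruptor.WorkHandler;

public class LongWorkHandler implements WorkHandler<LongEvent>
{
    @Override
    public void onEvent(final LongEvent event) throws Exception
    {
        System.out.println(Thread.currentThread().getName() + " handled value=" + event.get());
    }
}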

Take a look at the relevant source code.

// Disruptor.java
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}

EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(workerPool, sequenceBarrier);

    final Sequence[] workerSequences = workerPool.getWorkerSequences();

    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}

// WorkerPool.java
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers)
{
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    // One WorkProcessor is created per WorkHandler; all of them share the same workSequence.
    for (int i = 0; i < numWorkers; i++)
    {
        workProcessors[i] = new WorkProcessor<T>(
            ringBuffer,
            sequenceBarrier,
            workHandlers[i],
            exceptionHandler,
            workSequence);
    }
}

Compared with the single-threaded handling above, the biggest difference when using a worker pool for event handling is the addition of the WorkerPool class, which manages a group of WorkProcessors together with the workSequence they share.
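WorkerPool can also be used directly, without the DSL. Here is a sketch under the same hypothetical LongEvent/LongWorkHandler assumptions; the multi-producer ring buffer, buffer size, and executor are arbitrary choices.

// Sketch: using WorkerPool directly, without the Disruptor DSL.
RingBuffer<LongEvent> ringBuffer =
    RingBuffer.createMultiProducer(LongEvent.FACTORY, 1024);

WorkerPool<LongEvent> workerPool = new WorkerPool<>(
    ringBuffer,
    ringBuffer.newBarrier(),
    new FatalExceptionHandler(),
    new LongWorkHandler(), new LongWorkHandler(), new LongWorkHandler());

// The producer must not wrap past the slowest worker.
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

// start() runs one WorkProcessor per handler on the given Executor.
workerPool.start(Executors.newCachedThreadPool());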

WorkProcessor works on the same principle as BatchEventProcessor, except that a shared workSequence stores the claim progress of the whole group. Since multiple threads update the workSequence, CAS is used for the update. The run() method of WorkProcessor is as follows.

@Override
public void run()
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();

    notifyStart();

    boolean processedSequence = true;
    long cachedAvailableSequence = Long.MIN_VALUE;
    long nextSequence = sequence.get();
    T event = null;
    while (true)
    {
        try
        {
            // if previous sequence was processed - fetch the next sequence and set
            // that we have successfully processed the previous sequence
            // typically, this will be true
            // this prevents the sequence getting too far forward if an exception
            // is thrown from the WorkHandler
            if (processedSequence)   // the previous nextSequence has been handled (whether it ended normally or with
                                     // an exception); only then may the next sequence number be claimed
            {
                processedSequence = false;
                do
                {
                    // Multiple consumer threads in the same group may compete for a sequence number; CAS avoids locking.
                    // The whole group shares one workSequence. The WorkProcessor keeps claiming the next available
                    // sequence number and only consumes it after the CAS succeeds.
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);
                }
                while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
            }

            if (cachedAvailableSequence >= nextSequence)
            {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);
                processedSequence = true;
            }
            else
            {
                // Update the cached available sequence. cachedAvailableSequence is local to this WorkProcessor
                // instance, so the cached value may differ between instances.
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
        }
        catch (final TimeoutException e)
        {
            notifyTimeout(sequence.get());
        }
        catch (final AlertException ex)
        {
            if (!running.get())
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            // handle, mark as processed, unless the exception handler threw an exception
            exceptionHandler.handleEventException(ex, nextSequence, event);
            processedSequence = true;
        }
    }

    notifyShutdown();

    running.set(false);
}

The code logic is similar to that of BatchEventProcessor, so I won’t repeat it. Note also that Disruptor models a consumer group with the EventHandlerGroup, which corresponds to one horizontal level in the four chain diagrams above. This frees the different consumer groups from having to care about each other’s implementations and allows more complex and flexible consumer dependency graphs to be built.

0x03 Consumer Summary

This article explored the internal implementation of Disruptor consumers, focusing on the consumer-side code of BatchEventProcessor and WorkProcessor. It deliberately skipped timeout notification, start/shutdown notification, exception handling and similar topics, not because they are unimportant, but to stay concise and serve as a starting point for further reading. BatchEventProcessor handles single-threaded processing within a parallel chain: different consumers in the same consumer group all receive the same event, and only after a group has finished does the next consumer group get to process it (similar in spirit to Phaser, CyclicBarrier, or CountDownLatch in JUC). WorkProcessors, managed in groups by a WorkerPool, process events with multiple threads; WorkProcessors in the same consumer group never process the same event. Finally, by choosing a WaitStrategy implementation you control how a consumer waits when no event is available for processing.

Author: coder_jerry. Link: www.jianshu.com/p/f4021e814… The copyright belongs to the author. For commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.