Welcome to my GitHub
Github.com/zq2599/blog…
Content: all original article classification summary and supporting source code, involving Java, Docker, Kubernetes, DevOPS, etc.;
Disruptor Notes series of links
- Quick start
- Disruptor class analysis
- Basic operations for ring queues (not using Disruptor class)
- Event consumption knowledge summary
- Event consumption Actual combat
- Common scenarios
- Waiting for the strategy
- Knowledge supplement (Final)
This paper gives an overview of
- This article is the third in a series of disruptor Notes designed to code message production and consumption. Unlike One of disruptor Notes: Quick Start, this development does not use the Disruptor class, and the operations associated with the Disruptor Ring Buffer are done in code.
- This move away from the Disruptor class to operate the Ring Buffer may not be appropriate in a production environment, but it has been an effective learning tool and will make you more comfortable developing, commissioning, and optimizing your Disruptor future.
- Simple message production and consumption can no longer meet our learning enthusiasm, today’s actual combat to challenge the following three scenarios:
- 100 events, single consumer consumption;
- 100 events, three consumers, each consuming this 100 events alone;
- 100 events, three consumers sharing 100 events;
These reviews
For the purposes of this section, note 2: Disruptor Class Analysis has been thoroughly researched and recommended to watch. Here’s a quick review of the core functionality of the disruptor class:
- Creating a ring queue (RingBuffer object)
- Create a SequenceBarrier object to receive consumable events in the ringBuffer
- Create a BatchEventProcessor that is responsible for consuming events
- Bind the exception handling class of the BatchEventProcessor object
- Call the ringBuffer addGatingSequences, the Sequence of consumers to ringBuffer
- Start a separate thread to execute the business logic that consumes the event
- The theoretical analysis is complete, and the coding begins;
Download the source code
- The full source code for this article can be downloaded at GitHub with the following address and link information (github.com/zq2599/blog…
The name of the | link | note |
---|---|---|
Project home page | Github.com/zq2599/blog… | The project’s home page on GitHub |
Git repository address (HTTPS) | Github.com/zq2599/blog… | The project source warehouse address, HTTPS protocol |
Git repository address (SSH) | [email protected]:zq2599/blog_demos.git | The project source warehouse address, SSH protocol |
- The git project has multiple folders. The source code for this tutorial is in the disruptor-Tutorials folder, as shown in the red box below:
- Disruptor-tutorials are parent projects with multiple modules (the module is low-level-operate, as shown in the red box below).
The development of
- Entering the coding phase, today’s task is to challenge the following three scenarios:
- 100 events, single consumer consumption;
- 100 events, three consumers, each consuming this 100 events alone;
- 100 events, three consumers sharing 100 events;
- Let’s build the project first, then write common code, such as event definition, event factory, etc., and finally develop each scenario;
- Disruptor-tutorials added a module named low-level-operate to the parent project disruptor-tutorials with build.gradle like this:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor'
testImplementation('org.springframework.boot:spring-boot-starter-test')}Copy the code
- Then there is the SpringBoot boot class:
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class LowLevelOperateApplication {
public static void main(String[] args) { SpringApplication.run(LowLevelOperateApplication.class, args); }}Copy the code
- Event class, this is the definition of an event:
package com.bolingcavalry.service;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class StringEvent {
private String value;
}
Copy the code
- Event factory, which defines how to create event objects in memory:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventFactory;
public class StringEventFactory implements EventFactory<StringEvent> {
@Override
public StringEvent newInstance(a) {
return newStringEvent(); }}Copy the code
- Disruptor event Production class that defines how to convert business logic events into disruptor events and publish them to a circular queue for consumption:
package com.bolingcavalry.service;
import com.lmax.disruptor.RingBuffer;
public class StringEventProducer {
// A circular queue for storing data
private final RingBuffer<StringEvent> ringBuffer;
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String content) {
// ringBuffer is a queue, and its next method returns the position after the last record, which is available
long sequence = ringBuffer.next();
try {
// The event fetched from the sequence position is a null event
StringEvent stringEvent = ringBuffer.get(sequence);
// Empty event Adds service information
stringEvent.setValue(content);
} finally {
/ / releaseringBuffer.publish(sequence); }}}Copy the code
- Event processing class, the specific business processing logic after receiving the event:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {
public StringEventHandler(Consumer
consumer) {
this.consumer = consumer;
}
// The Consumer implementation class can be passed in and the Consumer accept method will be executed every time a message is processed
privateConsumer<? > consumer;@Override
public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// There is a delay of 100ms, which simulates the logic of consuming events
Thread.sleep(100);
// If consumer is passed in externally, the accept method is executed
if (null! =consumer) { consumer.accept(null); }}}Copy the code
- Define an interface that calls the methods of the interface to produce messages, and place several constants in it for later use:
package com.bolingcavalry.service;
public interface LowLevelOperateService {
/** * Number of consumers */
int CONSUMER_NUM = 3;
/** * Ring buffer size */
int BUFFER_SIZE = 16;
/** * Publish an event *@param value
* @return* /
void publish(String value);
/** * returns the total number of tasks processed *@return* /
long eventCount(a);
}
Copy the code
- That’s the common code. Let’s implement the three scenarios one by one.
100 events, single consumer consumption
- This is the simplest function, implementing the function of Posting messages and individual consumer consumption. The code is as follows, with a few points to note later:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/** * Total number of messages */
private final AtomicLong eventCount = new AtomicLong();
private ExecutorService executors;
@PostConstruct
private void init(a) {
// Prepare an anonymous class to pass to the event handler class for disruptor,
// Each time an event is processed, the total number of events that have been processed is printedConsumer<? > eventCountPrinter =new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count); }};// Create a ring queue instance
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
// Prepare the thread pool
executors = Executors.newFixedThreadPool(1);
/ / create SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// Create a work class for event handling that executes StringEventHandler to handle events
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
sequenceBarrier,
new StringEventHandler(eventCountPrinter));
// Pass the consumer sequence to the ring queue
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// Take the event in a separate thread and consume it
executors.submit(batchEventProcessor);
/ / producer
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount(a) {
returneventCount.get(); }}Copy the code
- There are a few things to note about this code:
- Create your own instance of the ring queue RingBuffer
- Prepare your own thread pool, where threads are used to fetch and consume messages
- Create your own BatchEventProcessor instance and pass in the event handler class
- Create a sequenceBarrier from ringBuffer and pass it to the BatchEventProcessor instance
- Pass the sequence of BatchEventProcessor to ringBuffer to ensure that the production and consumption of ringBuffer will not be chaotic
- Start the thread pool, which means that the BatchEventProcessor instance constantly fetches and consumes events from ringBuffer in a separate thread.
- In order to verify the above code can work normally, I am here to write a unit test class, as shown below, the logic is simple, call OneConsumerServiceImpl. One hundred times, the publish method to produce one hundred events, Check that OneConsumerServiceImpl records a total of 100 consumption events:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest {
@Autowired
@Qualifier("oneConsumer")
LowLevelOperateService oneConsumer;
private static final int EVENT_COUNT = 100;
private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
for(int i=0; i<eventCount; i++) { log.info("publich {}", i);
service.publish(String.valueOf(i));
}
// Async consumption, so delay is required
Thread.sleep(10000);
// The total number of events consumed should equal the number of events published
assertEquals(expectEventCount, service.eventCount());
}
@Test
public void testOneConsumer(a) throws InterruptedException {
log.info("start testOneConsumerService");
testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
}
Copy the code
- Note that if you are executing unit tests by clicking the icon on IDEA directly, check the options in the red box below, otherwise compilation may fail:
- Execute the above unit test class, and the result looks like the following, with messages being produced and consumed as expected, and the consumption logic being executed in a separate thread:
- Move on to the next scenario;
100 events, three consumers, each consuming this 100 events alone
- This scenario also exists in Kafka, where the groups of three consumers are different, so that for each message, the two consumers each consume one;
- So 100 events, 3 consumers each independently consume those 100 events, 300 times;
- The code looks like this, with a few caveats mentioned later:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/** * Total number of messages */
private final AtomicLong eventCount = new AtomicLong();
/** * Produces an instance of BatchEventProcessor and starts a separate thread to start retrieving and consuming messages *@param executorService
*/
private void addProcessor(ExecutorService executorService) {
// Prepare an anonymous class to pass to the event handler class for disruptor,
// Each time an event is processed, the total number of events that have been processed is printedConsumer<? > eventCountPrinter =new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count); }}; BatchEventProcessor<StringEvent> batchEventProcessor =new BatchEventProcessor<>(
ringBuffer,
ringBuffer.newBarrier(),
new StringEventHandler(eventCountPrinter));
// Pass the current consumer sequence instance to ringBuffer
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// Start a separate thread to fetch and consume events
executorService.submit(batchEventProcessor);
}
@PostConstruct
private void init(a) {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
// Create multiple consumers and fetch and consume events in separate threads
for (int i=0; i<CONSUMER_NUM; i++) { addProcessor(executorService); }/ / producer
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount(a) {
returneventCount.get(); }}Copy the code
-
The above code is not much different from the OneConsumerServiceImpl, which creates multiple instances of BatchEventProcessor and commits them separately in the thread pool.
-
Validation method is still the unit test, in just LowLeverOperateServiceImplTest. Increase in Java code, Pay attention to the the third parameter is testLowLevelOperateService EVENT_COUNT * LowLevelOperateService CONSUMER_NUM, said the expected consumed message for 300:
@Autowired
@Qualifier("multiConsumer")
LowLevelOperateService multiConsumer;
@Test
public void testMultiConsumer(a) throws InterruptedException {
log.info("start testMultiConsumer");
testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
}
Copy the code
- Execute the unit test, as shown in the figure below, consuming 300 events and three consumers in the idle thread:
100 events, three consumers consuming 100 events
-
The final practice of this article is to publish 100 events and then have three consumers consume 100 of them (for example, A consumes 33, B consumes 33, and C consumes 34).
-
The BatchEventProcessor used above is for independent consumption, which is not suitable for multiple consumers to consume together. This scenario of multiple consumption and common consumption needs to be completed with the help of WorkerPool. The name is still very impressive: There are many workers in a pool, put the task into the pool, the workers each do part of the task, they work together to complete the task;
-
The consumer passing in the WorkerPool needs to implement the WorkHandler interface, so add a new implementation class:
package com.bolingcavalry.service;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> {
public StringWorkHandler(Consumer
consumer) {
this.consumer = consumer;
}
// The Consumer implementation class can be passed in and the Consumer accept method will be executed every time a message is processed
privateConsumer<? > consumer;@Override
public void onEvent(StringEvent event) throws Exception {
log.info("work handler event : {}", event);
// There is a delay of 100ms, which simulates the logic of consuming events
Thread.sleep(100);
// If consumer is passed in externally, the accept method is executed
if (null! =consumer) { consumer.accept(null); }}}Copy the code
- The new service class implements common consumption logic with a few caveats that will be mentioned later:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/** * Total number of messages */
private final AtomicLong eventCount = new AtomicLong();
@PostConstruct
private void init(a) {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];
// Create multiple instances of StringWorkHandler and place them in an array
for (int i=0; i < CONSUMER_NUM; i++) { handlers[i] =new StringWorkHandler(o -> {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
});
}
// Create an instance of WorkerPool and pass in an array of Instances of StringWorkHandler representing the number of common consumers
WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);
// This is a very important sentence. If you do not consume the same event twice, you will have to consume the same event twice
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executorService);
/ / producer
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount(a) {
returneventCount.get(); }}Copy the code
- There are two things to note in the above code:
-
Once the StringWorkHandler array is passed to the WorkerPool, each Instance of StringWorkHandler is put into a new instance of WorkProcessor, which implements the Runnable interface. When workerPool.start is executed, the WorkProcessor is committed to the thread pool.
-
Collaborative consumption compared with those of independent consumption in front of the biggest characteristic is to just call a ringBuffer. AddGatingSequences method, that is to say the three consumer share a sequence examples;
- Validation method is still the unit test, in just LowLeverOperateServiceImplTest. Increase in Java code, pay attention to the third parameter is EVENT_COUNT testWorkerPoolConsumer, said the expected consumed message for 100:
@Autowired
@Qualifier("workerPoolConsumer")
LowLevelOperateService workerPoolConsumer;
@Test
public void testWorkerPoolConsumer(a) throws InterruptedException {
log.info("start testWorkerPoolConsumer");
testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
}
Copy the code
- Execute the unit test as shown in the figure below, with three consumers consuming 100 events in total and three consumers on different threads:
- Here, you have completed three common scenarios of message production and consumption without using the Disruptor class. You have gained a good understanding of the underlying Disruptor implementation and will be able to use and optimize your Disruptor in the future.
You are not alone, Xinchen original accompany all the way
- Java series
- Spring series
- The Docker series
- Kubernetes series
- Database + middleware series
- The conversation series
Welcome to pay attention to the public number: programmer Xin Chen
Wechat search “programmer Xin Chen”, I am Xin Chen, looking forward to enjoying the Java world with you…
Github.com/zq2599/blog…