Welcome to my GitHub
Github.com/zq2599/blog…
Content: categorized summaries of all my original articles with supporting source code, covering Java, Docker, Kubernetes, DevOps, and more.
Links to the Disruptor Notes series
- Quick start
- Disruptor class analysis
- Basic operations on the ring queue (without the Disruptor class)
- Event consumption: knowledge summary
- Event consumption: in practice
- Common scenarios
- Wait strategies
- Knowledge supplement (final)
Overview of this article
- This article, the sixth in the Disruptor Notes series, summarizes common consumption patterns that you can reuse in day-to-day development.
- The common patterns are:
- Multiple consumers, each consuming independently: implemented in an earlier article, skipped here
- Multiple consumers consuming together (common consumption): implemented in an earlier article, skipped here
- A mix of independent and common consumption: implemented in an earlier article, skipped here
- Multiple producers and multiple independent consumers
- C1 and C2 consume independently, and C3 depends on C1 and C2
- C1 consumes independently, C2 and C3 consume independently but depend on C1, and C4 depends on C2 and C3
- C1 and C2 consume independently, C3 and C4 consume independently but both depend on C1 and C2, and C5 depends on C3 and C4
- C1 and C2 consume together, as do C3 and C4, but both C3 and C4 depend on C1 and C2, and C5 depends on C3 and C4
- C1 and C2 consume together, C3 and C4 consume independently but both depend on C1 and C2, and C5 depends on C3 and C4
- C1 and C2 consume independently, C3 and C4 consume together but both depend on C1 and C2, and C5 depends on C3 and C4
About this code
- To save effort, this article does not create a new project. It reuses the law-mode module from the previous article, so the following classes are used as-is without being rewritten:
- Event definition: OrderEvent
- Event factory: OrderEventFactory
- Event producer: OrderEventProducer
- Event consumer for independent consumption scenarios: MailEventHandler
- Event consumer for common consumption scenarios: MailWorkHandler
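The difference between the two kinds of consumer listed above can be illustrated without Disruptor at all. The following is a minimal sketch using only the JDK. The class ConsumptionModes and its methods are hypothetical stand-ins, not part of the project or of the Disruptor API. It assumes the usual meaning of the two modes: in independent consumption every handler sees every event, while in common consumption the handlers share one stream and each event is handled exactly once.

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in (not project code): models the two consumption modes with JDK classes only.
class ConsumptionModes {

    // Independent consumption: every handler processes every event.
    static int[] independent(List<String> events, int handlerCount) {
        int[] handled = new int[handlerCount];
        for (int h = 0; h < handlerCount; h++) {
            for (String ignored : events) {
                handled[h]++;
            }
        }
        return handled;
    }

    // Common consumption: handlers drain one shared queue, so each event goes to exactly one handler.
    static int[] common(List<String> events, int handlerCount) {
        Queue<String> shared = new ConcurrentLinkedQueue<>(events);
        int[] handled = new int[handlerCount];
        Thread[] workers = new Thread[handlerCount];
        for (int h = 0; h < handlerCount; h++) {
            final int id = h;
            workers[h] = new Thread(() -> {
                while (shared.poll() != null) {
                    handled[id]++; // each worker writes only its own slot
                }
            });
            workers[h].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join makes the worker's writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> events = Collections.nCopies(100, "order");
        int[] a = independent(events, 2);
        int[] b = common(events, 2);
        System.out.println("independent total: " + (a[0] + a[1])); // 200
        System.out.println("common total: " + (b[0] + b[1]));      // 100
    }
}
```

How the 100 events are split between the two workers in the common case varies from run to run; only the total is fixed.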
Download the source code
- The full source code for this article can be downloaded from GitHub. The addresses are listed in the table below (github.com/zq2599/blog…):
Name | Link | Note |
---|---|---|
Project home page | Github.com/zq2599/blog… | The project's home page on GitHub |
Git repository address (HTTPS) | Github.com/zq2599/blog… | The project's source repository address, HTTPS protocol |
Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | The project's source repository address, SSH protocol |
- The git project contains multiple folders. The source code for this article is in the disruptor-tutorials folder, as shown in the red box below:
- disruptor-tutorials is a parent project with multiple modules, as shown in the red box below:
Multiple producers and multiple independent consumers
Let’s implement the following logic:
- To make the law-mode code support multiple producers, two changes are needed in the abstract parent class of the business service (ConsumeModeService):
- The init method was originally private. It is now protected so that subclasses can override it.
- A new method, publishWithProducer2, is added that only throws an exception. Subclasses that need it must override it themselves:
public void publishWithProducer2(String value) throws Exception {
    throw new Exception("The parent class does not implement this method, please override this method in the child class and call again.");
}
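The "parent throws, child overrides" arrangement described above can be tried out in isolation. The sketch below uses hypothetical class names (BaseService, SingleProducerService, TwoProducerService, OverrideDemo) and a plain list in place of a real producer; only the shape of publishWithProducer2 follows the article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names; only the "parent throws, child overrides" shape follows the article.
abstract class BaseService {
    public void publishWithProducer2(String value) throws Exception {
        throw new Exception("Not implemented in the parent class; override it in the subclass.");
    }
}

// A subclass that never needs a second producer simply inherits the throwing default.
class SingleProducerService extends BaseService {
}

// A subclass with a second producer overrides the method (a list stands in for the real producer).
class TwoProducerService extends BaseService {
    final List<String> published = new ArrayList<>();

    @Override
    public void publishWithProducer2(String value) {
        published.add(value);
    }
}

class OverrideDemo {
    // Returns true if the service accepted the event, false if it hit the parent's default.
    static boolean tryPublish(BaseService service, String value) {
        try {
            service.publishWithProducer2(value);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryPublish(new SingleProducerService(), "a")); // false
        System.out.println(tryPublish(new TwoProducerService(), "a"));    // true
    }
}
```

The benefit of this design is that existing subclasses compile unchanged; the cost is that a missing override is only discovered at run time.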
- To implement multiple producers, create MultiProducerServiceImpl.java. A few points to note about it are covered after the code:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;

@Service("multiProducerService")
public class MultiProducerServiceImpl extends ConsumeModeService {

    /**
     * The second producer
     */
    @Setter
    protected OrderEventProducer producer2;

    @PostConstruct
    @Override
    protected void init() {
        // Instantiate the Disruptor
        disruptor = new Disruptor<>(new OrderEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"),
                // The producer type is multi-producer
                ProducerType.MULTI,
                // BlockingWaitStrategy is the default wait strategy
                new BlockingWaitStrategy());
        // Leave it to subclasses to implement the specific event-consuming logic
        disruptorOperate();
        // Start
        disruptor.start();
        // The first producer
        setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
        // The second producer
        setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));
    }

    @Override
    protected void disruptorOperate() {
        // Consumer No. 1
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        // Consumer No. 2
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        // Call handleEventsWith so that the two consumers consume independently
        disruptor.handleEventsWith(c1, c2);
    }

    @Override
    public void publishWithProducer2(String value) throws Exception {
        producer2.onData(value);
    }
}
- A few things to note about this code:
- It overrides the parent class's init method, mainly so that two more arguments are passed when the Disruptor is instantiated: ProducerType.MULTI means the producer type is multi-producer, and BlockingWaitStrategy is the wait strategy. Earlier code did not pass a wait strategy, in which case the default is BlockingWaitStrategy
- init also calls setProducer2, which sets the member variable producer2
- publishWithProducer2 is overridden to publish events through the producer2 member variable
- disruptorOperate is overridden to register two independent consumers
- The code is again verified with a unit test. Open ConsumeModeServiceTest.java and add the following code, which starts two threads that publish events at the same time:
@Autowired
@Qualifier("multiProducerService")
ConsumeModeService multiProducerService;

@Test
public void testMultiProducerService() throws InterruptedException {
    log.info("start testMultiProducerService");
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // Two producers, each producing 100 events, produce 200 events in total
    // Two independent consumers, each consuming 200 events, consume 400 events in total
    int expectEventCount = EVENT_COUNT * 4;
    // Tell the service to call countDownLatch.countDown once 400 events have been consumed
    multiProducerService.setCountDown(countDownLatch, expectEventCount);
    // Start a thread that publishes events with the first producer
    new Thread(() -> {
        for (int i = 0; i < EVENT_COUNT; i++) {
            log.info("publish {}", i);
            multiProducerService.publish(String.valueOf(i));
        }
    }).start();
    // Start another thread that publishes events with the second producer
    new Thread(() -> {
        for (int i = 0; i < EVENT_COUNT; i++) {
            log.info("publishWithProducer2 {}", i);
            try {
                multiProducerService.publishWithProducer2(String.valueOf(i));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }).start();
    // The current thread waits here. The earlier setCountDown call told the service
    // to call countDownLatch.countDown once expectEventCount events have been consumed
    // Be sure to call await, not wait!
    countDownLatch.await();
    // The total number of events consumed should equal the expected count
    assertEquals(expectEventCount, multiProducerService.eventCount());
}
- The test results are as follows:
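As a separate illustration (this is not the test output), the counting in the test above can be modeled with JDK classes only: two producers publish 100 events each, two independent consumers each see all 200, so 400 consumptions are expected. The class MultiProducerSketch is hypothetical and makes simplifying assumptions that are labeled in the comments: a plain array with one slot per event replaces the ring buffer, and the consumers start only after publishing has finished. The atomic counter stands in for the coordination that several producers need so that they never write to the same slot.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only model of the test's counting; it is not Disruptor code.
class MultiProducerSketch {

    // A producer claims each slot with an atomic increment, so two threads never write the same slot.
    static Thread producer(String name, int count, AtomicLong cursor, String[] slots) {
        return new Thread(() -> {
            for (int i = 0; i < count; i++) {
                int sequence = (int) cursor.getAndIncrement();
                slots[sequence] = name + "-" + i;
            }
        });
    }

    // An independent consumer reads every slot; the latch opens when the shared counter hits the target.
    static Thread consumer(String[] slots, AtomicLong consumed, long expected, CountDownLatch done) {
        return new Thread(() -> {
            for (String event : slots) {
                if (event != null && consumed.incrementAndGet() == expected) {
                    done.countDown();
                }
            }
        });
    }

    // Runs two producers and two independent consumers; returns how many consumptions were counted.
    static long run(int perProducer) {
        int published = perProducer * 2;
        long expected = published * 2L;
        String[] slots = new String[published]; // assumption: one slot per event, no wrap-around
        AtomicLong cursor = new AtomicLong();
        AtomicLong consumed = new AtomicLong();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Thread p1 = producer("p1", perProducer, cursor, slots);
            Thread p2 = producer("p2", perProducer, cursor, slots);
            p1.start();
            p2.start();
            p1.join();
            p2.join(); // simplification: consumers start only after publishing ends
            consumer(slots, consumed, expected, done).start();
            consumer(slots, consumed, expected, done).start();
            done.await(5, TimeUnit.SECONDS); // await, not wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // 400
    }
}
```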
C1 and C2 are consumed independently, while C3 depends on C1 and C2
- The logical diagram is as follows:
- The implementation is simple. The dependency is expressed with then:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;

@Service("scene5")
public class Scene5 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        disruptor
                // C1 and C2 consume independently
                .handleEventsWith(c1, c2)
                // C3 depends on C1 and C2
                .then(c3);
    }
}
- Unit test code:
@Autowired
@Qualifier("scene5")
Scene5 scene5;

@Test
public void testScene5() throws InterruptedException {
    log.info("start testScene5");
    testConsumeModeService(scene5,
            EVENT_COUNT,
            // Three independent consumers, 300 events consumed in total
            EVENT_COUNT * 3);
}
- To save space, the test results are not shown here. Note that each event must be consumed by both C1 and C2 before C3 consumes it.
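The ordering rule just stated can be modeled with a few atomic counters. The sketch below is a simplified, hypothetical model using only the JDK, not Disruptor's actual implementation: each consumer publishes the sequence number of the last event it handled, and the dependent consumer C3 handles event i only once the slowest of C1 and C2 has reached i. The class and method names are invented for illustration, and -1 is used here to mean "nothing handled yet".

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only model of "C3 depends on C1 and C2"; not Disruptor's actual implementation.
class DependencyGateSketch {

    // The dependent consumer may go no further than the slowest consumer it depends on.
    static long minSequence(AtomicLong... upstream) {
        long min = Long.MAX_VALUE;
        for (AtomicLong sequence : upstream) {
            min = Math.min(min, sequence.get());
        }
        return min;
    }

    // A consumer with no dependencies just advances its own sequence event by event.
    static Thread free(AtomicLong own, int eventCount) {
        return new Thread(() -> {
            for (long i = 0; i < eventCount; i++) {
                own.set(i); // "event i has been handled"
            }
        });
    }

    // A gated consumer handles event i only after every upstream sequence has reached i.
    static Thread gated(AtomicLong own, int eventCount, AtomicLong... upstream) {
        return new Thread(() -> {
            for (long i = 0; i < eventCount; i++) {
                while (minSequence(upstream) < i) {
                    Thread.yield();
                }
                own.set(i);
            }
        });
    }

    // Returns C3's final sequence, which is eventCount - 1 once everything has been handled.
    static long run(int eventCount) {
        AtomicLong c1 = new AtomicLong(-1);
        AtomicLong c2 = new AtomicLong(-1);
        AtomicLong c3 = new AtomicLong(-1);
        Thread t3 = gated(c3, eventCount, c1, c2);
        Thread t1 = free(c1, eventCount);
        Thread t2 = free(c2, eventCount);
        t3.start();
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c3.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // 99
    }
}
```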
C1 consumes independently, C2 and C3 consume independently but depend on C1, and C4 depends on C2 and C3
- The logical diagram is as follows:
- The implementation code is as follows:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;

@Service("scene6")
public class Scene6 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
        disruptor
                // C1 consumes first
                .handleEventsWith(c1)
                // C2 and C3 consume independently, but both depend on C1
                .then(c2, c3)
                // C4 depends on C2 and C3
                .then(c4);
    }
}
- Unit test code:
@Autowired
@Qualifier("scene6")
Scene6 scene6;

@Test
public void testScene6() throws InterruptedException {
    log.info("start testScene6");
    testConsumeModeService(scene6,
            EVENT_COUNT,
            // Four independent consumers, 400 events consumed in total
            EVENT_COUNT * 4);
}
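One way to get from a dependency diagram like the one in this scenario to a chain of then calls is to assign each consumer a level: 0 if it depends on nothing, otherwise one more than the deepest consumer it depends on. Consumers on the same level go into the same call. The helper below is a hypothetical JDK-only sketch of that bookkeeping. It assumes the dependency graph has no cycles and is not part of the project.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: derives the stage (level) of each consumer from its dependencies.
class StageLevels {

    // dependsOn maps a consumer name to the consumers it depends on (assumed acyclic).
    static Map<String, Integer> levels(Map<String, List<String>> dependsOn) {
        Map<String, Integer> level = new TreeMap<>();
        for (String consumer : dependsOn.keySet()) {
            resolve(consumer, dependsOn, level);
        }
        return level;
    }

    // Level 0 for consumers without dependencies, otherwise 1 + the deepest dependency.
    private static int resolve(String consumer,
                               Map<String, List<String>> dependsOn,
                               Map<String, Integer> level) {
        Integer known = level.get(consumer);
        if (known != null) {
            return known;
        }
        int result = 0;
        for (String upstream : dependsOn.getOrDefault(consumer, Collections.<String>emptyList())) {
            result = Math.max(result, resolve(upstream, dependsOn, level) + 1);
        }
        level.put(consumer, result);
        return result;
    }

    // The dependencies of this scenario: C2 and C3 depend on C1, C4 depends on C2 and C3.
    static Map<String, List<String>> scene6() {
        Map<String, List<String>> dependsOn = new LinkedHashMap<>();
        dependsOn.put("C1", Collections.<String>emptyList());
        dependsOn.put("C2", Arrays.asList("C1"));
        dependsOn.put("C3", Arrays.asList("C1"));
        dependsOn.put("C4", Arrays.asList("C2", "C3"));
        return dependsOn;
    }

    public static void main(String[] args) {
        System.out.println(levels(scene6())); // {C1=0, C2=1, C3=1, C4=2}
    }
}
```

Levels 0, 1 and 2 correspond to handleEventsWith(c1), then(c2, c3) and then(c4) in the scenario's code. A chain of this kind only fits diagrams in which each stage depends on the whole stage before it.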
C1 and C2 are independent consumers, C3 and C4 are independent consumers, but both C3 and C4 depend on C1 and C2, and then C5 depends on C3 and C4
- The logical diagram is as follows:
- The implementation code is as follows:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;

@Service("scene7")
public class Scene7 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
        disruptor
                // C1 and C2 consume independently
                .handleEventsWith(c1, c2)
                // C3 and C4 also consume independently, but both depend on C1 and C2
                .then(c3, c4)
                // C5 depends on C3 and C4
                .then(c5);
    }
}
- Unit test code:
@Autowired
@Qualifier("scene7")
Scene7 scene7;

@Test
public void testScene7() throws InterruptedException {
    log.info("start testScene7");
    testConsumeModeService(scene7,
            EVENT_COUNT,
            // Five independent consumers, 500 events consumed in total
            EVENT_COUNT * 5);
}
C1 and C2 consume together, as do C3 and C4, but C3 and C4 both depend on C1 and C2, and C5 depends on C3 and C4
- The logical diagram is as follows:
- The implementation code is as follows:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

/**
 * @author will
 * @version 1.0
 * @description: C1 and C2 consume together, C3 and C4 consume together, but C3 and C4 both depend on C1 and C2, and C5 depends on C3 and C4
 * @date 2021/5/23
 */
@Service("scene8")
public class Scene8 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter);
        disruptor
                // C1 and C2 consume together
                .handleEventsWithWorkerPool(c1, c2)
                // C3 and C4 also consume together, but both depend on C1 and C2
                .thenHandleEventsWithWorkerPool(c3, c4)
                // C5 depends on C3 and C4
                .thenHandleEventsWithWorkerPool(c5);
    }
}
- Unit test code:
@Autowired
@Qualifier("scene8")
Scene8 scene8;

@Test
public void testScene8() throws InterruptedException {
    log.info("start testScene8");
    testConsumeModeService(scene8,
            EVENT_COUNT,
            // C1 and C2 consume together, C3 and C4 consume together,
            // and C5, although alone, also uses the common consumption mode.
            // That makes three groups of consumers, so 300 events are consumed
            EVENT_COUNT * 3);
}
C1 and C2 consume together, C3 and C4 consume independently, but both C3 and C4 depend on C1 and C2, and C5 depends on C3 and C4
- The logical diagram is as follows:
- The implementation code is as follows:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

@Service("scene9")
public class Scene9 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
        disruptor
                // C1 and C2 consume together
                .handleEventsWithWorkerPool(c1, c2)
                // C3 and C4 consume independently, but both depend on C1 and C2
                .then(c3, c4)
                // C5 depends on C3 and C4
                .then(c5);
    }
}
- Unit test code:
@Autowired
@Qualifier("scene9")
Scene9 scene9;

@Test
public void testScene9() throws InterruptedException {
    log.info("start testScene9");
    testConsumeModeService(scene9,
            EVENT_COUNT,
            // C1 and C2 consume together (100 events),
            // C3 and C4 consume independently (200 events),
            // C5 consumes independently (100 events),
            // so 400 events are consumed in total
            EVENT_COUNT * 4);
}
C1 and C2 consume independently, C3 and C4 consume together, but C3 and C4 both depend on C1 and C2, and C5 depends on C3 and C4
- The logical diagram is as follows:
- The implementation code is as follows:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

@Service("scene10")
public class Scene10 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
        MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
        disruptor
                // C1 and C2 consume independently
                .handleEventsWith(c1, c2)
                // C3 and C4 consume together, but both depend on C1 and C2
                .thenHandleEventsWithWorkerPool(c3, c4)
                // C5 depends on C3 and C4
                .then(c5);
    }
}
- Unit test code:
@Autowired
@Qualifier("scene10")
Scene10 scene10;

@Test
public void testScene10() throws InterruptedException {
    log.info("start testScene10");
    testConsumeModeService(scene10,
            EVENT_COUNT,
            // C1 and C2 consume independently (200 events),
            // C3 and C4 consume together (100 events),
            // C5 consumes independently (100 events),
            // so 400 events are consumed in total
            EVENT_COUNT * 4);
}
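The expected totals passed to testConsumeModeService in these scenarios all follow one rule: a stage of independent consumers contributes one consumption per consumer per event, while a stage that consumes together contributes one consumption per event regardless of its size. The sketch below applies that rule to all six scenarios. ExpectedCounts and Stage are hypothetical names introduced here for illustration, and 100 is the value of EVENT_COUNT implied by the comments in the tests.

```java
// Hypothetical helper: reproduces the expected totals used in the unit tests of this article.
class ExpectedCounts {

    // One stage of the chain: a number of handlers, either independent or pooled.
    static final class Stage {
        final int handlers;
        final boolean pooled;

        private Stage(int handlers, boolean pooled) {
            this.handlers = handlers;
            this.pooled = pooled;
        }

        static Stage independent(int handlers) {
            return new Stage(handlers, false);
        }

        static Stage pool(int handlers) {
            return new Stage(handlers, true);
        }

        // Independent handlers each see every event; a pool sees each event once in total.
        int consumptions(int events) {
            return pooled ? events : handlers * events;
        }
    }

    // Total consumptions over all stages for the given number of published events.
    static int total(int events, Stage... stages) {
        int sum = 0;
        for (Stage stage : stages) {
            sum += stage.consumptions(events);
        }
        return sum;
    }

    public static void main(String[] args) {
        int n = 100;
        System.out.println(total(n, Stage.independent(2), Stage.independent(1)));                       // 300
        System.out.println(total(n, Stage.independent(1), Stage.independent(2), Stage.independent(1))); // 400
        System.out.println(total(n, Stage.independent(2), Stage.independent(2), Stage.independent(1))); // 500
        System.out.println(total(n, Stage.pool(2), Stage.pool(2), Stage.pool(1)));                      // 300
        System.out.println(total(n, Stage.pool(2), Stage.independent(2), Stage.independent(1)));        // 400
        System.out.println(total(n, Stage.independent(2), Stage.pool(2), Stage.independent(1)));        // 400
    }
}
```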
- That completes the code for these common scenarios. I hope this article serves as a useful reference and helps you use this excellent tool more easily.
Follow the WeChat official account: programmer Xin Chen
Search WeChat for “programmer Xin Chen”. I am Xin Chen, and I look forward to exploring the Java world with you…