1. Introduction
The secrets you should Know about Messaging middleware explains how to send and consume messages using RabbitMQ in detail. How to ensure the reliability of messages sent by producers; How to ensure the reliability of consumption messages, how to scale consumers horizontally, and how to peak the flow of consumers.
2. The original intention
The original purpose of this article is to understand why @Component + @RabbitListener annotations can be used to consume messages, and how to customize the configuration for consumers. With these questions in mind, start this source code analysis.
3. Source code analysis
3.1 Finding the Automatic Configuration Class
It is well known that all middleware that is integrated with SpringBoot is introduced into the project as a starter, in which case SpringBoot has an associated auto-configuration class that does some default configuration for us to use out of the box. It is also easy to find the relevant auto-configuration class method by typing the name associated with it, such as RabbitAuto to search for an auto-configuration class such as RabbitAutoConfiguration
3.2 Automatic configuration Class analysis
Open the RabbitAutoConfiguration class and you will see the following:
@Configuration
The annotation indicates that this configuration class will be scanned and parsed by the framework and injected into the IOC container@ConditionalOnClass
The annotation indicates that only the classpath path existsRabbitTemplate.class
andChannel.class
These two classes will scan and parse the current auto-configured class@EnableConfigurationProperties
Annotation instantiationRabbitProperties
And inject the relevant properties from the YML or properties file into the object@Import
Annotation indicates to introduceRabbitAnnotationDrivenConfiguration
The configuration class
According to the name of RabbitAnnotationDrivenConfiguration can guess the configuration class is probably we need to analysis the entrance to the class
3.3 Entry class analysis
3.3.1 Initializing message-related attributes
3.3.2 Declare the listener container factory configuration and the listener container factory
Enter the methods of the configuration class to configure the container factory
Here we can get a important information, that can be configured in the configuration file for SimpleRabbitListenerContainerFactory related configuration operation
spring:
rabbitmq:
listener:
simple:
concurrency: 4
max-concurrency: 10
batch-size: 100
prefetch: 100
acknowledge-mode: MANUAL
Copy the code
Several important classes have emerged so far:
SimpleRabbitListenerContainerFactoryConfigurer
SimpleRabbitListenerContainerFactory
RabbitProperties
The relationship between them is such SimpleRabbitListenerContainerFactoryConfigurer container factory configuration classes hold RabbitProperties properties configuration object, And related configuration, the SimpleRabbitListenerContainerFactory RabbitProperties attributes and can be in the application. The yml configuration file
3.3.3 Enabling the RabbitMQ Function
@ EnableRabbit annotation introduces two important classes RabbitListenerAnnotationBeanPostProcessor and RabbitListenerEndpointRegistry. RabbitListenerAnnotationBeanPostProcessor used to deal with the rear of the Bean, here you can imagine, after the post processor will Bean instantiation to special agent containing @ RabbitListener annotate the class, so as to realize the message consumption
3.4 Post-processing
3.4.1 Post-processing method
In the rear RabbitListenerAnnotationBeanPostProcessor processor rear processing method
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { Class<? > targetClass = AopUtils.getTargetClass(bean);final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for(RabbitListener rabbitListener : lm.annotations) { processAmqpListener(rabbitListener, lm.method, bean, beanName); }}return bean;
}
Copy the code
The bean with the @rabbitListener annotation will be processed
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object target, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
/ /... Omit some coderesolveExecutor(endpoint, rabbitListener, target, beanName); resolveAdmin(endpoint, rabbitListener, target); resolveAckMode(endpoint, rabbitListener); resolvePostProcessor(endpoint, rabbitListener, target, beanName); RabbitListenerContainerFactory<? > factory = resolveContainerFactory(rabbitListener, target, beanName);this.registrar.registerEndpoint(endpoint, factory);
}
Copy the code
Each contains @ RabbitListener annotation methods corresponding to a MethodRabbitListenerEndpoint object, This object will store the @RabbitListener Annotation methods related attributes, and @ RabbitListener annotations to specify the properties of the final and SimpleRabbitListenerContainerFactory AmqpListenerEndpointDescriptor objects into endpointDesc Riptors collection
3.4.3 Initialization method after Bean creation
Because RabbitListenerAnnotationBeanPostProcessor SmartInitializingSingleton interface is achieved, so will the callback afterSingletonsInstantiated () method, In the callback method traverses 3.4.2 endpointDescriptors collection SimpleMessageListenerContainer registration, Then use SimpleRabbitListenerContainerFactory and MethodRabbitListenerEndpoint attribute configuration, Eventually put SimpleMessageListenerContainer created in RabbitListenerEndpointRegistry listenerContainers container
3.5 Application Life Cycle
RabbitListenerEndpointRegistry SmartLifecycle interface is achieved, in the application will start after the callback start () method
3.6 summary
RabbitListenerAnnotationBeanPostProcessor
The post-processing method handles the containing@RabbitListener
Annotation method, createMethodRabbitListenerEndpoint
Object and theSimpleRabbitListenerContainerFactory
Put a pair into the setRabbitListenerAnnotationBeanPostProcessor
The initialization method iterates over the collection basis of the previous stepSimpleRabbitListenerContainerFactory
createSimpleMessageListenerContainer
Object, and then useSimpleRabbitListenerContainerFactory
andMethodRabbitListenerEndpoint
rightSimpleMessageListenerContainer
The configuration will eventually beSimpleMessageListenerContainer
Add them to the Map containerRabbitListenerEndpointRegistry
The startup method iterates through the previous map and startsSimpleMessageListenerContainer
4. Start the listening container
4.1 start
protected void doStart(a) {
synchronized (this.consumersMonitor) {
// 1. Initialize consumers based on the number of concurrent requests
int newConsumers = initializeConsumers();
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
/ / 2. Every consumer to create a AsyncMessageProcessingConsumer object
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 3. Use a thread to execute
getTaskExecutor().execute(processor);
if(getApplicationEventPublisher() ! =null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); } } waitForConsumersToStart(processors); }}Copy the code
4.2 Performing Tasks
@Override // NOSONAR - complexity - many catch blocks
public void run(a) { // NOSONAR - line count
try {
// 1. Start the consumer
initialize();
// 2. Consumption message
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) { mainLoop(); }}}Copy the code
4.2 Start consumers
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run—>org.spring framework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#initialize—>org.springfra mework.amqp.rabbit.listener.BlockingQueueConsumer#start—>org.springframework.amqp.rabbit.listener.BlockingQueueConsume r#setQosAndreateConsumers—>org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue
private void consumeFromQueue(String queue) throws IOException {
// 1. Set the message callback processing
InternalConsumer consumer = new InternalConsumer(this.channel, queue);
// 2. Tell the message server the name of the queue currently consumed by the channel, the ack mode of the message, and the callback to be executed when the message server pushes the message
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
(this.tagStrategy ! =null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
this.exclusive, this.consumerArgs,
consumer);
if(consumerTag ! =null) {
this.consumers.put(queue, consumer);
if (logger.isDebugEnabled()) {
logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ":" + this); }}else {
logger.error("Null consumer tag received for queue "+ queue); }}Copy the code
4.3 Message Callback Processing
InternalConsumer is responsible for the message callback reason
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
// 1. When the message server sends the message to the consumer, the callback puts the message into the queue
if(! BlockingQueueConsumer.this.queue.offer(
new Delivery(consumerTag, envelope, properties, body, this.queueName),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
}
}
else {
BlockingQueueConsumer.this.queue
.put(new Delivery(consumerTag, envelope, properties, body, this.queueName)); }}}Copy the code
4.4 Consumption news
Back to the code in Part 4.2, go to mainLoop() to see the logic for consuming the message
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
Channel channel = consumer.getChannel();
List<Message> messages = null;
long deliveryTag = 0;
for (int i = 0; i < this.batchSize; i++) {
logger.trace("Waiting for message from consumer.");
// 1. Fetch messages from the queue
Message message = consumer.nextMessage(this.receiveTimeout);
if (this.consumerBatchEnabled) {
}
else {
try {
// 2. Call listener methods to execute business logicexecuteListener(channel, message); }}}if (this.consumerBatchEnabled && messages ! =null) {
executeWithList(channel, messages, deliveryTag, consumer);
}
// 3. Send an ACK to the message server
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
Copy the code
4.5 summary
-
Start SimpleMessageListenerContainer, according to the number of concurrent create consumers
-
Tells the message server the queue to consume, the ACK mode, and specifies the callback to process the message
-
The message server pushes the message to the consumer, performs the callback, and the callback places the message in the queue
-
The consumer thread takes messages from the queue in an infinite loop, consuming them to perform business logic
-
Send an ACK to the message server after executing the business logic