1. Introduction

The earlier article "The Secrets You Should Know About Messaging Middleware" explains in detail how to send and consume messages with RabbitMQ: how to make messages sent by producers reliable, how to make message consumption reliable, how to scale consumers horizontally, and how to smooth traffic peaks on the consumer side.

2. Motivation

The purpose of this article is to understand why the @Component + @RabbitListener annotations are enough to consume messages, and how to customize the configuration of consumers. With these questions in mind, let's begin the source code analysis.

3. Source code analysis

3.1 Finding the Automatic Configuration Class

Middleware that is integrated with Spring Boot is usually brought into a project as a starter, and Spring Boot provides an associated auto-configuration class that applies sensible defaults so the middleware works out of the box. The relevant auto-configuration class is easy to find by searching for part of its name: searching for RabbitAuto, for example, turns up RabbitAutoConfiguration.

3.2 Auto-Configuration Class Analysis

Open the RabbitAutoConfiguration class and you will see the following:

  • The @Configuration annotation indicates that this configuration class will be scanned and parsed by the framework and registered in the IoC container
  • The @ConditionalOnClass annotation indicates that the auto-configuration class is scanned and parsed only if both RabbitTemplate.class and Channel.class exist on the classpath
  • The @EnableConfigurationProperties annotation instantiates RabbitProperties and binds the relevant properties from the YAML or properties file to that object
  • The @Import annotation imports the RabbitAnnotationDrivenConfiguration configuration class

Judging by its name, RabbitAnnotationDrivenConfiguration is probably the entry class we need to analyze.

3.3 Entry class analysis

3.3.1 Initializing message-related attributes

3.3.2 Declare the listener container factory configuration and the listener container factory

Step into the methods of the configuration class that configure the container factory.

This tells us something important: the settings of SimpleRabbitListenerContainerFactory can be supplied in the configuration file, for example:

spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 4
        max-concurrency: 10
        batch-size: 100
        prefetch: 100
        acknowledge-mode: MANUAL

Several important classes have emerged so far:

  • SimpleRabbitListenerContainerFactoryConfigurer
  • SimpleRabbitListenerContainerFactory
  • RabbitProperties

The relationship between them is as follows: the container factory configurer, SimpleRabbitListenerContainerFactoryConfigurer, holds the RabbitProperties configuration object and uses it to configure SimpleRabbitListenerContainerFactory, and the RabbitProperties attributes can in turn be set in the application.yml configuration file.
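The relationship can be pictured with a small, framework-free sketch. The names ListenerProps, ContainerFactory and FactoryConfigurer are hypothetical stand-ins for RabbitProperties, SimpleRabbitListenerContainerFactory and SimpleRabbitListenerContainerFactoryConfigurer, and the default values are illustrative only; the real classes carry many more settings. The sketch only assumes that the configurer copies the values that were actually set in the configuration file onto the factory and leaves the other defaults alone.

```java
class ConfigurerSketch {

    // Hypothetical stand-in for the listener section of RabbitProperties.
    // A null field means "not set in application.yml".
    static class ListenerProps {
        Integer concurrency;
        Integer maxConcurrency;
        Integer prefetch;
    }

    // Hypothetical stand-in for SimpleRabbitListenerContainerFactory.
    // The default values are illustrative, not the library's.
    static class ContainerFactory {
        int concurrentConsumers = 1;
        int maxConcurrentConsumers = 1;
        int prefetchCount = 250;
    }

    // Hypothetical stand-in for SimpleRabbitListenerContainerFactoryConfigurer.
    static class FactoryConfigurer {
        private final ListenerProps props;

        FactoryConfigurer(ListenerProps props) {
            this.props = props;
        }

        // Copy only the explicitly set properties onto the factory.
        void configure(ContainerFactory factory) {
            if (props.concurrency != null) {
                factory.concurrentConsumers = props.concurrency;
            }
            if (props.maxConcurrency != null) {
                factory.maxConcurrentConsumers = props.maxConcurrency;
            }
            if (props.prefetch != null) {
                factory.prefetchCount = props.prefetch;
            }
        }
    }

    public static void main(String[] args) {
        ListenerProps props = new ListenerProps();
        props.concurrency = 4;
        props.maxConcurrency = 10; // prefetch is deliberately left unset

        ContainerFactory factory = new ContainerFactory();
        new FactoryConfigurer(props).configure(factory);

        System.out.println("concurrency=" + factory.concurrentConsumers
                + ", max=" + factory.maxConcurrentConsumers
                + ", prefetch=" + factory.prefetchCount);
    }
}
```

Keeping the properties object separate from the factory is what lets the same factory be configured either from the configuration file or by hand in a custom bean definition.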

3.3.3 Enabling the RabbitMQ Function

The @EnableRabbit annotation introduces two important classes: RabbitListenerAnnotationBeanPostProcessor and RabbitListenerEndpointRegistry. RabbitListenerAnnotationBeanPostProcessor is a bean post-processor. As you might guess, after a bean has been instantiated the post-processor gives special handling to beans whose classes contain @RabbitListener annotations, and this is how message consumption is wired up.

3.4 Post-processing

3.4.1 Post-processing method

The post-processing method of the RabbitListenerAnnotationBeanPostProcessor post-processor:

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    Class<?> targetClass = AopUtils.getTargetClass(bean);
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    return bean;
}

Beans that have methods annotated with @RabbitListener are then processed:

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object target, String beanName) {

    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(rabbitListener));
    endpoint.setQueueNames(resolveQueues(rabbitListener));
    endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
    endpoint.setBeanFactory(this.beanFactory);
    endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
    // ... some code omitted
    resolveExecutor(endpoint, rabbitListener, target, beanName);
    resolveAdmin(endpoint, rabbitListener, target);
    resolveAckMode(endpoint, rabbitListener);
    resolvePostProcessor(endpoint, rabbitListener, target, beanName);
    RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

    this.registrar.registerEndpoint(endpoint, factory);
}

Each method annotated with @RabbitListener corresponds to one MethodRabbitListenerEndpoint object, which stores the method together with the attributes specified on its @RabbitListener annotation. Finally, the endpoint and its SimpleRabbitListenerContainerFactory are wrapped in an AmqpListenerEndpointDescriptor object and added to the endpointDescriptors collection.
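The collection step can be imitated with plain reflection. The sketch below is not Spring code: @QueueListener, Endpoint, ListenerScanner and OrderHandler are hypothetical names invented for illustration, with @QueueListener standing in for @RabbitListener and Endpoint for MethodRabbitListenerEndpoint. It only shows the general technique of inspecting a bean's methods for an annotation and recording one descriptor per annotated method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ListenerScanSketch {

    // Hypothetical stand-in for @RabbitListener; it only carries a queue name.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface QueueListener {
        String queue();
    }

    // Hypothetical counterpart of MethodRabbitListenerEndpoint.
    static class Endpoint {
        final Object bean;
        final Method method;
        final String queue;

        Endpoint(Object bean, Method method, String queue) {
            this.bean = bean;
            this.method = method;
            this.queue = queue;
        }
    }

    // Plays the role of the bean post-processor.
    static class ListenerScanner {
        private final List<Endpoint> endpointDescriptors = new ArrayList<>();

        Object postProcessAfterInitialization(Object bean) {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                QueueListener listener = method.getAnnotation(QueueListener.class);
                if (listener != null) {
                    // One endpoint per annotated method.
                    endpointDescriptors.add(new Endpoint(bean, method, listener.queue()));
                }
            }
            return bean; // the bean itself is returned unchanged
        }

        List<Endpoint> endpointDescriptors() {
            return endpointDescriptors;
        }
    }

    // Example bean with one listener method and one ordinary method.
    static class OrderHandler {
        @QueueListener(queue = "orders")
        public void onOrder(String body) {
        }

        public void unrelated() {
        }
    }

    public static void main(String[] args) {
        ListenerScanner scanner = new ListenerScanner();
        scanner.postProcessAfterInitialization(new OrderHandler());
        for (Endpoint e : scanner.endpointDescriptors()) {
            System.out.println(e.method.getName() + " listens on " + e.queue);
        }
    }
}
```

Note that nothing is consumed at this point. The scanner only records what should be listened to; creating and starting the consumers happens in later phases.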

3.4.3 Initialization method after Bean creation

Because RabbitListenerAnnotationBeanPostProcessor implements the SmartInitializingSingleton interface, its afterSingletonsInstantiated() method is called back. That callback iterates over the endpointDescriptors collection built during post-processing and registers a SimpleMessageListenerContainer for each entry: the container is created and configured from the SimpleRabbitListenerContainerFactory and the MethodRabbitListenerEndpoint attributes, and is finally stored in the listenerContainers map of RabbitListenerEndpointRegistry.

3.5 Application Life Cycle

RabbitListenerEndpointRegistry implements the SmartLifecycle interface, so its start() method is called back when the application context starts.

3.6 Summary

  • The post-processing method of RabbitListenerAnnotationBeanPostProcessor handles each method annotated with @RabbitListener: it creates a MethodRabbitListenerEndpoint object and puts it, paired with its SimpleRabbitListenerContainerFactory, into a collection
  • The initialization method of RabbitListenerAnnotationBeanPostProcessor iterates over the collection from the previous step, creates a SimpleMessageListenerContainer object from each SimpleRabbitListenerContainerFactory, configures it using the SimpleRabbitListenerContainerFactory and the MethodRabbitListenerEndpoint, and finally adds it to a Map
  • The start method of RabbitListenerEndpointRegistry iterates over that Map and starts each SimpleMessageListenerContainer
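The last two steps of this summary can be sketched without Spring as follows. Endpoint, Container, ContainerFactory and Registry are hypothetical, heavily simplified stand-ins for MethodRabbitListenerEndpoint, SimpleMessageListenerContainer, SimpleRabbitListenerContainerFactory and RabbitListenerEndpointRegistry. The rule that a concurrency value on the endpoint takes precedence over the factory default is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ListenerLifecycleSketch {

    // Hypothetical stand-in for MethodRabbitListenerEndpoint.
    static class Endpoint {
        final String id;
        final String queue;
        final Integer concurrency; // null means "use the factory default"

        Endpoint(String id, String queue, Integer concurrency) {
            this.id = id;
            this.queue = queue;
            this.concurrency = concurrency;
        }
    }

    // Hypothetical stand-in for SimpleMessageListenerContainer.
    static class Container {
        final String queue;
        final int concurrency;
        private boolean running;

        Container(String queue, int concurrency) {
            this.queue = queue;
            this.concurrency = concurrency;
        }

        void start() {
            running = true;
        }

        boolean isRunning() {
            return running;
        }
    }

    // Hypothetical stand-in for SimpleRabbitListenerContainerFactory.
    static class ContainerFactory {
        private final int defaultConcurrency;

        ContainerFactory(int defaultConcurrency) {
            this.defaultConcurrency = defaultConcurrency;
        }

        // Assumption: an endpoint-level value wins over the factory default.
        Container createContainer(Endpoint endpoint) {
            int concurrency = endpoint.concurrency != null ? endpoint.concurrency : defaultConcurrency;
            return new Container(endpoint.queue, concurrency);
        }
    }

    // Hypothetical stand-in for RabbitListenerEndpointRegistry.
    static class Registry {
        private final Map<String, Container> listenerContainers = new LinkedHashMap<>();

        // Create the container now, but do not start it yet.
        void registerListenerContainer(Endpoint endpoint, ContainerFactory factory) {
            listenerContainers.put(endpoint.id, factory.createContainer(endpoint));
        }

        // Lifecycle callback: start every registered container.
        void start() {
            for (Container container : listenerContainers.values()) {
                container.start();
            }
        }

        Container getListenerContainer(String id) {
            return listenerContainers.get(id);
        }
    }

    public static void main(String[] args) {
        // Endpoints as they would have been collected during post-processing.
        List<Endpoint> endpointDescriptors = new ArrayList<>();
        endpointDescriptors.add(new Endpoint("orders-listener", "orders", null));
        endpointDescriptors.add(new Endpoint("audit-listener", "audit", 2));

        ContainerFactory factory = new ContainerFactory(4);
        Registry registry = new Registry();

        // After all singletons exist: create and register one container per endpoint.
        for (Endpoint endpoint : endpointDescriptors) {
            registry.registerListenerContainer(endpoint, factory);
        }

        // On application start: start the registered containers.
        registry.start();

        Container orders = registry.getListenerContainer("orders-listener");
        System.out.println(orders.queue + " running=" + orders.isRunning()
                + " concurrency=" + orders.concurrency);
    }
}
```

Separating registration from start means every listener is known before any of them begins to consume, so startup order and failures can be handled in one place.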

4. Start the listening container

4.1 start

protected void doStart() {
    synchronized (this.consumersMonitor) {
        // 1. Initialize consumers according to the configured concurrency
        int newConsumers = initializeConsumers();
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            // 2. Create an AsyncMessageProcessingConsumer object for each consumer
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // 3. Run it on a thread from the task executor
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        waitForConsumersToStart(processors);
    }
}

4.2 Performing Tasks

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    try {
        // 1. Start the consumer
        initialize();
        // 2. Consume messages
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            mainLoop();
        }
    }
    // ... catch blocks omitted
}

4.2 Start consumers

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
-> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#initialize
-> org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#start
-> org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#setQosAndCreateConsumers
-> org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue

private void consumeFromQueue(String queue) throws IOException {
    // 1. Set the message callback processing
    InternalConsumer consumer = new InternalConsumer(this.channel, queue);
    // 2. Tell the message server the name of the queue currently consumed by the channel, the ack mode of the message, and the callback to be executed when the message server pushes the message
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
                                                   (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
                                                   this.exclusive, this.consumerArgs,
                                                   consumer);

    if (consumerTag != null) {
        this.consumers.put(queue, consumer);
        if (logger.isDebugEnabled()) {
            logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ":" + this);
        }
    }
    else {
        logger.error("Null consumer tag received for queue " + queue);
    }
}

4.3 Message Callback Processing

InternalConsumer is responsible for handling the message callback:

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                           byte[] body) {
    try {
        // 1. When the message server sends a message to the consumer, this callback puts the message into the queue
        if (BlockingQueueConsumer.this.abortStarted > 0) {
            if (!BlockingQueueConsumer.this.queue.offer(
                new Delivery(consumerTag, envelope, properties, body, this.queueName),
                BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                // ... omitted
            }
        }
        else {
            BlockingQueueConsumer.this.queue
                .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
        }
    }
    // ... catch blocks omitted
}

4.4 Consuming Messages

Back to the run() method shown under "Performing Tasks": mainLoop() eventually reaches doReceiveAndExecute(), which contains the logic for consuming messages.

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

    Channel channel = consumer.getChannel();

    List<Message> messages = null;
    long deliveryTag = 0;

    for (int i = 0; i < this.batchSize; i++) {

        logger.trace("Waiting for message from consumer.");
        // 1. Fetch messages from the queue
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (this.consumerBatchEnabled) {
            // ... omitted
        }
        else {
            try {
                // 2. Call listener methods to execute business logic
                executeListener(channel, message);
            }
            // ... catch blocks omitted
        }
    }
    if (this.consumerBatchEnabled && messages != null) {
        executeWithList(channel, messages, deliveryTag, consumer);
    }
    // 3. Send an ACK to the message server
    return consumer.commitIfNecessary(isChannelLocallyTransacted());

}
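The shape of this loop (take up to batchSize messages, run the listener on each, then acknowledge) can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the library code: ReceiveLoopSketch, Delivery and AckRecorder are hypothetical names, the loop stops early when no message arrives within the timeout, and a single cumulative acknowledgement of the last delivery tag is sent per pass. That last point only models container-managed acknowledgement; with a MANUAL acknowledge mode the listener would be expected to acknowledge messages itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ReceiveLoopSketch {

    // Hypothetical, reduced delivery: just a tag and a body.
    static class Delivery {
        final long deliveryTag;
        final String body;

        Delivery(long deliveryTag, String body) {
            this.deliveryTag = deliveryTag;
            this.body = body;
        }
    }

    // Records the acknowledgements that would be sent to the message server.
    static class AckRecorder {
        final List<Long> ackedUpTo = new ArrayList<>();

        void ackUpTo(long deliveryTag) {
            ackedUpTo.add(deliveryTag);
        }
    }

    // Waits for one message; returns null on timeout or interruption.
    static Delivery nextMessage(BlockingQueue<Delivery> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // One pass of the loop. Returns the number of messages handled.
    static int receiveAndExecute(BlockingQueue<Delivery> queue, int batchSize, long receiveTimeoutMillis,
            Consumer<String> listener, AckRecorder acks) {
        int handled = 0;
        long lastTag = 0;
        for (int i = 0; i < batchSize; i++) {
            // 1. Fetch a message from the local queue
            Delivery delivery = nextMessage(queue, receiveTimeoutMillis);
            if (delivery == null) {
                break; // nothing arrived within the timeout
            }
            // 2. Run the business logic
            listener.accept(delivery.body);
            lastTag = delivery.deliveryTag;
            handled++;
        }
        // 3. Acknowledge everything handled in this pass with one cumulative ack
        if (handled > 0) {
            acks.ackUpTo(lastTag);
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<Delivery> queue = new LinkedBlockingQueue<>();
        for (long tag = 1; tag <= 5; tag++) {
            queue.add(new Delivery(tag, "message-" + tag));
        }
        AckRecorder acks = new AckRecorder();
        while (receiveAndExecute(queue, 3, 100, body -> System.out.println("handled " + body), acks) > 0) {
            // keep going until a pass finds the queue empty
        }
        System.out.println("acks sent up to tags: " + acks.ackedUpTo);
    }
}
```

With five queued messages and a batch size of 3, the sketch records two acknowledgements, for tags 3 and 5, which illustrates how a larger batch size reduces the number of acknowledgements at the cost of redelivering more messages if a pass fails.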

4.5 Summary

  • SimpleMessageListenerContainer is started and creates consumers according to the configured concurrency
  • Each consumer tells the message server which queue to consume and which ACK mode to use, and specifies the callback that handles delivered messages
  • The message server pushes a message to the consumer and the callback runs, placing the message in a local queue
  • The consumer thread takes messages from that queue in a loop and runs the business logic on them
  • After the business logic has run, an ACK is sent to the message server