General documentation: Article directory Github: github.com/black-ant

preface

This article will attempt to explain the slightly deeper technical aspects of RabbitMQ in the hope of clarifying them.

See the MQ Overview for an introduction to RabbitMQ, but without much description, we’ll focus on four of them in turn.

1. Basic use

1.1 Configuration Details

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

application.yml

spring:
  rabbitmq:
    host: 127.0. 01.
    port: 5672
    username: guest
    password: guest
Copy the code

1.2 consumers

@RabbitListener(bindings = @QueueBinding(value = @Queue("DirectA"),key = "ONE",exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)))
public void processA(String message) {
    logger.info("------> DirectA received successfully :{}<-------", message);
}
Copy the code

1.3 producers

rabbitTemplate.convertAndSend("DirectExchange"."ONE"."Send a message :" + msg);
Copy the code

As you can see from the image, DirectExchange actually binds three queues, so I have multiple consumers receiving messages

1.4 Four sending modes

Fanout => Messages sent to the switch are sent to all bound queues

    @RabbitListener(bindings = @QueueBinding( value = @Queue("FanoutA"), key = "ONE", exchange = @Exchange(name = "FanoutExchange", type = ExchangeTypes.FANOUT) ))
    public void processA(String message) {
        logger.info("------> FanoutA sending and receiving success :{}<-------", message);
    }

Copy the code

Direct => The message is routed to a queue with the same BindingKey and RoutingKey

    @RabbitListener(bindings = @QueueBinding( value = @Queue("DirectA"), key = "ONE", exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT) ))
    public void processA(String message) {
        logger.info("------> DirectA received successfully :{}<-------", message);
    }

Copy the code

Top => BindingKey uses */# blur matching

    @RabbitListener(bindings = @QueueBinding( value = @Queue("TopicA"), key = "ONE.*", exchange = @Exchange(name = "TopicExchange", type = ExchangeTypes.TOPIC) ))
    public void processA(String message) {
        logger.info("------> TopicA send and receive successfully :{}<-------", message);
    }

Copy the code

Header => Matches according to the header property of the message content type

1.5 Low-level Usage

1.5.1 Creating a Connection to send messages

// The following is a complete process that is not sent by Spring. In SpringBoot, these processes are brokered
ConnectionFactory connectionFactory = new CachingConnectionFactory();

AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));

AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue"."foo");

String foo = (String) template.receiveAndConvert("myqueue");
Copy the code

1.5.2 Build a Binding


// Direct
new Binding(someQueue, someDirectExchange, "foo.bar");

// Topic
new Binding(someQueue, someTopicExchange, "foo.*");

// Fanout
new Binding(someQueue, someFanoutExchange);

// 

Copy the code

1.5.3 Connection factory

RabbitTemplate provides three connection factories:

  • PooledChannelConnectionFactory
    • Common, pool-based connection factory (Commons-pool2)
    • Support for simple Publisher validation
  • ThreadChannelConnectionFactory
    • You need to use scope operations to ensure strict message order
    • Support for simple Publisher validation
    • This factory ensures that all operations on the same thread use the same channel
  • CachingConnectionFactory
    • You can open multiple connections with CacheMode (shared, as opposed to pooled)

    • Can be confirmed by the relevant publisher

    • Support for simple Publisher validation

PS: The first two require the Spring-Rabbit package

The three connection pools offer different capabilities that can be selected and customized for your project

PooledChannelConnectionFactory way of building

@Bean
PooledChannelConnectionFactory pcf(a) throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool}});return pcf;
}

// Let's look at the source code:

C- ThreadChannelConnectionFactory
    C- ConnectionWrapper
        private final ThreadLocal<Channel> channels = new ThreadLocal<>();
        private final ThreadLocal<Channel> txChannels = newThreadLocal<>(); ? - Internal main through2One ThreadLocal to store a Channel2One was prepared for the transaction channel of C - C - ConnectionWrapper PooledChannelConnectionFactoryprivate final ObjectPool<Channel> channels;
        private finalObjectPool<Channel> txChannels; ? - ObjectPool is a component of CommonPool// 

Copy the code

CachingConnectionFactory creates a new connection

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();


// PS: note the constructor, which provides more than one way to construct
public CachingConnectionFactory(@Nullable String hostname)
public CachingConnectionFactory(int port)
public CachingConnectionFactory(@Nullable String hostNameArg, int port)
public CachingConnectionFactory(URI uri)
public CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory)
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
			boolean isPublisherFactory)
// and provides a static constructor
private static com.rabbitmq.client.ConnectionFactory newRabbitConnectionFactory(a)


// Add custom client connection properties? - access the underlying connectionFactory and set the custom client property connectionFactory.getRabbitConnectionFactory(a).getClientProperties(a).put("thing1"."thing2");


// Go further:
1Internal class CacheMode: CacheMode, including CHANNEL and CONNECTION// Attribute related to CHANNEL- connectionName: Specifies the name of the connection generated by ConnectionNameStrategy. - channelCacheSize: Specifies the maximum number of idle channels currently configured. Local port to connect to -IDlechannelstx: Number of currently idle transaction channels (cache) -IDlechannelsnottx: Number of currently idle non-transactional channels (cache) -IDlechannelstxhighwater: Has: the maximum number of concurrent free transaction channel (cache) - idleChannelsNotTxHighWater: non-transactional channel has the maximum number of concurrent free (cache)// CONNECTION related attributes-connectionName :<localPort> : Name of the connection generated by ConnectionNameStrategy -OpenConnections: Indicates the number of connection objects for the connection to the broker -channelCachesize: specifies the maximum number of currently configured channels allowed to be idle -connectionCachesize: specifies the maximum number of currently configured connections allowed to be idle -idleconnections: Number of currently idle connections -idleconnectionShighwater: Maximum number of concurrently idle connections -idlechannelstx :<localPort> : IdleChannelsNotTx :<localPort> : Number of currently idle non-transactional channels for this connection (cache) -IDlechannelstxhighwater :<localPort> : Has: the maximum number of concurrent free transaction channel (cache) - idleChannelsNotTxHighWater: < localPort > : non-transactional channel has the maximum number of concurrent free (cache)Copy the code

1.5.4 Sending a Message

void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// Similarly, in business we can use a more reasonable way to send
RabbitTemplate template = new RabbitTemplate(); 
template.setRoutingKey("queue.helloWorld"); 

// Template can also be customized, for exampletemplate.setConfirmCallback(...) ;// This can be customized
MessageProperties properties = new MessageProperties();
/ /... omit
template.send(new Message("Hello World".getBytes(), properties));




// In the same way, messages can be processed in detail
Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar"."baz")
    .build();
public static MessageBuilder withBody(byte[] body)? - The message created by the generator has a body, which is a direct reference to the argument -return newMessageBuilder(body);
public static MessageBuilder withClonedBody(byte[] body)? - Arrays creates a new array. -return newMessageBuilder(Arrays.copyOf(body, body.length));
public static MessageBuilder withBody(byte[] body, int from, int to) 
    - return new MessageBuilder(Arrays.copyOfRange(body, from, to));
public static MessageBuilder fromMessage(Message message) 
    - return new MessageBuilder(message);
public static MessageBuilder fromClonedMessage(Message message) 
    - byte[] body = message.getBody();
    - return new MessageBuilder(Arrays.copyOf(body, body.length), message.getMessageProperties());
    
    

// MessageProperties Extension mode
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar"."baz")
    .build();
    
public static MessagePropertiesBuilder newInstance(a)? - Initialize the new message properties object with the default valuespublic static MessagePropertiesBuilder fromProperties(MessageProperties properties)? - return newMessagePropertiesBuilder(properties): is created with the properties passed inpublic static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)? - Internally carried out the Builder.copyProperties(properties);

Copy the code

1.5.5 Receiving messages

Message receive(a) throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;

Object receiveAndConvert(a) throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

Copy the code

1.5.6 Enabling annotations

@EnableRabbit
Copy the code

2. Source analysis

2.1 Basic Interfaces


// The Exchange interface represents the AMQP Exchange, which is what the message generator sends to.
// Each Exchange in the proxy virtual host has a unique name and some other properties
public interface Exchange {
    String getName(a);
    // Direct, topic, fanout and header
    String getExchangeType(a);
    boolean isDurable(a);
    boolean isAutoDelete(a);
    Map<String, Object> getArguments(a);
}

// The Queue class represents the component from which message consumers receive messages.
// As with the various Exchange classes, our implementation is intended to be an abstract representation of this core AMQP type
public class Queue  {
    private final String name;
    private volatile boolean durable;
    private volatile boolean exclusive;
    private volatile boolean autoDelete;
    private volatile Map<String, Object> arguments;
    public Queue(String name) {
        this(name, true.false.false); }}Copy the code

2.2 Brief description of main categories

I’ve talked about the low-level use of non-Spring loading, mainly ConnectionFactory,AmqpAdmin,Queue, and the column just talks about the pre-processing related classes

2.2.1 Connect factory series

Let’s look at these three classes first:

ConnectionFactory :

The ConnectionFactory here is primarily a CachingConnectionFactory. This class acts like a cache connection pool. The CHANNEL(default) returns the same connection from all createConnection() calls. Ignore the call to connection.close () and cache the CHANNEL

By default, only one channel is cached, and channels for further requests are created and processed as needed.

// The default cache size
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
// Channel timeout period
private static final int CHANNEL_EXEC_SHUTDOWN_TIMEOUT = 30;


/ / core classes
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
    Semaphore permits = null; // Found a magical utility class that can be interpreted as shared locks
    if (this.channelCheckoutTimeout > 0) {
        permits = obtainPermits(connection);
    }
    LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
    ChannelProxy channel = null;
    if (connection.isOpen()) {
        channel = findOpenChannel(channelList, channel);
    }
    if (channel == null) {
        try {
        
            channel = getCachedChannelProxy(connection, channelList, transactional);
        }catch (RuntimeException e) {
            // Release Semaphore, which is a multithreaded Semaphore tool
        }
       return channel;
}


private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList,ChannelProxy channelArg) {
    ChannelProxy channel = channelArg;
    synchronized (channelList) {
        while(! channelList.isEmpty()) {// Get the first channel in channelList and remove it from the collection
            channel = channelList.removeFirst();
                if (channel.isOpen()) {
                    break;
                } else {
                    cleanUpClosedChannel(channel);
                    channel = null; }}}return channel;
}

// CachingConnectionFactory 

Copy the code

2.2.2 Family of framework members

2.2.3 Posting Member Series (Admin)

AmqpAdmin

AmqpAdmin is an interface whose core implementation class is RabbitAdmin that enables portable AMQP management operations to automatically declare and bind queues whenever RabbitAdmin is present in the application context

// declareExchanges:
// Declare an Exchange whose configuration is mainly from annotations or configuration
Map<String, Object> arguments = exchange.getArguments();
channel.exchangeDeclare(
    exchange.getName(), 
    DELAYED_MESSAGE_EXCHANGE, 
    exchange.isDurable(),
    exchange.isAutoDelete(), 
    exchange.isInternal(), 
    arguments
);

//-----------------
// Method: declareQueues declare a Queue
// Method: declareBindings

Copy the code

As you can see later, this class is used to complete initialization when scanning annotations are deployed

RabbitAdmin

RabbitAdmin is the lower-level class and is called mainly in these places

At the heart of RabbitAdmin is the logic for multiple declarations,


// Node 1: create a Channel, and then use the Callback method to create an Exchange
M- declareExchange(final Exchange exchange)
    // Note that the excute argument is a ChannelCallback, which is intended for the callback after channle is created
    this.rabbitTemplate.execute(channel -> {
        declareExchanges(channel, exchange);
        return null; }); M- declareExchanges ? - This method allows multiple exchanges to be passed at once - Prepare arguments properties first - Call channel.exchangeDeclare to implement the declaration operation C-rabbitTemplate m-execute: RabbitTemplate uses retryTemplate to retry calls? - doExecute, Channel is still created primarily in RabbitTemplate? - An operation in RabbitAdmin before doExecute and Callback, similar to a Callback -return this.retryTemplate.execute(
            (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
            (RecoveryCallback<T>) this.recoveryCallback);
            
// Node 2: delete an Exchange
M- deleteExchange(finalString exchangeName) : delete an Exchange by name. - Determine whether the default Exchange is the Exchange. The name is""- Then delete directly via channel.exchangeDelete// Node 3: Create Queue, same as Exchange
M- declareQueue(finalQueue Queue) - Create Channel first and call back - Call declareQueues(Channel, Queue); -channel.queueDeclare: Declare a Queue using a channel object m-declareQueue ()? - Declare a server named exclusive, autodelete, non-persistent queue? Create a Chhenl and then create a Queue -this.rabbitTemplate.execute(Channel::queueDeclare)
        
M- deleteQueue(finalString queueName) ? - Create a Channel and then delete the m-deletequeue (final String queueName, final boolean unused, final booleanempty) ? - More detailed deletion methods, including whether to use, whether to empty to determine whether to delete the M-PurgeQueue? - Clear the queue, optionally not waiting for cleanup to occur// Node 4: Binding processingM-declarebinding - first establish channel-declarebindings (Channel, binding); - channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments()); - channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments()); M- removeBinding(final Binding binding)
    - channel.queueUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
    - channel.exchangeUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
                                                

M- QueueInformation getQueueInfo(String queueName)
    - DeclareOk declareOk = channel.queueDeclarePassive(queueName);
    - return new QueueInformation(declareOk.getQueue(), declareOk.getMessageCount(),declareOk.getConsumerCount());



Copy the code

2.2.4 event Event series

The main events in Rabbit are:

  • AsyncConsumerStartedEvent: consumer is started
  • Restart the consumer after AsyncConsumerRestartedEvent: failure
  • AsyncConsumerTerminatedEvent: consumers to stop
  • AsyncConsumerStoppedEvent: consumers to stop (SimpleMessageListenerContainer use)
  • ConsumeOkEvent: When consumeOk is received from the agent
  • ListenerContainerIdleEvent :
  • MissingQueueEvent: Lost Queue

2.2.5 Transaction Transaction processing

The difference between the two approaches is that by providing an external transaction (by setting channelTransacted + a TransactionManager), One is internal logic (by setting the channelTransacted + Transaction declaration (for example, @transaction)).

A: In RabbitTemplate and SimpleMessageListenerContainer, there is a logo channelTransacted, if this flag is true, then tell the framework using transaction channel, And ends all operations (send or receive) by committing or rolling back (depending on the result), with an exception signaling a rollback.

For example, BlockingQueueConsumer

// Step 1: Create a Consumer with isChannelTransacted
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
    this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
    isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);

// Step 2: Finally call in the relevant method
M- rollbackOnExceptionIfNecessary
    if (this.transactional) -> RabbitUtils.rollbackIfNecessary(this.channel);
    
    
// Transaction declaration:
1For example, you can use the Spring transaction model// Transaction principle:
TODO 

Copy the code

Method 2: provide an external affairs, which contains a Spring PlatformTransactionManager implementation, as the context of the ongoing operations. If a transaction was already in progress when the framework sent or received the message, and the channelTransacted flag is true, the commit or rollback of the messaging transaction is deferred until the end of the current transaction. If channelTransacted is marked false, transaction semantics are not applied to the messaging operation (it is logged automatically)

@Bean
public RabbitTransactionManager rabbitTransactionManager(a) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(a) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(rabbitTransactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
}

// Let's go deeper:C- SimpleMessageListenerContainer M- doReceiveAndExecute ? - Where you can see a call to the transactionManager in a Catch...catch(Exception ex) {- Determine whether transactionManager exists - Determine whether Callback is enabled RabbitResourceHolder resourceHolder = (RabbitResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory());if(resourceHolder ! =null) {
                    consumer.clearDeliveryTags();
                } else {
                    consumer.rollbackOnExceptionIfNecessary(ex);
                }

Copy the code

2.3 Detailed analysis of RabbitListener annotations

In fact, we are not actively creating a Queue or Exchange, so what does the annotation do for us?

Let’s first look at what methods are provided in the annotations:

  • id :
  • ContainerFactory: Container factory class
  • Queues: Specifies queues
  • QueuesToDeclare: If there is a RabbitAdmin in the application context, the queue will be bound to the broker by default
  • exclusive :
  • priority :
  • Admin: AmqpAdmin class
  • -Vanessa: Bindings
  • group :
  • ReturnExceptions: Returns the exception to the sender wrapped in the RemoteInvocationResult object
  • ErrorHandler: exception handling Handler
  • Concurrency: Set the concurrency of the listener container for this listener
  • AutoStartup: Whether to start when ApplicationContext starts
  • Executor: Sets the name of the task executor bean for this listener container; Overrides any executants set up on the container factory
  • **ackMode :**AcknowledgeMode
  • replyPostProcessor :

Automatic persistence

 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "myQueue", durable = "true"), exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"), key = "orderRoutingKey") )
Copy the code

An anonymous (exclusive, automatically deleted) queue is declared and bound

// There is nothing set to Queue
  @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "auto.exch"), key = "invoiceRoutingKey") )

Copy the code

Annotation Get Header

@RabbitListener(queues = "myQueue")
public void processOrder(Order order, @Header("order_type") String orderType)
Copy the code

Listening to multiple Queues

@RabbitListener(queues = { "queue1", "queue2" } )

// You can also use SPEL expressions
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )

Copy the code

SendTo sends an additional message

  • @sendto: Sends the received message to the specified routing destination. All users who subscribe to the message can receive it. It is a broadcast.
  • @ SendToUser: message destination have UserDestinationMessageHandler, message routing and arrival will be anything corresponding to the destination.
@SendTo("#{environment['my.send.to']}")


// How to use beans
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}

@Bean
public String spelReplyTo(a) {
    return "test.sendTo.reply.spel";
}

// SPEL
@SendTo("! {'some.reply.queue.with.' + result.queueName}")

// Spring Enviroment
@SendTo("#{environment['my.send.to']}")

Copy the code

Define the response type

@RabbitListener(queues = "q1", messageConverter = "delegating", replyContentType = "application/json")

Copy the code

Core class a: RabbitListenerAnnotationBeanPostProcessor

Which will be according to the parameters of the annotations, by creating closer message listener container calls RabbitListenerContainerFactory, automatically detect any RabbitListenerConfigurer instances of container at the same time, allow the custom registry, Default container factory or fine-grained control over endpoint registration

// RabbitListenerAnnotationBeanPostProcessor


// Step Start : postProcessAfterInitialization
/ / postProcessAfterInitialization from BeanPostProcessor, he will call before custom initialization
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    / / get ServletWebServerFactoryConfigurationClass<? > targetClass = AopUtils.getTargetClass(bean);// Get the RabbitListener Metadata for each class
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            // If there are annotations, processingprocessAmqpListener(rabbitListener, lm.method, bean, beanName); }}if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}



// Step: findListenerAnnotations Scans all annotations
private RabbitListenerAnnotationBeanPostProcessor.TypeMetadata buildMetadata(Class
        targetClass) {
        // There is a check for @rabbitListener for each class
        Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<RabbitListenerAnnotationBeanPostProcessor.ListenerMethod> methods = new ArrayList<>();
        final List<Method> multiMethods = new ArrayList<>();
        ReflectionUtils.doWithMethods(targetClass, method -> {
            Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
            if (listenerAnnotations.size() > 0) {
                // If there are annotations, add them to the collection
                methods.add(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod(method,
                        listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
            }
            if (hasClassLevelListeners) {
                RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
                if(rabbitHandler ! =null) {
                    multiMethods.add(method);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        if (methods.isEmpty() && multiMethods.isEmpty()) {
            return RabbitListenerAnnotationBeanPostProcessor.TypeMetadata.EMPTY;
        }
        // Build a TypeMetadata machine
        return new RabbitListenerAnnotationBeanPostProcessor.TypeMetadata(
                methods.toArray(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod[methods.size()]),
                multiMethods.toArray(new Method[multiMethods.size()]),
                classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

Copy the code

Have seen above, will eventually postProcessAfterInitialization method invokes the processAmqpListener to handle comments, let’s take a look at processAmqpListener method

// processAmqpListener
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    // If it is a proxy, return the method of the final proxy
    Method methodToUse = checkProxy(method, bean);
    / / build a MethodRabbitListenerEndpoint, the unit is used for processing the endpoint of the incoming message
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    // Listener annotates the process
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}


// Let's go further and omit the meaningless part of a long paragraph:
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object target, String beanName) {

    // Endpoint injection attribute
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(rabbitListener));
    // This is just a checksum resolution for QueueName
    endpoint.setQueueNames(resolveQueues(rabbitListener));
    // Process the listener
    endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
    endpoint.setBeanFactory(this.beanFactory);
    // Exception handling
    endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
    
    Object errorHandler = resolveExpression(rabbitListener.errorHandler());
    if (errorHandler instanceof RabbitListenerErrorHandler) {
    	endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
    }
    else if (errorHandler instanceof String) {
    	String errorHandlerBeanName = (String) errorHandler;
    	if (StringUtils.hasText(errorHandlerBeanName)) {
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class)); }}else {
    	throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
        	+ errorHandler.getClass().toString());
    }
    String group = rabbitListener.group();
    if (StringUtils.hasText(group)) {
    	Object resolvedGroup = resolveExpression(group);
    	if (resolvedGroup instanceof String) {
			endpoint.setGroup((String) resolvedGroup);
    	}
    }
    String autoStartup = rabbitListener.autoStartup();
    if (StringUtils.hasText(autoStartup)) {
    	endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
    }

    endpoint.setExclusive(rabbitListener.exclusive());
    String priority = resolve(rabbitListener.priority());
    if (StringUtils.hasText(priority)) {
		endpoint.setPriority(Integer.valueOf(priority));
    }

    resolveExecutor(endpoint, rabbitListener, target, beanName);
    resolveAdmin(endpoint, rabbitListener, target);
    // Let's look at the ACK mode laterresolveAckMode(endpoint, rabbitListener); resolvePostProcessor(endpoint, rabbitListener, target, beanName); RabbitListenerContainerFactory<? > factory = resolveContainerFactory(rabbitListener, target, beanName);this.registrar.registerEndpoint(endpoint, factory);
}

/ / register the Endpoint
public void registerEndpoint(RabbitListenerEndpoint endpoint,
			@NullableRabbitListenerContainerFactory<? > factory) {
    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { 
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null
            resolveContainerFactory(descriptor), true);
        }else {
            this.endpointDescriptors.add(descriptor); }}}Copy the code

What does RabbitTemplate do


/ / Step 1: RabbitListenerEndpointRegistry Listener
public void start(a) {
    for(MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); }}// The source of getListenerContainers >>
RabbitListenerEndpointRegistry  -->  Map<String, MessageListenerContainer>
// Note that this matches the scan above and is called by registerEndpoint
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<? > factory,boolean startImmediately) {

    String id = endpoint.getId();
    synchronized (this.listenerContainers) {
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext ! =null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (this.contextRefreshed) {
            container.lazyLoad();
        }
        if(startImmediately) { startIfNecessary(container); }}}// During the scan, the MessageListenerContainer will be put into the Map for subsequent use
/ / Step 2: RabbitListenerEndpointRegistry Listener running
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        / / mainly SimpleMessageListenerContainer
        / / inherited AbstractMessageListenerContainer, start in the classlistenerContainer.start(); }}// Step 3: Start

configureAdminIfNeeded();
// check the Queue, mainly by calling RabbitAdmin. If the check fails, the Channel will be shutdown
// Channel shutdown: channel error; protocol method: #method
      
       (reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'HeaderExchange' in vhost '/': received 'direct' but current is 'headers', class-id=40, method-id=10)
      
checkMismatchedQueues();
    - 1Determine the Exchange situation and output a log:! isDurable / isAutoDelete / isInfoEnabled -2Determine the Queue and output a log:! IsDurable/isAutoDelete/isExclusive/isInfoEnabled(Log enabled)// Note that this is where you were looking for the declaration
    - 3 declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
        - channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),exchange.isAutoDelete(), exchange.isInternal(), arguments);
    - 4 declareQueues(channel, queues.toArray(new Queue[queues.size()]));
        - channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
    - 5 declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
        - channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
// Finally, the doStart of the one-time class is called for subsequent extensions
doStart();
Copy the code

As you can see above, how the annotation registers the message, let’s look at the process of sending and receiving the message:

2.4 Sending messages

Usually we send messages using the following code:

rabbitTemplate.convertAndSend("DirectExchange"."ONE"."Send a message :" + msg);
Copy the code

Let’s see what’s going on in this one:


// There is a Boolean in the middle, which we use pseudocode to represent
Boolean item1 = RabbitTemplate.this.returnCallback ! =null|| (correlationData ! =null && StringUtils.hasText(correlationData.getId()))
Boolean item2 = RabbitTemplate.this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class)
Boolean mandatory =  item1 && item2 ;
            

@Override
public void send(final String exchange, final String routingKey,
                    final Message message, @Nullable final CorrelationData correlationData)
                    throws AmqpException {
    execute(channel -> {
        // The core method to send the message
        doSend(channel, exchange, routingKey, message,mandatory,correlationData);
        return null;
    }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

// In doSend we also remove irrelevant
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
			boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {

    // The Exchange parameter and the routingKey parameter are prepared
    String exch = exchangeArg;
    String rKey = routingKeyArg;

    // MessageProperties are stored in the messageToUse
   
    sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
    if(isChannelLocallyTransacted(channel)) { RabbitUtils.commitIfNecessary(channel); }}protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,Message message) throws IOException {

    BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
    // This is the normal RabbitMQ Client operation. This is the end of sending
    // com.rabbitmq.client.impl.ChannelN
    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    
    // Publish the core of basicPublish
    AMQCommand command = new AMQCommand((new Builder()).exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), props, body);
    this.transmit(command);

    
}

Copy the code

2.5 Receiving messages

Receiving messages must be done through the proxy class. Let’s look at the specific proxy class:


// Step 1: SimpleMessageListenerContainer#run
// There is such a code
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
    mainLoop();
}

// Step 2: Keep listening, and trace back to find a section of such code
M- doReceiveAndExecute
try {
    executeListener(channel, message);
}

-> doExecuteListener
-> invokeListener(channel, message);
-> this.proxy.invokeListener(channel, data) // At this point, you can't actually see the actual handler class

// Debug all the way to the end of this method
protected void actualInvokeListener(Channel channel, Object data) {
    Object listener = getMessageListener();
    if (listener instanceof ChannelAwareMessageListener) {
        doInvokeListener((ChannelAwareMessageListener) listener, channel, data);
    }else if (listener instanceof MessageListener) {
			boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
			if (bindChannel) {
				RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
				resourceHolder.setSynchronizedWithTransaction(true);
				TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
						resourceHolder);
			}
			try {
				doInvokeListener((MessageListener) listener, data);
			}
			finally {
				if (bindChannel) {
					// unbind if we bound
					TransactionSynchronizationManager.unbindResource(this.getConnectionFactory()); }}}else if(listener ! =null) {
			throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
					+ listener);
		}
		else {
			throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'"); }}Copy the code

2.6 Message Listening

In the RabbitMQ provides two message listener: SimpleMessageListenerContainer (SMLC) and DirectMessageListenerContainer (DMLC), they has the following properties:

  • AckTimeout: After messagesPerAck is set, this value is used as an ack alternative and will be the time since the last ACK to ensure authenticity
  • acknowledgeMode: There are three types: NONE, MANUAL, and AUTO
    • NONE: No ACK is sent
    • MANUAL: The listener must confirm all messages by calling channel.basicack ()
    • AUTO: The container automatically acknowledges the message unless the MessageListener throws an exception
  • AdviceChain: An ARRAY of AOP advice that should be used for listener execution
  • MessagePostProcessor afterReceivePostProcessors: before calling the listener call instance array
  • AlwaysRequeueWithTxManagerRollback: true said in a configuration when the transaction manager always query message again when rolled back
  • autoDeclare : When set to true (the default), use RabbitAdmin to re-declare all AMQP objects (queues, switches, bindings) if the container detects that at least one queue is lost during startup (possibly because it was automatically deleted or expired), but if the queue is lost for any reason, The restatement will proceed.
  • AutoStartup: indicates that the container should start when ApplicationContext starts. The default is true
  • BatchSize: When used with Annexemode.auto, the container attempts to batch process this amount of messages before sending them
  • BatchingStrategy: Batch processing strategy
  • ChannelTransacted: Whether all messages are confirmed in the transaction (manually or automatically)
  • Concurrency: Concurrency consumer scope for each listener
  • ConcurrentConsumers: the number of concurrentConsumers initially started per listener
  • ConnectionFactory: A reference to a connectionFactory
  • ConsecutiveActiveTrigger: Minimum number of consecutive messages (less than this number will not send or receive timeout)
  • ConsumerBatchEnabled: Enables batch messages
  • ConsumerStartTimeout: The time, in milliseconds, to wait for the user thread to start
  • ConsumerTagStrategy: consumerTagStrategy implementation
  • ConsumersPerQueue: The number of users created for each configured queue
  • ErrorHandler: errorHandler reference
  • Exclusive: Whether a single consumer in this container has exclusive access to the queue
  • MessagesPerAck: indicates the number of messages to be received between acks
  • MaxConcurrentConsumers: The maximum number of concurrent consumers to start as needed
  • NoLocal: Set to true to disable the delivery of messages published on the same channel connection from the server to the consumer
  • RabbitAdmin: implemented by rabbitAdmin
  • ReceiveTimeout: Maximum wait time for each message
  • ShutdownTimeout: indicates the shutdownTimeout period
  • StartConsumerMinInterval: The minimum interval between starting a consumer
  • StopConsumerMinInterval: disables the minimum consumer interval
  • TransactionManager: transaction management

The function of the listener

From this we can probably know the common functions of listeners:

  • Listening queues (multiple queues)
  • Automatic startup
  • Automatic declaration
  • Set transaction properties, transaction manager, transaction attributes, transaction concurrency, start transaction, message rollback
  • Set the number of consumers, maximum and minimum quantities, and batch consumption attributes
  • Set confirm and ACK modules, whether to return to the queue, errorHandler
  • Set the consumer tag generation policy, exclusive mode, and consumer attributes
  • Set the specific converter, conversion, etc
  • And other common features

DirectMessageListenerContainer

  • Each consumer of each queue uses a separate channel
  • Concurrency is controlled by the rabbitClient
// Configuration mode
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("queueName1"."queueName2");
        container.setMessageListener((MessageListener) message -> {
           //....
        });
        return container;
}
Copy the code

From the source code to see how to use:

TODO

3. Go deep

3.1 configure SSL

By configuring RabbitConnectionFactoryBean userSSL to configure SSL related attributes, the configuration should be paid more attention to the following properties:

  • private Resource sslPropertiesLocation;
  • private String keyStore;
  • private String trustStore;

See the official document [email protected]. IO /spring-amqp…

3.2 Configuring a Cluster

AddressShuffleMode. RANDOM said RANDOM set the connection sequence, the default is one polling

  • NONE: Do not change the address before or after opening the connection; Try to join in a fixed order
  • RANDOM: Shuffle the address randomly before opening the connection; Try the join in a new order
  • INORDER: Shuffle addresses after opening a connection, moving the first address to the last.
@Bean
public CachingConnectionFactory ccf(a) {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.RANDOM);
    return ccf;
}


/ / Step 1: this Address will be set to the Address attribute Also will try to set up internal publisherConnectionFactory
Address[] addressArray = Address.parseAddresses(addresses);
this.addresses = new LinkedList<>(Arrays.asList(addressArray));
this.publisherConnectionFactory.setAddresses(addresses);

// Step 2: Mode judgment
private synchronized com.rabbitmq.client.Connection connectAddresses(String connectionName)
			throws IOException, TimeoutException {

    List<Address> addressesToConnect = new ArrayList<>(this.addresses);
    
    // RANDOM mode processing
    if (addressesToConnect.size() > 1 && AddressShuffleMode.RANDOM.equals(this.addressShuffleMode)) {
        Collections.shuffle(addressesToConnect);
    }

    com.rabbitmq.client.Connection connection = this.rabbitConnectionFactory.newConnection(this.executorService,
				addressesToConnect, connectionName);
    / / INORDER mode
    if (addressesToConnect.size() > 1 && AddressShuffleMode.INORDER.equals(this.addressShuffleMode)) {
        this.addresses.add(this.addresses.remove(0));
    }
    return connection;
}


/ / Step 3: in com. The rabbitmq. Client. NewConnection ConnectionFactory


Copy the code

3.3 Configuring the Connection Factory

public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) { SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost); rabbitTemplate.convertAndSend(payload); SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory()); }}Copy the code

3.4 Configuring common RabbitTemplate Functions

3.4.1 Retry function

RetryTemplate is not exclusive, Rabbit he is org. Springframework. Retry. Support tool in the package

@Bean
public RabbitTemplate rabbitTemplate(a) {

    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    
    / / using RetryTemplate, this class is org. Springframework. Retry. Support classes
    RetryTemplate retryTemplate = new RetryTemplate();
    
    / / the fallback function, belongs to class org. Springframework. Retry. Backoff
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    
    // Retry + rollback
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    
    return template;
}

Copy the code

Using Callback


retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            returnrabbitTemplate.convertAndSend(exchange, routingKey, message); }},new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null; }}); }Copy the code

The source code to see



Copy the code

3.4.2 Configuring Exception Detection

This is not recommended in the official documentation, as transactions are very resource-intensive

// Start the transaction first
template.setChannelTransacted(true);

// Then check for exceptions in the transaction method
txCommit()


Copy the code

3.4.3 Setting a callback

  • RabbitTemplate supports only one ConfirmCallback
template.setConfirmCallback(new MySelfConfirmCallback());

// ConfirmCallback implements the class
public class MySelfConfirmCallback implements RabbitTemplate.ConfirmCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info("------> CorrelationData is :{} <-------", correlationData); }}// CorrelationData is the object provided by the client when the original message is sent
// ack : ture(ack) / false (nack)

// CorrelationData provides the Future interface user to obtain data asynchronously
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
cd1.getFuture().get(10, TimeUnit.SECONDS).isAck();

Copy the code

Deep source

public SettableListenableFuture<Confirm> getFuture(a) {
    return this.future;
}

C- SettableListenableFuture
    - private final SettableTask<T> settableTask = newSettableTask<>() ? - Contains a SettableTask object with a Thread in itprivate static class SettableTask<T> extends ListenableFutureTask<T> {
    @Nullable
    private volatile Thread completingThread;
}


    
C- Confirm
    - private final boolean ack;
    - private final String reason;

Copy the code

3.4.5 Settings Return

  • Set the CachingConnectionFactor #publisherReturns to true
  • RabbitTemplate set ReturnsCallback
template.setReturnsCallback(new SelfReturnsCallback());

/ / set ReturnCallBack
public class SelfReturnsCallback implements RabbitTemplate.ReturnsCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void returnedMessage(ReturnedMessage returned) {}}Copy the code

ReturnedMessage contains the following attributes:

  • Message: The returned message itself
  • ReplyCode: Indicates the return reason
  • ReplyText: Indicates the return reason
  • Exchange: The exchange to which messages are sent
  • RoutingKey: the routingKey used

Each RabbitTemplate supports only one ReturnsCallback.

3.4.6 Independent Connection

By setting the UseBlisherConnection property, you can use a different connection from the one used by the listener container, if possible

This way avoids the consumer being blocked when the producer is blocked for any reason

template.setUsePublisherConnection(true);

// The connection factory maintains a second internal connection factory for this
By default, it is the same type as the main factory, but you can set an explicit type if you want to publish with a different factory type.

// Source code:
M- doSendAndReceiveWithDirect
if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
    connectionFactory = connectionFactory.getPublisherConnectionFactory();
}


Copy the code

3.5 the Event Event

  • AsyncConsumerStartedEvent: consumer is started
  • Restart the consumer after AsyncConsumerRestartedEvent: failure
  • AsyncConsumerTerminatedEvent: consumers to stop
  • AsyncConsumerStoppedEvent: consumers to stop (SimpleMessageListenerContainer use)
  • ConsumeOkEvent: When consumeOk is received from the agent
  • ListenerContainerIdleEvent :
  • MissingQueueEvent: Lost Queue

3.6 Configuring the MESSAGE Converter


// This is the name of the Bean
@RabbitListener(... , messageConverter = "jsonConverter")

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(a) {
    SimpleRabbitListenerContainerFactory factory = newSimpleRabbitListenerContainerFactory(); . factory.setMessageConverter(newJackson2JsonMessageConverter()); .return factory;
}

// PS : 
@Bean
public DefaultConversionService myConversionService(a) {
        DefaultConversionService conv = new DefaultConversionService();
        conv.addConverter(mySpecialConverter());
        return conv;
}


Copy the code

3.7 configuration BatchListener

BatchListerner is used to receive the entire batch in one call

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(a) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

// Receiving mode
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    this.amqpMessagesReceived = amqpMessages;
    this.batch1Latch.countDown();
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    this.messagingMessagesReceived = messages;
    this.batch2Latch.countDown();
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    this.batch3Strings = strings;
    this.batch3Latch.countDown();
}

Copy the code

3.8 Message transformation and serialization

public RabbitTemplate getRabbitTemplate(a){
    RabbitTemplate template = new RabbitTemplate();
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}


@Bean
public Jackson2JsonMessageConverter jsonMessageConverter(a) {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setClassMapper(classMapper());
    return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper(a) {
    DefaultClassMapper classMapper = newDefaultClassMapper(); Map<String, Class<? >> idClassMapping =new HashMap<>();
    idClassMapping.put("thing1", Thing1.class);
    idClassMapping.put("thing2", Thing2.class);
    classMapper.setIdClassMapping(idClassMapping);
    return classMapper;
}

Copy the code

3.9 Transaction Management for Rabbit MQ

Transaction management by channelTransacted SimpleMessageListenerContainer properties to control, when set to True, the framework USES the transaction channel, and through committed or rolled back (depending on the results) over all operations (sent or received), At the same time there is an abnormal signal to send a rollback

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(a) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }
    
/ / transactionManager mainly RabbitTransactionManager, its implementation class AbstractPlatformTransactionManager
    

Copy the code

3.10 Pre – and post-processing

  • RabbitTemplate
    • setBeforePublishPostProcessors()
    • setAfterReceivePostProcessors()
  • SimpleMessageListenerContainer
    • setAfterReceivePostProcessors()
    public RabbitTemplate getRabbitTemplate1(a){
        RabbitTemplate template = new RabbitTemplate();
        template.setAfterReceivePostProcessors();
        template.setBeforePublishPostProcessors();
        return template;
    }

Copy the code

3.11 Automatic Recovery

When the connection is re-established, RabbitAdmin reclaims any infrastructure beans (queues and others), so it does not rely on the automatic recovery that the AMQp-Client library now provides

C - com. The rabbitmq. Client. The ConnectionFactory M - setAutomaticRecoveryEnabled: client automatically restore the switchCopy the code

3.12 Retry mode

The RabbitTemplate configuration retries are mentioned earlier, so here are some other retries:

Batch retry

Batch retry based on MessageBatchRecoverer implementation class

3.13 Multiple agents

Multi-broker means declaring multiple sets of infrastructure (connection factories, administrators, container factories)


// Node 1: set multiple factories
@Bean
CachingConnectionFactory cf1(a) {
    return new CachingConnectionFactory("localhost");
}

@Bean
CachingConnectionFactory cf2(a) {
    return new CachingConnectionFactory("otherHost");
}

@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,CachingConnectionFactory cf2) {
    SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
    rcf.setDefaultTargetConnectionFactory(cf1);
    rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2));
    return rcf;
}


// Node 2: set multiple Admin accounts

@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
    return new RabbitAdmin(cf1);
}

@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
    return new RabbitAdmin(cf2);
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry(a) {
    return new RabbitListenerEndpointRegistry();
}

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
    MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
    postProcessor.setEndpointRegistry(registry);
    postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
    return postProcessor;
}


// Node 3: ContainFacotory

 @Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf1);
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf2);
    return factory;
}

// Node Other

 @Bean
RabbitTemplate template(RoutingConnectionFactory rcf) {
    return new RabbitTemplate(rcf);
}

@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
    return new ConnectionFactoryContextWrapper(rcf);
}

    
    

Copy the code

4. Class association

  • SimpleMessageListenerContainer
  • RabbitAdmin

TODO: To be perfected

5. Solutions

5.1 Cluster Solutions and Efficiency issues

Communication efficiency problem

RabbitMQ transmits data over virtual connection-channels over TCP. The official definition of a Channel is a lightweight connection that shares a single TCP connection. PS: It is not advisable to have multiple TCP connections open at the same time, as doing so consumes system resources and makes configuring the firewall more difficult

Channel concept:

Every protocol operation performed by the client takes place on the channel. Communication on a particular channel is completely independent of communication on the other channel, so each protocol method carries a channel ID (aka channel number) that is used by both the broker and the client to determine which channel the method is used on

A channel exists only in the context of a connection and never in isolation. When a connection is closed, all channels on the connection are also closed.

Life cycle of the channel:

The application opens the channel immediately after successfully opening the connection.

ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();
Channel ch = conn.createChannel();
ch.close();
Copy the code

Just like connections, channels mean long life. That is, there is no need to open a channel for each operation, which is very inefficient because opening a channel is a network round trip.

The client library provides a way to observe and respond to channel exceptions. For example, in a Java client, there is a way to register an error handler and access the channel closure (closure) cause.

Cluster type and deployment mode

1. In common cluster mode

  • Integration mode: Nodes are deployed on multiple servers. Each node holds queue metadata or an instance. When accessing a node that has only queue metadata, data is pulled from the node that has the instance
  • Increase throughput
  • The high availability

2. Mirror cluster mode

  • Integration mode: Multiple servers deploy nodes separately. One node synchronizes data to other nodes, so each node holds all message data
  • High availability
  • High performance overhead

Please refer to this map: juejin.cn/editor/draf…

5.2 Message Loss

There are usually three types of loss:

  1. The message was lost during sending and the consumer did not get the message
  2. Consumers received the message, not enough time to consume
  3. The consumer consumed the message, but the consumption did not complete successfully

The message is lost in the process of sending, and the consumer does not get the message

RabbitMQ is the sender’s acknowledgment. When the Channel is set to Confirm, all messages are assigned a ChannelID. When the consumption is complete, the id of the successful consumption is returned to the producer

An nack message is returned when an internal error occurs

void confirm(CorrelationData correlationData, boolean ack, String cause); CorrelationData: Object ACK provided by the client when the original message is sent:false -> nack (not ack )
Copy the code

The sender acknowledgement pattern is asynchronous and allows the producer application to continue sending the message while waiting for the acknowledgement. PS: RabbitMQ provides transaction functionality

Problem two: the consumer receives the message, has not had time to consume

Often this is because the message is stored in memory and cannot be consumed. The best solution is persistence

  1. Queue Settings are not persisted when created
  2. When a message is sent, the message is set to persist

Problem 3: The consumer consumes the message, but the consumption does not complete successfully

ACK is processed by the consumer through the business layer

Conclusion: Most reliability problems can be solved by keeping messages persistent and ACK confirming messages at the business level

5.3 Repeated consumption

During message production, MQ internally generates an inner-msg-ID for each message sent by the producer, which serves as the basis for deduplicity and idempotent (message delivery fails and is retransmitted) to avoid repeated messages entering the queue

When consuming a message, it is required that there must be a bizId (globally unique for the same business, such as payment ID, order ID, post ID, etc.) in the message body as the basis for deduplicity and idempotent, so as to avoid repeated consumption of the same message.

Message distribution: If at least one consumer subscribing to the queue, messages are sent to consumers in a round-robin manner.

5.4 Message sequence fault

When RabbitMQ has more than one consumer, the ack mechanism is in place but the consumption is not blocked. As a result, the performance of each node is different and the efficiency of the consumption is different. In this case, the faster node is more likely to overconsume

When the ladder borrow @ www.processon.com/view/5c5bf6…

Solution: 1. Multiple queues for each Consumer 2. Single Queue and Consumer

5.5 Consumption Failure

  1. Ensure message persistence
  2. Consumption failure handling and callback (ErrorHandler)

5.6 Consumption current restriction

RabbitMQ provides a quality of Service (qos) feature: without automatic message acknowledgment, new messages will not be consumed until a certain number of messages have been consumed

  • PrefetchSize: specifies the size limit of a single message, usually 0
  • PrefetchCount: Tells RabbitMQ not to push more than N messages at a time to a consumer until the consumer ack
  • Global: true\false Whether to apply the above Settings to a channel, i.e. whether the above Settings are channel level or consumption level, usually fals

Offers the following method in AbstractMessageListenerContainer function

6. Performance indicators

TODO

conclusion

I felt that the document did not achieve the desired effect. I expected to find the point of customization through the official document + source source, but this effect was not achieved after writing it down. The main reason is that the Rabbit system is too large, and the official document is very long, so it is difficult to find the breakthrough point. The whole article there are many, many did not talk about the source code is not very deep

After a few days of decision and it dead knock, see is to open a single chapter or in this chapter in-depth, wish a smooth!

Update log:

20210408: Plan completion class diagram