Abstract

This integration example covers all seven RabbitMQ working modes, as well as message acknowledgment (ack), message rejection (nack), scheduled tasks, and so on. The focus is on the XML configuration. The RabbitMQ architecture diagram above may help you follow along.

Common dependencies (pom.xml)

    <!-- Spring application context -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>

    <!-- Spring AMQP support for RabbitMQ -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>

    <!-- The following two dependencies are not needed for the integration itself; they are only needed for testing -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
Copy the code

Shared configuration (rabbitmq.properties)

# Connection settings for the RabbitMQ broker. The Spring XML configuration
# reads them through ${rabbitmq.*} placeholders on <rabbit:connection-factory>.
# Broker host
rabbitmq.host=192.168.168.10
# Broker port
rabbitmq.port=5672
# Login credentials
rabbitmq.username=zheng123
rabbitmq.password=zheng123
# Virtual host to connect to
rabbitmq.virtual-host=/zheng
Copy the code

The seven working modes, message acknowledgment, and message rejection

Producer


      
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           https://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory. publisher-confirms and publisher-returns are
         switched on so that the producer's confirm and return callbacks are invoked. -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"/>

    <!-- Admin bean that manages the exchanges and queues defined below -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Define a persistent queue. If it does not exist, it is created automatically.
         If it is not bound to an exchange, it is bound to the default exchange.
         The default exchange type is direct, its name is "", and the routing key is the queue name. -->
    <!-- id: bean name; name: queue name; auto-declare: create automatically;
         auto-delete: delete the queue automatically after its last consumer disconnects. -->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Broadcast (fanout): all bound queues receive the message ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define a persistent queue for the fanout exchange. If it does not exist, it is created automatically. -->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!-- Define a persistent queue for the fanout exchange. If it does not exist, it is created automatically. -->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!-- Define the fanout exchange and bind the two queues above to it -->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"/>
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>


    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Routing (direct) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define the queue -->
    <rabbit:queue id="spring_direct_queue" name="spring_direct_queue" auto-declare="true"/>

    <!-- Direct exchange used for routing mode -->
    <rabbit:direct-exchange name="spring_direct_exchange">
        <rabbit:bindings>
            <!-- key: routing key; queue: queue name -->
            <rabbit:binding queue="spring_direct_queue" key="info"></rabbit:binding>
        </rabbit:bindings>

    </rabbit:direct-exchange>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Wildcard (topic): * matches exactly one word, # matches zero or more words ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!-- Define a persistent queue for the topic exchange. If it does not exist, it is created automatically. -->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
    <!-- Define a persistent queue for the topic exchange. If it does not exist, it is created automatically. -->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!-- Define a persistent queue for the topic exchange. If it does not exist, it is created automatically. -->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <!-- Declare the topic exchange and bind the three queues by pattern -->
    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="baiqi.*" queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="baiqi.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Define the rabbitTemplate bean, which makes sending messages easy -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
Copy the code
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // Simple mode & work queue mode
    @Test
    public void testHelloWorld(a){
        rabbitTemplate.convertAndSend("spring_queue"."hello world spring....");
    }
    
    // Publish subscribe mode
    @Test
    public void testFanout(a){
        rabbitTemplate.convertAndSend("spring_fanout_exchange".""."spring fanout....");
    }
    
    // Routing mode
    @Test
    public void testDirect(a){
        rabbitTemplate.convertAndSend("spring_direct_exchange"."info"."spring Direct....");
    }
    
    // Theme mode
    @Test
    public void testTopics(a) throws InterruptedException {

        // Confirm required in XML 
      
        open publishable - Confirms ="true"
      
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                // confirmed
            } else {
                // nack-ed}});// return is required in XML 
      
        open publisher-returns="true"
      
        // Set the mode for the switch to process failed messages to true. If the message cannot reach the queue, the message is returned to the producer
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            // return message
        });

        rabbitTemplate.convertAndSend("spring_topic_exchange"."baiq3i.h3ehe.3haha"."spring topic 9....");

        TimeUnit.SECONDS.sleep(5000);

    }

Copy the code

Consumer


      
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           https://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!-- Listener beans, one per queue -->
    <bean id="springQueueListener" class="com.baiqi.rabbitmq.listener.SpringQueueListener"/>
    <bean id="fanoutListener1" class="com.baiqi.rabbitmq.listener.FanoutListener"/>
    <bean id="fanoutListener2" class="com.baiqi.rabbitmq.listener.FanoutListener2"/>

    <!-- Each listener listens to one queue.
         acknowledge="manual": messages must be acknowledged manually by the listener.
         auto-declare="true": declare the queues, exchanges and bindings automatically. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
    </rabbit:listener-container>
</beans>
Copy the code
public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // message.getBody() todosomething}}@Component
public class NormalListener implements ChannelAwareMessageListener {

    // Acknowledge ="manual" in 
      
       . Otherwise, the message is signed by default
      
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        / / message id
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // message.getBody() todosomething

            // Sign the message
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {

            // Refuse to sign for it
            // Third parameter: requeue: requeue. If set to true, the message is returned to the queue and the broker resends the message to the consumer
            channel.basicNack(deliveryTag, true.true); }}@Override
    public void onMessage(Message message) {
        // message.getBody() todosomething}}Copy the code

Scheduled tasks (delay queue)

Use TTL + a delay queue + a dead-letter queue to implement timed operations

To follow the configuration below, refer to the figure above. After a message is sent through the normal Exchange at the bottom to its Queue, a TTL is applied to it. If the message is not consumed by Consumer1 within the TTL, it becomes a dead letter, is forwarded to the DLX (Dead Letter Exchange), enters the dead-letter queue, and is delivered to Consumer2. Both Exchange/Queue pairs on the broker are ordinary components, so the key to implementing timed operations is connecting the bottom Queue to the top DLX.

The following configuration belongs to the producer side; the consumer side does not need to be modified.


      
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           https://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!-- Load the configuration file -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"/>
    <!-- Admin bean that manages the exchanges and queues defined below -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Define the rabbitTemplate bean, which makes sending messages easy -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!-- 1. Define the normal exchange (order_exchange) and queue (order_queue) -->
    <rabbit:queue id="order_queue" name="order_queue">
        <!-- Messages that expire in this queue are forwarded to order_exchange_dlx
             with the routing key dlx.order.cancel.
             x-message-ttl is given in milliseconds: 10000 = 10 seconds
             (for a 30-minute expiration use 1800000). -->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />

        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- 2. Define the dead-letter exchange (order_exchange_dlx) and queue (order_queue_dlx) -->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

</beans>
Copy the code
@Test public void testDlx() throws InterruptedException { rabbitTemplate.convertAndSend("order_exchange", "order.2", "dlx msg ...." ); }Copy the code

Note: Besides TTL expiry, there are two other cases in which a message becomes a dead letter: the queue reaches its maximum length, or a consumer rejects the message without requeuing it.