Preface

The mainstream message-oriented middleware products today are ActiveMQ, RabbitMQ, RocketMQ and Kafka, and the right choice depends on the actual business scenario. The main criteria are message delivery reliability, maintainability, throughput and the features each product offers. For big data workloads Kafka is the usual choice, which leaves the other three for traditional business scenarios such as decoupling, asynchronous processing and peak shaving. Among those three, weighing throughput, community activity and message reliability, a typical small or medium-sized business is probably better off with RabbitMQ. So let's see how to use it.

Environment setup

This example integrates RabbitMQ with Spring Boot and focuses on working code; for the underlying theory, please consult other resources. Versions used: JDK 1.8, RabbitMQ 3.7, Spring Boot 2.1.4.RELEASE.

  • Pom file
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Yml configuration file
spring:
  rabbitmq:
    password: guest
    username: guest
    port: 5672
    addresses: 127.0.0.1
    # Enable the return callback for messages that fail to be routed
    publisher-returns: true
    # Enable send confirmation
    publisher-confirms: true
    listener:
      simple:
        # Minimum number of concurrent consumers
        concurrency: 2
        # Maximum number of concurrent consumers
        max-concurrency: 2
        # Acknowledgement mode
        acknowledge-mode: auto
      direct:
        # Acknowledgement mode
        acknowledge-mode: auto
    # Support acknowledgement and return of messages
    template:
      mandatory: true

Ways to configure RabbitMQ

  • Approach 1

Based on Java config

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @className RabbitMqConfig
 * @description rabbitMq configuration
 * @author lly
 * @date 2019-05-13 15:05
 * @version 1.0
 **/
@Configuration
public class RabbitMqConfig {

    public final static String DIRECT_QUEUE = "directQueue";
    public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
    public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
    public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
    public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";

    public final static String TOPIC_EXCHANGE = "topic_exchange";
    public final static String FANOUT_EXCHANGE = "fanout_exchange";

    public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
    public final static String TOPIC_ROUTINGKEY_TWO = "*.key";

    // Direct mode queue (durable)
    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);
    }

    // Topic (subscriber) mode queues
    @Bean
    public Queue topicQueueOne() {
        return new Queue(TOPIC_QUEUE_ONE, true);
    }

    @Bean
    public Queue topicQueueTwo() {
        return new Queue(TOPIC_QUEUE_TWO, true);
    }

    // Fanout (broadcast) mode queues
    @Bean
    public Queue fanoutQueueOne() {
        return new Queue(FANOUT_QUEUE_ONE, true);
    }

    @Bean
    public Queue fanoutQueueTwo() {
        return new Queue(FANOUT_QUEUE_TWO, true);
    }

    // Topic exchange
    @Bean
    public TopicExchange topExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    // Fanout exchange
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    // Topic (subscriber) mode bindings
    @Bean
    public Binding topExchangeBingingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
    }

    @Bean
    public Binding topicExchangeBingingTwo() {
        return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
    }

    // Fanout (broadcast) mode bindings
    @Bean
    public Binding fanoutExchangeBingingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutExchangeBingingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
}
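The topic bindings above use the keys "common_key" and "*.key". As a rough illustration of how a topic exchange compares a routing key against such a binding key, here is a plain-Java sketch of the matching rule ("*" stands for exactly one dot-separated word, "#" for zero or more words). The class `TopicMatch` and its method are hypothetical names for this example only; the real matching happens inside the broker, not in application code.

```java
public class TopicMatch {

    // Returns true when a dot-separated routing key matches a topic binding key.
    // "*" stands for exactly one word, "#" for zero or more words.
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" either absorbs nothing, or absorbs one word and is tried again
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;                  // words left in the pattern, none left in the key
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("*.key", "order.key"));        // true
        System.out.println(matches("*.key", "routing_key_one"));  // false: the key has no dot
        System.out.println(matches("common_key", "common_key"));  // true
    }
}
```

Note that underscores do not separate words, so a key such as "routing_key_one" counts as a single word and does not match "*.key".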
  • Approach 2

Based on annotations

package com.lly.order.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;

/**
 * @className MQTest
 * @description message queue test
 * @author lly
 * @date 2019-05-13 10:50
 * @version 1.0
 **/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private final static String QUEUE = "test_queue";

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public MQTest(RabbitTemplate rabbitTemplate) {
        // Register this class as the confirm and return callback
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendMq() {
        rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
        log.info("Send message :{}", "test_queue" + LocalTime.now());
    }

    // Send in broadcast (fanout) mode
    public void sendMqRabbit() {
        // callback id
        CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
        // rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "Broadcaster Mode Test", cId);
        Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "Broadcaster Mode Test", cId);
        log.info("Send message :{},object:{}", "Broadcaster Mode Test" + LocalTime.now(), object);
    }

    // Send in subscriber (topic) mode
    public void sendMqExchange() {
        CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
        CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
        log.info("Subscriber Mode -> Send message :routing_key_one");
        rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
        log.info("Subscriber Mode -> Send message routing_key_two");
        rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
    }

    @RabbitListener(queuesToDeclare = @Queue("test_queue"))
    public void receiverMq(String msg) {
        log.info("Queue message received :{}", msg);
    }

    // If they do not exist, the queue and exchange are created and bound automatically
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
                    exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
                    key = "routing_key_one")})
    public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        // Explicit basicAck/basicNack calls are intended for acknowledge-mode: manual;
        // in auto mode the listener container settles the message itself.
        try {
            log.info("Topic_routing_key_one message received :{}", msg);
            // Uncomment the next two lines to simulate an exception
            // log.error("Abnormal");
            // int i = 1 / 0;
            // Tell the broker that the message has been consumed
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("Failed to receive message, put back to queue");
            // requeue = true puts the message back on the queue
            // channel.basicNack(deliveryTag, false, true);
            // channel.basicReject(deliveryTag, true);
        }
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
                    exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
                    key = "routing_key_two")})
    public void receiverMqExchageTwo(String msg) {
        log.info("Topic_routing_key_two message received :{}", msg);
    }

    @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
    public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("Queue fanout_queue_one message received :{}", msg);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            // Requeueing after repeated failures can clog the queue or cause an endless loop, so discard this message
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("Failed to receive message");
        }
    }

    @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
    public void receiverMqFanoutTwo(String msg) {
        log.info("Queue fanout_queue_two message received :{}", msg);
    }

    /**
     * @return
     * @author lly
     * @description Verify whether the message is sent to exchange
     * @date 2019-05-14 15:36
     * @param [correlationData, ack, cause]
     **/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("Message unique Id :{}", correlationData);
        if (ack) {
            log.info("Message confirmed!");
        } else {
            log.error("Message failure cause:{}", cause);
        }
    }

    /**
     * @return
     * @date 2019-05-14 16:22
     * @param [message, replyCode, replyText, exchange, routingKey]
     **/
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Message sending failed ID :{}", message.getMessageProperties().getCorrelationId());
        log.info("Message body: {}", message);
        log.info("Reply code: {}", replyCode);
        log.info("Description: {}", replyText);
        log.info("Exchange used by the message: {}", exchange);
        log.info("Routing key used for message routing: {}", routingKey);
    }
}
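The sender above attaches a CorrelationData id to each message, and the confirm callback receives the same id back together with the ack flag. One possible way to make use of that id is to remember what was sent until the broker confirms it. The sketch below shows the idea in plain Java without any Spring types; `PendingConfirms` and its methods are hypothetical names introduced for illustration and are not part of Spring AMQP.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class PendingConfirms {

    // correlation id -> payload still waiting for a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before sending, with the same id that goes into CorrelationData.
    public void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback. Returns the payload to resend on a nack,
    // or empty when the broker accepted the message or the id is unknown.
    public Optional<String> onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (ack || payload == null) {
            return Optional.empty();
        }
        return Optional.of(payload);
    }

    public int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("id-1", "first");
        confirms.track("id-2", "second");
        System.out.println(confirms.onConfirm("id-1", true));   // Optional.empty
        System.out.println(confirms.onConfirm("id-2", false));  // Optional[second]
        System.out.println(confirms.size());                    // 0
    }
}
```

In the confirm callback, the id would come from the CorrelationData argument; what to do with a returned payload (resend, log, store) is left to the application.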

Three ways to acknowledge RabbitMQ messages

# none: the message counts as acknowledged as soon as it is delivered
acknowledge-mode: none
# auto: the listener container acknowledges or rejects the message depending on whether the listener returns normally or throws
acknowledge-mode: auto
# manual: the listener code acknowledges the message itself
acknowledge-mode: manual
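To make the difference between the three modes concrete, the following plain-Java sketch models what happens to a delivered message in each one. It is a deliberately simplified model, not Spring AMQP code: `AckModes`, `AckMode`, `Outcome` and `settle` are hypothetical names, and whether a rejected message is requeued in auto mode depends on container settings that the model leaves out.

```java
public class AckModes {

    public enum AckMode { NONE, AUTO, MANUAL }

    public enum Outcome { ACKED_ON_DELIVERY, ACKED, REJECTED, WAITING_FOR_LISTENER_ACK }

    // Simplified model: the fate of a delivered message, given the mode and
    // whether the listener method threw an exception.
    public static Outcome settle(AckMode mode, boolean listenerThrew) {
        switch (mode) {
            case NONE:
                // The broker does not wait for any acknowledgement
                return Outcome.ACKED_ON_DELIVERY;
            case AUTO:
                // The container decides from the listener's result
                return listenerThrew ? Outcome.REJECTED : Outcome.ACKED;
            default:
                // MANUAL: nothing happens until the listener code acks or nacks
                return Outcome.WAITING_FOR_LISTENER_ACK;
        }
    }

    public static void main(String[] args) {
        System.out.println(settle(AckMode.NONE, true));     // ACKED_ON_DELIVERY
        System.out.println(settle(AckMode.AUTO, false));    // ACKED
        System.out.println(settle(AckMode.AUTO, true));     // REJECTED
        System.out.println(settle(AckMode.MANUAL, false));  // WAITING_FOR_LISTENER_ACK
    }
}
```

The practical consequence is that in none mode a failed listener loses the message, while in manual mode a listener that forgets to ack leaves the message unacknowledged.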

We test message acknowledgement in topic mode:

channel.basicAck(deliveryTag, false);

Restart the project and the message is consumed again.

To see how consumption behaves when something goes wrong, modify the code to simulate an exception: when the exception occurs, the message is requeued.
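A comment in the listener code warns that requeueing a message again and again after repeated failures can clog the queue or loop forever. One way to bound this is to count failures per message and stop requeueing after a limit. The sketch below is plain Java under stated assumptions: `RequeuePolicy` and `Decision` are hypothetical names, and it assumes every message carries a unique id set by the producer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RequeuePolicy {

    public enum Decision { REQUEUE, DISCARD }

    private final int maxAttempts;
    // message id -> number of failed attempts so far
    private final Map<String, Integer> failures = new ConcurrentHashMap<>();

    public RequeuePolicy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Record one failed attempt for the message and decide what to do with it.
    public Decision onFailure(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count >= maxAttempts) {
            failures.remove(messageId);   // give up; the caller can log or store the message
            return Decision.DISCARD;
        }
        return Decision.REQUEUE;
    }

    // Forget the message once it has been processed successfully.
    public void onSuccess(String messageId) {
        failures.remove(messageId);
    }

    public static void main(String[] args) {
        RequeuePolicy policy = new RequeuePolicy(3);
        System.out.println(policy.onFailure("msg-1"));  // REQUEUE
        System.out.println(policy.onFailure("msg-1"));  // REQUEUE
        System.out.println(policy.onFailure("msg-1"));  // DISCARD
    }
}
```

In a listener's catch block, REQUEUE would correspond to channel.basicNack(deliveryTag, false, true) and DISCARD to channel.basicNack(deliveryTag, false, false). The counter lives in memory, so it is reset when the application restarts.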

Conclusion

The code above shows how to integrate RabbitMQ into a project and how the different message acknowledgement modes behave, which should make it easier to choose the right approach for a real scenario. If you spot any shortcomings, corrections are welcome.