preface
At present, the mainstream message-oriented middleware products include ActiveMQ, RabbitMQ, RocketMQ and Kafka. We need to select a suitable one according to the actual business scenario. The main indicators we pay attention to are the reliability of message delivery, maintainability, throughput and the features of the middleware. For big data scenarios Kafka is the clear choice, while the traditional business scenarios are decoupling, asynchronous processing and peak shaving. Choosing among the other three products on throughput, community activity and message reliability, the average small to medium-sized business is probably best served by RabbitMQ. So let's see how to use it.
Environment to prepare
This example integrates RabbitMQ with Spring Boot and focuses mainly on the actual code; for the basic theory, please search online (e.g. Baidu). JDK version: 1.8, RabbitMQ version: 3.7, Spring Boot version: 2.1.4.RELEASE
- Pom file
<!-- Spring Boot AMQP starter: the dependency this article adds for the RabbitMQ integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
- Yml configuration file
spring:
  rabbitmq:
    password: guest
    username: guest
    port: 5672
    addresses: 127.0.0.1
    # Enable returning messages that could not be routed (send failure)
    publisher-returns: true
    # Enable send confirmation
    publisher-confirms: true
    listener:
      simple:
        # specify the minimum number of consumers.
        concurrency: 2
        # specify the maximum number of consumers.
        max-concurrency: 2
        # acknowledgement (ack) mode
        acknowledge-mode: auto
      direct:
        # acknowledgement (ack) mode
        acknowledge-mode: auto
    # Support acknowledgement and return of messages
    template:
      mandatory: true
Copy the code
Ways to configure RabbitMQ
- Approach 1
Based on Java config
package com.lly.order.message; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @className RabbitMqConfig * @description rabbitMq configuration * @author lly * @date 2019-05-13 15:05 * @version 1.0 **/ @Configuration public class RabbitMqConfig { public final static String DIRECT_QUEUE ="directQueue";
public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";
public final static String TOPIC_EXCHANGE = "topic_exchange";
public final static String FANOUT_EXCHANGE = "fanout_exchange";
public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
public final static String TOPIC_ROUTINGKEY_TWO = "*.key"; @bean public Queue @bean public QueuedirectQueue() {
return new Queue(DIRECT_QUEUE, true); } // Topic subscriber pattern Queue @bean public QueuetopicQueueOne() {
return new Queue(TOPIC_QUEUE_ONE, true);
}
@Bean
public Queue topicQueueTwo() {
return new Queue(TOPIC_QUEUE_TWO, true); } // fanout broadcast mode Queue @bean public QueuefanoutQueueOne() {
return new Queue(FANOUT_QUEUE_ONE, true);
}
@Bean
public Queue fanoutQueueTwo() {
return new Queue(FANOUT_QUEUE_TWO, true); } // topic exchange @bean public TopicExchangetopExchange() {
returnnew TopicExchange(TOPIC_EXCHANGE); } // Fanout exchange @bean public FanoutExchangefanoutExchange() {
returnnew FanoutExchange(FANOUT_EXCHANGE); } // Subscriber pattern Binding @bean public BindingtopExchangeBingingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
}
@Bean
public Binding topicExchangeBingingTwo() {
returnBindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO); } // Broadcast mode Binding @bean public BindingfanoutExchangeBingingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
@Bean
public Binding fanoutExchangeBingingTwo() {
returnBindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); }}Copy the code
- Approach 2
Based on annotations
package com.lly.order.message; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalTime; import java.util.UUID; /** * @className MQTest * @description message queue test * @author lly * @date 2019-05-13 10:50 * @version 1.0 **/ @component @Slf4j public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private final static String QUEUE ="test_queue";
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public MQTest(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
public void sendMq() {
rabbitTemplate.convertAndSend("test_queue"."test_queue" + LocalTime.now());
log.info("Send message :{}"."test_queue" + LocalTime.now());
}
public void sendMqRabbit() {// callback id CorrelationData cId = new CorrelationData(uid.randomuuid ().toString()); // rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE,""."Broadcaster Mode Test",cId);
Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, ""."Broadcaster Mode Test", cId);
log.info("Send message :{},object:{}"."Broadcaster Mode Test"+ LocalTime.now(), object); } // Send subscriber mode public voidsendMqExchange() {
CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
log.info("Subscriber Mode -> Send message :routing_key_one");
rabbitTemplate.convertSendAndReceive("topic_exchange"."routing_key_one"."routing_key_one" + LocalTime.now(), cId);
log.info("Subscriber Mode -> Send message routing_key_two");
rabbitTemplate.convertSendAndReceive("topic_exchange"."routing_key_two"."routing_key_two"+ LocalTime.now(), cId01); } @rabbitListener (queuesToDeclare = @queue (queuesToDeclare = @queue ("test_queue"))
public void receiverMq(String msg) {
log.info("Queue message received :{}", msg); } // If they don't exist, automatically create queues and switches and bind @rabbitListener (bindings = {@queuebinding (value = @queue (value =))"topic_queue01", durable = "true"),
exchange = @Exchange(value = "topic_exchange".type = ExchangeTypes.TOPIC),
key = "routing_key_one")})
public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("Topic_routing_key_one message received :{}", msg); // Log. Error ("Abnormal"); int i = 1 / 0; BasicAck (deliveryTag, channel.basicack (deliveryTag, channel.basicack))false);
} catch (Exception e) {
log.error("Failed to receive message, put back to queue"); / / requeu fortrue// Channel. BasicNack (deliveryTag,false.true); // Channel. BasicReject (deliveryTag,true);
}
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
exchange = @Exchange(value = "topic_exchange".type = ExchangeTypes.TOPIC),
key = "routing_key_two")})
public void receiverMqExchageTwo(String msg) {
log.info("Topic_routing_key_two message received :{}", msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("Queue FANout_queue_ONE message received :{}", msg);
channel.basicAck(deliveryTag, false); } catch (Exception e) { e.printStackTrace(); / / many failures back again will cause the clogging in the queue or dead circulation problems Discard this message / / channel. BasicNack (message. GetMessageProperties () getDeliveryTag (),false.false);
log.error("Failed to receive message");
}
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
public void receiverMqFanoutTwo(String msg) {
log.info("Queue FANout_queue_two message received :{}", msg); } / * * * @return* @author lly * @description Verify whether the message is sent to exchange * @date 2019-05-14 15:36 * @param [correlationData, ack, cause] **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("Message unique Id :{}", correlationData);
log.info("Message confirmed!");
log.error("Message failure cause:{}", cause); } / * * * @return* @date 2019-05-14 16:22 * @param [message, replyCode, replyText, exchange, routingKey] **/ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("Message sending failed ID :{}", message.getMessageProperties().getCorrelationId());
log.info("Message body message:", message);
log.info("Message body message:", replyCode);
log.info("Description:" + replyText);
log.info("Exchange used by the message:", exchange);
log.info("Routing key used for message routing:", routingKey); }}Copy the code
Three ways to acknowledge RabbitMQ messages
No acknowledgement: the message is treated as acknowledged as soon as it is sent to the consumer
acknowledge-mode:none
Automatic: the message is acknowledged or rejected depending on whether it was consumed successfully
acknowledge-mode:auto
Acknowledge the message manually
acknowledge-mode:manual
Copy the code
We test the ACK of the message in topic mode
channel.basicAck(deliveryTag, false);
Copy the code
Restart the project and consume the message again
Consuming the message when an exception occurs: modify the code to simulate an exception in the consumer. When the exception occurs, the message is requeued.
conclusion
Through the actual code we have seen how RabbitMQ is integrated into a project and how the different message ACK modes behave, which makes it easier to choose the appropriate solution for a real scenario. If anything here falls short, your advice is welcome.