What is a message queue
- Message: Data transferred between requests or applications
- Queue: A fifO data structure
- Message queue: Literally a first-in, first-out data structure or container for storing data
In summary: message queue is an asynchronous communication component between services, mainly to solve the application decoupling, asynchronous processing, traffic cutting and other problems, to achieve high performance, high availability, scalability and final consistency.
Usage scenarios for message queues
1. Asynchronous processing
Scenario: At the beginning of the project, the user volume was small, and the user completed the whole process from commodity selection to payment in a single machine. Later, the user volume expanded, adding many other services, such as consumption volume, points and so on.As the number of call links increases, the payment time increases, affecting user experience. Message queuing (MQ) was introduced to improve response speed through asynchronous processing.The payment system only needs to write a payment success message to MQ after the payment is successful. After other systems subscribe, their services are processed asynchronously. The perceived time by the user is: time consumed by the payment system + time consumed by writing to MQ.
2. Apply decoupling
-
Without the introduction of message queue (MQ), the consumption volume deduction, integral deduction and other services in the payment process need to be exposed to the corresponding system interface, so as to be invoked in the payment system. If you need to add an email/SMS system function, you need to modify the payment system and re-publish the application. It also introduces unnecessary maintenance costs.
-
With the introduction of message queuing (MQ), payment systems do not care what systems need to know about payment success. The payment system simply writes a successful payment message to MQ, the other systems subscribe to the message, and when they receive the message, spontaneously conduct their own business.
3, flow peak cutting
Normally, the traffic is low. However, when there is a surge of traffic (for example, in the seckilling scenario), the traffic suddenly increases in a certain period of time. As a result, the service may receive more requests than the service can handle, causing the service to break down and fail to provide services.
In this case, a message queue is introduced. When a large number of requests enter the service, the request is first written to the queue, and then the service pulls messages from MQ for consumption according to its own processing capacity, which achieves the effect of traffic peak-cutting.
RabbitMQ
RabbitMQ is inferior to Kafka and rocketMQ in terms of throughput, but its responsiveness and message reliability are unmatched. It can support applications with a small user base.
1. RabblitMQ working model
- -Serena: Well, I’m not being a producer.
- Connection: TCP connection between producer/consumer and MQ
- Channel: connection Virtual connection established internally
- Broker: The application in MQ that receives and distributes messages
- Virtual Host: a Virtual group for multi-tenant and security purposes. Users in different groups cannot access each other
- Exchange: a switch in which the producer sends messages to the switch, which in turn sends messages to queues
- Queue: a Queue in which messages wait to be consumed by consumers
- Binding: Binds a switch to a queue
- Consumer: The end user of the message
A message producer connects to RabbitMQ through a Connection and then establishes a virtual connection [channel], which sends messages to Exchange. Since Exchange is bound to queue, messages are sent directly through Exchange and stored in queue. Finally, the consumer connects to RabbitMQ through a Connection and then establishes a virtual connection [channel], which fetches and consumes messages from the queue.
2. 5 common working modes for RabbitMQ
- Simple mode (Hello World)
In the figure below, “P” represents the producer of the message, “C” represents the consumer, and the middle box is a queue – a buffer for RabbitMQ to hold messages.
In this mode, one producer, one queue, one consumer (and actually a default exchange that we don’t need to declare)
- Work queue mode
In this mode, there is one producer, one queue, but there are many consumers, and many consumers jointly consume the messages in the queue. In this mode, a message is consumed by only one consumer (there is also a default exchange that we do not need to declare).
- Publish and subscribe model
In the figure below, “X” represents exchange, which needs to be declared manually.
In this mode, there is one producer, one switch, and the switch is bound to multiple queues, with one consumer in each queue. In this model, the message sent by the producer is consumed by each consumer (each consumer receives the same message).
- Routing Matching mode
In this mode, there is one producer, one switch, and the switch is bound to multiple queues, with one consumer in each queue. It is identical to the publish-subscribe mode in structure. The difference is that when the switch binds queues, it sets the Routing key attribute for each queue. When the producer sends a message, it carries the Routing key
- Topic matching pattern
In this mode, there is one producer, one switch, and the switch is bound to multiple queues, with one consumer in each queue. It is very similar to Routing matching mode in structure and principle, but the difference is that in Routing matching mode, Routing key is a fixed value and matching needs to be completed, while Topic matching mode supports fuzzy matching. Topic wildcards: “*” : matches a word; “#” : matches one or more words, ams.* can match ams.insert; Ams.# can match ams.insert. ABC and ams.insert
3. RabbitMQ and SpringBoot integration
The integration code has been uploaded to Github ZhouXH-Z
3.1. Producers
- The jar package introduced
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> Copy the code
- Yml configuration file
Spring: rabbitmq: port: 5672 Username: guest password: guest host: 101.37.10.135 Virtual-host: / template: exchange: xxxxExchange default-receive-queue: xxxxQueue routing-key: xxxx.#Copy the code
- The configuration class
The configuration class is used to create queues in RabbitMQ, create switches, and bind the queues.
Creation time: the first time mq is connected after the project is started.This configuration class is not required if the queue/switch is manually created and bound
@Configuration public class ProducerConfig { @Autowired public BeanFactory beanFactory; @Value("${spring.rabbitmq.template.exchange}") public String exchangeName; @Value("${spring.rabbitmq.template.default-receive-queue}") public String queueName; @Value("${spring.rabbitmq.template.routing-key}") public String routeName; @Bean("exchange") public Exchange buildExchange(a){ return ExchangeBuilder.topicExchange(exchangeName).build(); } @Bean("queue") public Queue buildQueue(a){ return QueueBuilder.durable(queueName).build(); } @Bean("binder") public Binding buildBinder(a){ Queue queue = (Queue) beanFactory.getBean("queue"); Exchange exchange = (Exchange) beanFactory.getBean("exchange"); returnBindingBuilder.bind(queue) .to(exchange).with(routeName).noargs(); }}Copy the code
- Message sending class
@RestController public class ProducerSender { @Autowired RabbitTemplate rabbitTemplate; @Value("${spring.rabbitmq.template.exchange}") public String exchangeName; @Value("${spring.rabbitmq.template.routing-key}") public String routeName; @RequestMapping("/sendMSG") public void sendMSG(a){ rabbitTemplate.convertAndSend(exchangeName,routeName,"testMSG"); }}Copy the code
3.2 Consumers
- Dependencies and profiles are aligned with producers
- Message receiving class
@Component public class Consumer { // ackMode = "MANUAL" If the message needs to be signed manually, set this parameter. Otherwise, it is not required @RabbitListener(queues = "zhouxhQueue",ackMode = "MANUAL") public void consumer(Message ms, Channel channel) throws IOException { // Execute business, pseudo code System.out.println(ms); // Manually sign in channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true); The second Boolean input parameter indicates whether the message returns a pair of columns. False indicates that the message is discarded // If the message is returned to the queue, there is a problem, the message will always be re-consumed, need to manually implement the number of retries, such as the number of retries saved in the cache, for manual judgment //channel.basicNack(ms.getMessageProperties().getDeliveryTag(),true,true);}}Copy the code
4. Set up RebbitMQ high availability cluster
- Set up two stand-alone RabbitMQ machines and start them separately
rabbitmqctl stop_app
Both RabbitMQ nodes stop external servicesRabbitmqctl join_cluster [email protected]
Add the two RabbitMQ nodes to the servicerabbitmqctl start_app
Enable the two RabbitMQ services respectively- through
rabbitmqctl cluster_status
You can view the cluster status of RabbitMQ
5. RabbitMQ advanced features
5.1 How is the reliability of RabbitMQ message delivery guaranteed?
- From producer to switch, Confirm confirmation mode (springboot- RabbitMQ enabled by default)
- When a producer sends a message to Exchange, the producer is notified of the sending status through a Confirm callback.
// Define the callback rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { / * * * *@paramCorrelationData Related configuration information *@paramAck Exchange Whether the switch successfully received the message. True for success, false for failure *@paramCause Cause of failure */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("Confirm method executed...."); // An ACK of true indicates that the message has reached the switch if (ack) { // Received successfully System.out.println("Received success message" + cause); } else { // Failed to receive System.out.println("Failed to receive message" + cause); // Do some processing to get the message sent again.}}});Copy the code
- From switch to queue, return fallback mode (enable rabbitmq.publisher-returns: true)
- After the Exchange receives a message and matches the corresponding queue with the routing key, it sends a return callback to inform the producer of the sending status
// Set the switch's mode for processing failure messages to true. If the message fails to reach the queue, the message is returned to the producer rabbitTemplate.setMandatory(true); // Define the callback rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { / * * * *@paramMessage Message object *@paramReplyCode Error code *@paramReplyText Error message *@paramExchange Switch@paramRoutingKey indicates the routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("Return executes...."); System.out.println("message:"+message); System.out.println("replyCode:"+replyCode); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); / / processing}});Copy the code
- From queue to consumer, sign-in mechanism (on: acknowledge-mode: manual)
- The consumer pulls the message from the queue, and if an exception occurs during message consumption, we can manually reject the message and then pull the message again. Otherwise, sign and receive normally.
// Manually sign in channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true); The second Boolean input parameter indicates whether the message returns a pair of columns // If the message is returned to the queue, there is a problem. The message will always be re-consumed, requiring manual retries, such as saving to the cache channel.basicNack(ms.getMessageProperties().getDeliveryTag(),true.true); Copy the code
- Exchange, Queue, message, etc
5.2, the current limit
In the big push or second kill scenario, the producer sends a large number of messages instantly, which may cause the service to break down due to the insufficient consumption power of consumers. To deal with this instantaneous large flow, the consumer’s flow limit is very necessary.
By specifying a listener factory, you can achieve the effect of limiting traffic
@Configuration
public class RabbitMqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean(name = "listenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(a){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// Set the number of traffic limits
factory.setPrefetchCount(50);
returnfactory; }}Copy the code
Specify containerFactory in the @rabbitListener annotation
@RabbitListener(queues = "zhouxhQueue",ackMode = "MANUAL",containerFactory = "listenerContainer")
public void consumer(Message ms, Channel channel) throws IOException {
System.out.println(ms);
// Manually sign in
channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true)}Copy the code
5.3. Delay queues
RabbitMQ doesn’t come with a delay queue, but you can use TTL and dead-letter queues to do this.
- TTL: a queue configured with a timeout period. Messages in this queue will be deleted if they have not been consumed by the timeout period
- Dead letter queue: For RabbitMQ this is a dead letter switch, we bind a dead letter switch to a queue, and when a message is dead, it passes through the dead letter switch to a normal queue.
The “DLX” in the figure below is actually a DEAD LETTER EXCHANGE.
Three ways to get into a dead end queue
- The queue is full of messages, and new incoming messages are directly placed in the dead letter queue
- The consumer refuses to sign for it and does not re-queue
- The original queue was set to a timeout period, and the message in the queue reached the timeout period without being consumed
So how does the delay queue work according to these two types of queues? Take a scenario where we shop online, place an order but haven’t paid yet:
- First we place an order in the order system and send the order information to the TTL queue[Assuming that the order is saved for 30 minutes, i.e. the payment is made within 30 minutes, the order is valid, otherwise the order expires]
- We did not pay during the period, so the order information will be deleted from the TTL queue after 30 minutes.
- Orders removed from the TTL queue are sent to the dead-letter queue, and the message is consumed by the inventory system. Determine whether to cancel the order or ship it as normal.
5.4 Message idempotence guarantee (message repeated reading)
Version is carried when the message is sent, and idempotent is guaranteed by an optimistic locking mechanism when the consumer consumes and stores the message in the database.
5.5 message backlog problem
Message backlog problems caused by scenarios such as rush or kill.
- Increase the number of consumers through the work queue mode and increase the speed of message consumption.
- The consumer writes the message to the database, and the business code reads the message from the database for processing