This is the 10th day of my participation in Gwen Challenge
Relevant concepts
Management page after startup
Connections: a Connection is a TCP socket connection between the client and RabbitMQ; it encapsulates part of the socket protocol logic
Channels: virtual connections built inside a real TCP connection. AMQP commands are sent over channels, and each channel is assigned a unique ID. One TCP connection can carry many channels (up to a limit negotiated between client and broker). Sharing one TCP connection this way avoids the cost of repeatedly creating and destroying TCP connections
Queues: RabbitMQ internal objects that store messages
Messages: Ready: messages waiting in the queue that have not yet been delivered to a consumer. Unacked: messages that have been delivered to a consumer but not yet acknowledged
Total: Ready + Unacked
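To make the three counters concrete, here is a minimal plain-Java sketch. It is not RabbitMQ client code: `QueueCounters` and its methods are hypothetical names used only for illustration. It models a message moving from Ready to Unacked on delivery and leaving the queue on acknowledgement.

```java
// Toy model of the Ready / Unacked / Total counters shown on the management page.
// QueueCounters is a hypothetical class for illustration, not part of any RabbitMQ API.
final class QueueCounters {
    private int ready;
    private int unacked;

    // A producer publishes a message: it waits in the queue as Ready.
    void publish() {
        ready++;
    }

    // The broker delivers a message to a consumer: Ready -> Unacked.
    void deliver() {
        if (ready == 0) {
            throw new IllegalStateException("no ready message to deliver");
        }
        ready--;
        unacked++;
    }

    // The consumer acknowledges a delivered message: it leaves the queue.
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("no unacked message to acknowledge");
        }
        unacked--;
    }

    int ready() { return ready; }
    int unacked() { return unacked; }
    int total() { return ready + unacked; }

    public static void main(String[] args) {
        QueueCounters q = new QueueCounters();
        q.publish();
        q.publish();
        q.publish();
        q.deliver();
        q.deliver();
        q.ack();
        System.out.println("ready=" + q.ready() + " unacked=" + q.unacked() + " total=" + q.total());
    }
}
```

After three publishes, two deliveries and one acknowledgement, one message is still Ready and one is Unacked, so Total is 2.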
Publisher confirm mechanism, return message mechanism, and consumer acknowledgement
Confirm: tells the publisher whether the message reached the exchange on the broker. Return: a return listener handles messages that cannot be routed, that is, messages whose routing key matches no queue bound to the exchange. If the exchange itself does not exist, the message never reaches an exchange, so the failure is reported through the confirm callback rather than the return callback
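The difference between the two callbacks can be summarised as a small decision table. The sketch below is plain Java and deliberately simplified: `PublishOutcome` and `classify` are hypothetical names, not Spring AMQP or RabbitMQ client APIs, and how a missing exchange is actually reported depends on the client library.

```java
// Simplified view of what a publisher observes for one message.
// PublishOutcome and PublishOutcomes are hypothetical names for illustration only.
enum PublishOutcome {
    NOT_CONFIRMED,          // never reached an exchange (for example, the exchange does not exist)
    CONFIRMED_AND_ROUTED,   // reached the exchange and was placed in at least one queue
    CONFIRMED_AND_RETURNED, // reached the exchange, matched no queue, mandatory=true
    CONFIRMED_AND_DROPPED   // reached the exchange, matched no queue, mandatory=false
}

final class PublishOutcomes {
    static PublishOutcome classify(boolean exchangeExists, boolean matchesBinding, boolean mandatory) {
        if (!exchangeExists) {
            return PublishOutcome.NOT_CONFIRMED;
        }
        if (matchesBinding) {
            return PublishOutcome.CONFIRMED_AND_ROUTED;
        }
        // The message is unroutable: mandatory decides between return and silent discard.
        return mandatory ? PublishOutcome.CONFIRMED_AND_RETURNED : PublishOutcome.CONFIRMED_AND_DROPPED;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, true, true));
        System.out.println(classify(true, false, true));
        System.out.println(classify(true, false, false));
        System.out.println(classify(false, false, true));
    }
}
```

The point of the table is that a positive confirm only means the broker took the message; whether it landed in a queue is what the return callback (with mandatory set) tells you.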
Using it in Spring Boot:
Configuration in application.yml
    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        # Enable publisher confirms and returns
        # (Spring Boot 2.2+ replaces publisher-confirms with publisher-confirm-type: correlated)
        publisher-confirms: true
        publisher-returns: true
        # Enable manual acknowledgement
        listener:
          simple:
            # none: no acknowledgement is sent
            # manual: the listener must acknowledge every message by calling channel.basicAck()
            # auto: the container acknowledges automatically unless the MessageListener throws an exception (the default)
            acknowledge-mode: manual
            # Whether a message rejected because the listener threw an exception is put back into the queue.
            # The default value is true (shown here with its default value).
            default-requeue-rejected: true
            # Enable the retry mechanism and set the maximum number of attempts
            retry:
              max-attempts: 3
              enabled: true
Sender.java implements the confirm and return callbacks

    @RestController
    public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @PostMapping("/send1")
        public void send1() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
            // mandatory: when true, a message that cannot be routed is returned to the sender;
            // when false, the broker silently discards it
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(this);
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.convertAndSend("exchange", "topic.message", context);
        }

        // @PostMapping("/send2")
        // public void send2() {
        //     String context = "world " + new Date();
        //     System.out.println("Sender : " + context);
        //     this.rabbitTemplate.convertAndSend("topic.messages", context);
        // }

        // Called with the broker's confirm result for a published message
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("message sent successfully");
            } else {
                System.out.println("send message failed: " + cause);
            }
        }

        // Called when a mandatory message could not be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("returnMessage:" + message + replyCode + replyText + exchange + routingKey);
        }
    }
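The consumer Receiver.java acknowledges consumption

    // The method name is a placeholder; the parameters are the ones the body uses
    @RabbitListener(queues = "topic.message")
    public void process(String str, Channel channel, Message message) throws IOException {
        // multiple=true acknowledges every message up to and including this delivery tag
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        System.out.println("message:" + str);
    }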
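The second argument of `basicAck` is easy to overlook. The plain-Java sketch below models the delivery tags outstanding on one channel and shows what `multiple=true` versus `multiple=false` acknowledges; `UnackedTags` is a hypothetical class written for this illustration, not a RabbitMQ client type.

```java
import java.util.TreeSet;

// Toy model of the unacknowledged delivery tags on a single channel.
// UnackedTags is a hypothetical class for illustration only.
final class UnackedTags {
    private final TreeSet<Long> outstanding = new TreeSet<>();
    private long nextTag = 1;

    // The broker delivers a message and assigns the next delivery tag on this channel.
    long deliver() {
        long tag = nextTag++;
        outstanding.add(tag);
        return tag;
    }

    // Mirrors the shape of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag up to and including deliveryTag.
            outstanding.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only this one tag.
            outstanding.remove(deliveryTag);
        }
    }

    boolean isOutstanding(long tag) {
        return outstanding.contains(tag);
    }

    int count() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        UnackedTags channel = new UnackedTags();
        channel.deliver(); // tag 1
        channel.deliver(); // tag 2
        channel.deliver(); // tag 3
        channel.ack(2, true);
        System.out.println("outstanding after ack(2, true): " + channel.count());
    }
}
```

With `multiple=true`, acknowledging tag 2 also acknowledges tag 1, which is convenient for batching but means a message you have not finished processing can be acknowledged by accident. Passing `false` acknowledges exactly one message.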
Processing return message (failed to route)
Add the configuration for unroutable messages

    @Bean
    public Queue unRouteQueue() {
        return new Queue("queue-unroute");
    }

    @Bean
    public Exchange exchange() {
        Map<String, Object> arguments = new HashMap<>(4);
        // When a message is sent to this exchange and its routingKey matches no bindingKey,
        // it is forwarded to the exchange named by alternate-exchange
        arguments.put("alternate-exchange", "exchange-unroute");
        return new DirectExchange("exchange", true, false, arguments);
    }

    @Bean
    public FanoutExchange unRouteExchange() {
        // The name of the exchange must be the same as the alternate-exchange parameter in exchange()
        return new FanoutExchange("exchange-unroute");
    }

    @Bean
    public Binding unRouteBinding() {
        return BindingBuilder.bind(unRouteQueue()).to(unRouteExchange());
    }
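The routing decision this configuration sets up can be sketched in plain Java. `AlternateExchangeModel` is a hypothetical class, the binding of `topic.message` is assumed for the example, and a real direct exchange can bind several queues to the same key, which this toy ignores.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange with an alternate (fanout) exchange as fallback.
// AlternateExchangeModel is a hypothetical class for illustration only.
final class AlternateExchangeModel {
    // bindingKey -> queue name for the direct exchange "exchange"
    private final Map<String, String> directBindings = new HashMap<>();
    // queues bound to the fanout exchange "exchange-unroute"
    private final List<String> unrouteQueues = Collections.singletonList("queue-unroute");

    void bind(String bindingKey, String queue) {
        directBindings.put(bindingKey, queue);
    }

    // Returns the queues that receive a message published with this routing key.
    List<String> route(String routingKey) {
        String queue = directBindings.get(routingKey);
        if (queue != null) {
            return Collections.singletonList(queue);
        }
        // No binding matched: the alternate exchange takes over and fans out.
        return unrouteQueues;
    }

    public static void main(String[] args) {
        AlternateExchangeModel broker = new AlternateExchangeModel();
        broker.bind("topic.message", "topic.message"); // assumed binding
        System.out.println(broker.route("topic.message"));
        System.out.println(broker.route("no.such.key"));
    }
}
```

Because the fallback is a fanout exchange, the routing key no longer matters once a message reaches it: everything unroutable ends up in `queue-unroute`, where it can be inspected later.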
Dead-letter queue
A message becomes a dead letter when one of the following happens:
1. It is rejected with requeue=false, so it is not put back into the queue
2. The queue reaches its maximum length
3. The message's TTL expires
The path a message takes into a dead-letter queue is: message -> queue (one of the conditions above is triggered) -> DLX (dead-letter exchange) -> queue bound with the dead-letter routing key (DLK)
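The three conditions and the hand-off can be modelled with a few lines of plain Java. This is only a sketch: `DeadLetterModel` is a hypothetical class, the `deadLetters` list stands in for the DLX plus its bound queue, and the overflow case assumes the oldest message is the one pushed out when the queue is full.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a queue that hands dead letters to a dead-letter exchange.
// DeadLetterModel is a hypothetical class for illustration only.
final class DeadLetterModel {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterModel(int maxLength) {
        this.maxLength = maxLength;
    }

    // Condition 2: the queue is full, so the oldest message is dead-lettered to make room.
    void publish(String message) {
        if (queue.size() == maxLength) {
            deadLetters.add(queue.removeFirst());
        }
        queue.addLast(message);
    }

    // Condition 1: a rejected message is dead-lettered only when requeue is false.
    void rejectHead(boolean requeue) {
        String message = queue.removeFirst();
        if (requeue) {
            queue.addFirst(message);
        } else {
            deadLetters.add(message);
        }
    }

    // Condition 3: the TTL of the message at the head of the queue has expired.
    void expireHead() {
        deadLetters.add(queue.removeFirst());
    }

    int queued() { return queue.size(); }
    List<String> deadLetters() { return deadLetters; }

    public static void main(String[] args) {
        DeadLetterModel model = new DeadLetterModel(2);
        model.publish("a");
        model.publish("b");
        model.publish("c");      // queue is full, "a" is dead-lettered
        model.rejectHead(true);  // "b" is requeued, nothing is dead-lettered
        model.rejectHead(false); // "b" is dead-lettered
        model.expireHead();      // "c" is dead-lettered
        System.out.println(model.deadLetters());
    }
}
```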
Configuration

    @Configuration
    public class RabbitmqConfig {

        @Bean("deadLetterExchange")
        public Exchange deadLetterExchange() {
            return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
        }

        @Bean("deadLetterQueue")
        public Queue deadLetterQueue() {
            Map<String, Object> args = new HashMap<>(2);
            // x-dead-letter-exchange: the exchange that dead letters from this queue are sent to
            args.put("x-dead-letter-exchange", "DL_EXCHANGE");
            // x-dead-letter-routing-key: the routing key used when a message is dead-lettered
            args.put("x-dead-letter-routing-key", "KEY_R");
            return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
        }

        @Bean("redirectQueue")
        public Queue redirectQueue() {
            return QueueBuilder.durable("REDIRECT_QUEUE").build();
        }

        /**
         * Binds DL_QUEUE to DL_EXCHANGE with the DL_KEY binding key.
         *
         * @return the binding
         */
        @Bean
        public Binding deadLetterBinding() {
            return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
        }

        /**
         * Binds REDIRECT_QUEUE to DL_EXCHANGE with the KEY_R binding key.
         * The method name is a placeholder; only the body survived in the original.
         *
         * @return the binding
         */
        @Bean
        public Binding redirectBinding() {
            return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
        }
    }
Producer

    String context = "hello " + new Date();
    System.out.println("Sender : " + context);
    MessagePostProcessor messagePostProcessor = message -> {
        MessageProperties messageProperties = message.getMessageProperties();
        // Set the content encoding
        messageProperties.setContentEncoding("utf-8");
        // Set the expiration time in milliseconds (5000 ms = 5 seconds)
        messageProperties.setExpiration("5000");
        return message;
    };
    rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", context, messagePostProcessor);
The consumer listens on the redirect queue (REDIRECT_QUEUE), which receives the messages once they expire in DL_QUEUE
Delay queue
Messages in a delay queue are not consumed immediately; they become available only after waiting for a set time. Typical uses are orders, reservations, and retrying failed service calls. There are generally two implementations:
- Dead-letter queue: set an expiration time on the message, as above
- The rabbitmq_delayed_message_exchange plugin: install the plugin, enable it with rabbitmq-plugins enable rabbitmq_delayed_message_exchange, then restart the service
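One practical difference between the two approaches is worth a sketch. As far as I understand RabbitMQ's per-message TTL, a message is only expired when it reaches the head of its queue, so with the dead-letter approach a short delay queued behind a long one has to wait. The plain-Java model below illustrates that under this assumption, with no consumer on the waiting queue; `TtlDelayModel` is a hypothetical class and all times are in milliseconds.

```java
import java.util.Arrays;

// Toy timing model for the TTL + dead-letter approach to delayed messages.
// TtlDelayModel is a hypothetical class for illustration only.
final class TtlDelayModel {
    // Returns, for each message in queue order, the time at which it is dead-lettered,
    // assuming a message can only expire once every message ahead of it has left the queue.
    static long[] deadLetterTimes(long[] publishedAt, long[] ttl) {
        long[] result = new long[publishedAt.length];
        long previous = Long.MIN_VALUE;
        for (int i = 0; i < publishedAt.length; i++) {
            long ownExpiry = publishedAt[i] + ttl[i];
            // The message leaves at its own expiry, or later if it is stuck behind the previous one.
            result[i] = Math.max(ownExpiry, previous);
            previous = result[i];
        }
        return result;
    }

    public static void main(String[] args) {
        // A 10 s message followed 100 ms later by a 2 s message.
        long[] times = deadLetterTimes(new long[] {0, 100}, new long[] {10000, 2000});
        System.out.println(Arrays.toString(times));
    }
}
```

In this model the 2-second message is released at 10000 ms rather than 2100 ms. When every message uses the same delay (as with the fixed 5000 ms above) the effect does not show up, which is why the dead-letter approach is fine for a single fixed delay, while mixed delays are a better fit for the plugin.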
Configuration

    @Bean
    public Queue immediateQueue() {
        // The first parameter is the name of the queue to create; the second makes it durable
        return new Queue("delayQueue", true);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        // The plugin routes like a direct exchange once the delay has elapsed
        args.put("x-delayed-type", "direct");
        return new CustomExchange("DELAYED_EXCHANGE", "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify() {
        return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with("DELAY_ROUTING_KEY").noargs();
    }
In the producer (Sender), set a delay on the message instead of an expiration time
messageProperties.setDelay(5000);