Using RabbitMQ
Hello, and welcome to my Java growth notes. They are mainly for communicating with and learning from each other, and I hope that sharing them helps everyone. If you spot any mistakes, please point them out. Thank you!
Introduction to RabbitMQ
RabbitMQ is open-source message broker software (also known as message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in Erlang, and its clustering and failover are built on the Open Telecom Platform (OTP) framework. Client libraries for talking to the broker are available for all major programming languages. Erlang was originally designed for telephone switches, which makes RabbitMQ very good at moving data between brokers, and Erlang's network I/O is said to have latency close to that of native sockets, which is one reason for RabbitMQ's high performance. RabbitMQ is developed and supported by Rabbit Technologies Ltd. Originally a joint venture between LShift and CohesiveFT formed in 2007, Rabbit Technologies was acquired by SpringSource, a division of VMware, in April 2010. RabbitMQ became part of GoPivotal in May 2013.
Common messaging middleware protocols
AMQP: The Advanced Message Queuing Protocol is an open application-layer standard for message-oriented middleware that provides unified messaging services. Clients and middleware that implement it can exchange messages regardless of vendor or programming language. Advantages: reliable and general-purpose.
MQTT: MQTT (Message Queuing Telemetry Transport) is a lightweight publish/subscribe messaging protocol developed by IBM that may become an important part of the Internet of Things. It is supported on virtually all platforms, can connect almost any networked object to the outside world, and is used as a communication protocol for sensors and actuators (for example, connecting a house to the Internet via Twitter). Advantages: simple format, low bandwidth usage, suited to mobile communication, push, and embedded systems.
STOMP: The Streaming Text Orientated Message Protocol is a simple text-based protocol designed for message-oriented middleware (MOM). It provides an interoperable wire format that allows a STOMP client to interact with any STOMP message broker. Advantages: command-based model (rather than a topic/queue model).
XMPP: The Extensible Messaging and Presence Protocol is based on XML and is mainly used for instant messaging (IM) and presence detection. It is suitable for near-real-time operation between servers. Its core is XML streaming, and the protocol could eventually allow Internet users to send instant messages to anyone else on the Internet, even if their operating systems and browsers differ. Advantages: widely used, highly compatible, extensible, and secure; the drawback is that the XML encoding consumes a lot of bandwidth.
Other custom TCP/IP-based protocols: Some frameworks (such as Redis, Kafka, and ZeroMQ) do not strictly follow an MQ specification. Instead, they define their own protocol on top of TCP/IP and transmit it over network sockets to implement MQ functionality.
Benefits of messaging middleware
1. Asynchronous decoupling: upstream and downstream services are loosely coupled. Even if a downstream subsystem is unavailable or down, the core trading system keeps operating normally.
2. Peak clipping: a message queue is an effective way to absorb the traffic spikes caused by activities such as red-envelope grabs and flash sales. Without such protection the system may be overloaded or even crash, and overly strict throttling may hurt the user experience through a large number of failed requests.
3. Distributed deployment: message middleware can be deployed in a distributed fashion to provide efficient, reliable message delivery with high concurrency, high availability, and high performance; used sensibly, it helps break through performance bottlenecks.
Introduction to common message-oriented middleware
RabbitMQ installation
Note that the RabbitMQ version must be compatible with the Erlang version. This article uses rabbitmq-server-generic-unix-3.8.3.tar.xz and otp_src_22.2.tar.
RabbitMQ download address
1. RabbitMQ official website
2. Erlang download page
3. RabbitMQ Chinese documentation
RabbitMQ installation and common commands
# Install Erlang
$ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
$ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
$ sudo yum install epel-release
$ sudo yum install erlang
# Start the application
rabbitmqctl start_app
# Stop the application
rabbitmqctl stop_app
# Check the node status
rabbitmqctl status
# Plugin management (the management UI is then served at http://127.0.0.1:15672)
rabbitmq-plugins enable rabbitmq_management
# Add a user
rabbitmqctl add_user username password
# List users
rabbitmqctl list_users
# Delete a user
rabbitmqctl delete_user username
# Change a password
rabbitmqctl change_password username newpassword
# Set user permissions
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# Create a virtual host
rabbitmqctl add_vhost vhostpath
# List virtual hosts
rabbitmqctl list_vhosts
# Delete a virtual host
rabbitmqctl delete_vhost vhostpath
# Query information about queues
rabbitmqctl list_queues
# Purge the messages in a queue
rabbitmqctl -p vhostpath purge_queue blue
# Join a cluster
rabbitmqctl join_cluster <clusternode> [--ram]
# Show the cluster status
rabbitmqctl cluster_status
# Change the storage type of a cluster node
rabbitmqctl change_cluster_node_type disc | ram
# Remove a node from the cluster
rabbitmqctl forget_cluster_node [--offline]
# Rename a cluster node
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...]
RabbitMQ terminology
1. Server: also called the broker; it accepts client connections and implements the AMQP entity services.
2. Connection: the network connection between an application and the broker.
3. Channel: a network channel. Almost all operations are performed on a channel, which is the conduit for reading and writing messages. A client can open multiple channels, each representing one session.
4. Message: the data sent between the server and the application, consisting of properties and a body. The properties decorate the message with advanced features such as priority and delay; the body is the message payload.
5. Virtual host: a virtual address used for logical isolation; it is the top level of message routing. A virtual host can contain several exchanges and queues, but no two exchanges and no two queues in the same virtual host may share a name.
6. Exchange: receives messages and forwards them to the bound queues according to the routing key.
7. Binding: a virtual link between an exchange and a queue. A binding can carry a routing key.
8. Routing key: a routing rule that the virtual host uses to decide how to route a particular message.
9. Queue: also called a message queue; it stores messages and forwards them to consumers.
10. ConnectionFactory: the factory from which connections are obtained.
Basic RabbitMQ usage
Basic exchange properties
Name: the exchange name.
Type: the exchange type: direct, topic, fanout, or headers.
Durability: whether the exchange is persisted; true means durable, false means not durable.
Auto Delete: when the last queue bound to the exchange is deleted, the exchange is deleted automatically.
Internal: whether the exchange is for internal use by RabbitMQ only. The default is false.
Arguments: extension parameters for custom use of the AMQP protocol.
Adding dependencies
1. Maven dependencies to add
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Basic use
// Consumer code
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

import static com.show.rabbit.RabbitMqUtil.rabbitVo;

@Slf4j
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory and configure it
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitVo.getHost());
        connectionFactory.setPort(rabbitVo.getPort());
        connectionFactory.setUsername(rabbitVo.getUserName());
        connectionFactory.setPassword(rabbitVo.getPassWord());
        connectionFactory.setVirtualHost(rabbitVo.getVirtualHost());
        // 2. Build a connection through the connection factory
        final Connection connection = connectionFactory.newConnection();
        // 3. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 4. Declare a durable queue, which survives a broker restart
        final String queueName = "e-rabbitmq";
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        channel.queueDeclare(queueName, true, false, false, null);
        // 5. Create a consumer
        final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("Message content: " + new String(body));
            }
        };
        // 6. Start consuming on the channel (autoAck = true)
        channel.basicConsume(queueName, true, defaultConsumer);
        log.info("Message received successfully -->" + System.currentTimeMillis());
    }
}
// Producer code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;

import static com.show.rabbit.RabbitMqUtil.rabbitVo;

@Slf4j
public class Producer {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory and configure it
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitVo.getHost());
        connectionFactory.setPort(rabbitVo.getPort());
        connectionFactory.setUsername(rabbitVo.getUserName());
        connectionFactory.setPassword(rabbitVo.getPassWord());
        connectionFactory.setVirtualHost(rabbitVo.getVirtualHost());
        // 2. Build a connection through the connection factory
        final Connection connection = connectionFactory.newConnection();
        // 3. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 4. Send data through the channel
        try {
            for (int i = 0; i < 5; i++) {
                final String msg = "Hello RabbitMq";
                // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                // An empty exchange name selects the default exchange; the routing key is the queue name
                channel.basicPublish("", "e-rabbitmq", null, msg.getBytes());
            }
        } finally {
            // Make sure the channel and connection are closed
            channel.close();
            connection.close();
        }
        log.info("Message sent successfully -->" + System.currentTimeMillis());
    }
}
Note that the producer does not specify an exchange, only a routing key equal to the queue name. The message therefore goes through the AMQP default exchange, which routes it to the queue whose name matches the routing key, as shown below:
Using the three exchange types
Main API overview
// Declare an exchange
// exchange: the exchange name
// type: the exchange type
// durable: whether the exchange is persisted (true: yes, false: no)
// autoDelete: true means the exchange is deleted automatically when the last queue bound to it is deleted
// internal: whether the exchange is for internal use by RabbitMQ only; the default is false
// arguments: extension arguments for custom use of the AMQP protocol
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)

// Declare a queue
// queue: the queue name
// durable: whether the queue is persisted (true: yes, false: no)
// exclusive: true means the queue can only be used by the connection that declared it
DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

// Bind a queue to an exchange with a routing key
// queue: the queue name
// exchange: the exchange name
// routingKey: the routing key
BindOk queueBind(String queue, String exchange, String routingKey)

// Build the message properties (BasicProperties)
Map<String, Object> map = ImmutableMap.of("msg1", "msg1", "msg2", "msg2");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)           // 1 = non-persistent, 2 = persistent
        .contentEncoding("UTF-8")  // character encoding
        .expiration("10000")       // the message is removed if it is not consumed within 10 s
        .headers(map)              // custom headers
        .build();

// Read the message and its properties in the consumer callback
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
log.info("Message content: " + new String(body) + ", BasicProperties: " + JSON.toJSONString(properties));
Using a direct exchange
Direct mode: every message sent to a direct exchange is forwarded to the queue bound with the given routing key. Note: direct mode can use the default exchange provided by RabbitMQ, in which case no explicit binding is needed. The routing key must match exactly when a message is published; otherwise the message is discarded. The code is shown below:
// Consumer code
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ConsumerDirect {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(MqConfig.directExChange, MqTypeEnum.MQ_DIRECT.getType(), true, false, false, null);
        // Declare a durable queue
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        channel.queueDeclare(MqConfig.directQueue, true, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(MqConfig.directQueue, MqConfig.directExChange, MqConfig.directRoutingKey);
        // 3. Create a consumer
        final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("Message content: " + new String(body));
            }
        };
        // 4. Start consuming on the channel (autoAck = true)
        channel.basicConsume(MqConfig.directQueue, true, defaultConsumer);
        log.info("Message received successfully -->" + System.currentTimeMillis());
    }
}
// Producer code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ProducerDirect {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 3. Send data through the channel
        try {
            for (int i = 0; i < 5; i++) {
                final String msg = "Hello RabbitMq";
                // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                channel.basicPublish(MqConfig.directExChange, MqConfig.directRoutingKey, null, msg.getBytes());
            }
        } finally {
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
        log.info("Message sent successfully -->" + System.currentTimeMillis());
    }
}
Using a topic exchange
Topic mode: the routing key of a message sent by the producer is matched against the binding patterns of the topic exchange using wildcards. A routing key is a list of words separated by dots; in a binding pattern, "#" matches zero or more words and "*" matches exactly one word. Example: "log.#" matches "log.info.aa", whereas "log.*" matches "log.error" but not "log.info.aa". The code is shown below:
// Consumer code
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ConsumerTopic {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        try {
            // Declare the exchange
            channel.exchangeDeclare(MqConfig.topicExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
            // Declare a durable queue
            // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            channel.queueDeclare(MqConfig.topicQueue, true, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(MqConfig.topicQueue, MqConfig.topicExChange, MqConfig.topicRoutingKey);
        } catch (IOException e) {
            throw new RuntimeException("Channel connection failed!", e);
        }
        // 3. Create a consumer
        final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("Message content: " + new String(body));
            }
        };
        // 4. Start consuming on the channel (autoAck = true)
        channel.basicConsume(MqConfig.topicQueue, true, defaultConsumer);
        log.info("Message received successfully -->" + System.currentTimeMillis());
    }
}
// Producer code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ProducerTopic {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 3. Send data through the channel
        try {
            for (int i = 0; i < 5; i++) {
                final String msg = "Hello RabbitMq";
                // Append a random suffix so that each message has a different routing key
                final String routingkey = MqConfig.topicProcuderRoutingKey + UUID.randomUUID().toString().replaceAll("-", "");
                // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                channel.basicPublish(MqConfig.topicExChange, routingkey, null, msg.getBytes());
            }
        } finally {
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
        log.info("Message sent successfully -->" + System.currentTimeMillis());
    }
}
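The wildcard rules of a topic exchange can be tried out without a broker. The following is only a sketch in plain Java: `TopicMatcher` and its methods are hypothetical names and not part of the RabbitMQ client, and the broker's real matching implementation is different. It only mirrors the documented semantics, where "*" stands for exactly one word and "#" for zero or more words.

```java
// Hypothetical helper, not part of the RabbitMQ client: it only mimics how a
// topic exchange compares a binding pattern with a routing key.
final class TopicMatcher {

    private TopicMatcher() {
    }

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // key used up but the pattern still expects a word
        }
        // "*" matches exactly one word; any other word must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("log.#", "log.info.aa")); // true
        System.out.println(matches("log.*", "log.error"));   // true
        System.out.println(matches("log.*", "log.info.aa")); // false
    }
}
```

A binding pattern without wildcards behaves like a direct exchange binding, because every word then has to be equal.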
Using a fanout exchange
Fanout mode: routing keys are ignored; queues are simply bound to the exchange. Every message sent to the exchange is forwarded to all queues bound to it. Because no key matching is performed, fanout is the fastest exchange type at forwarding messages. The code is shown below:
// Consumer code
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Optional;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ConsumerFanout {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        Optional.ofNullable(channel).ifPresent(x -> {
            try {
                // Declare the exchange
                x.exchangeDeclare(MqConfig.fanoutExChange, MqTypeEnum.MQ_FANOUT.getType(), true, false, false, null);
                // Declare a durable queue
                // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
                x.queueDeclare(MqConfig.fanoutQueue, true, false, false, null);
                // Bind the queue to the exchange
                x.queueBind(MqConfig.fanoutQueue, MqConfig.fanoutExChange, MqConfig.fanoutRoutingKey);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        // 3. Create a consumer
        final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("Message content: " + new String(body));
            }
        };
        // 4. Start consuming on the channel (autoAck = true)
        channel.basicConsume(MqConfig.fanoutQueue, true, defaultConsumer);
        log.info("Message received successfully -->" + System.currentTimeMillis());
    }
}
// Producer code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ProducerFanout {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 3. Send data through the channel
        try {
            for (int i = 0; i < 5; i++) {
                final String msg = "Hello RabbitMq";
                // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                // A fanout exchange ignores the routing key
                channel.basicPublish(MqConfig.fanoutExChange, MqConfig.fanoutRoutingKey, null, msg.getBytes());
            }
        } finally {
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
        log.info("Message sent successfully -->" + System.currentTimeMillis());
    }
}
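To see why a fanout exchange needs no routing key, the broadcast behaviour can be modelled in a few lines of plain Java. This is only an illustrative sketch: `FanoutExchangeModel` is a hypothetical in-memory class, not the RabbitMQ API, and it keeps queue contents in simple lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not the RabbitMQ API.
final class FanoutExchangeModel {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binding needs no key: the queue simply joins the broadcast list. */
    void bind(String queue) {
        boundQueues.putIfAbsent(queue, new ArrayList<>());
    }

    /** The routing key is accepted but never inspected; returns the number of copies made. */
    int publish(String routingKey, String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    List<String> messages(String queue) {
        return boundQueues.get(queue);
    }
}
```

Publishing one message after binding two queues leaves one copy in each of them, whatever routing key was passed.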
Reliable message delivery
Basic concepts
RabbitMQ's message confirmation mechanism
Message confirmation means that after the producer publishes a message, the broker sends back a response once it has received the message. The producer uses this response to determine whether the message reached the broker.
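On the producer side, confirms are usually handled by remembering every published message until the broker's response arrives. The sketch below shows that bookkeeping in plain Java. `ConfirmTracker` and its methods are hypothetical names; with the real Java client the sequence numbers would come from the channel after `confirmSelect()` has been called, and `handleAck` would be invoked from a confirm listener.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper that mimics producer-side bookkeeping for confirms.
final class ConfirmTracker {
    // sequence number -> message body still waiting for a broker response
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    /** Records a message as "sent, not yet confirmed" and returns its sequence number. */
    long track(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    /** The broker confirmed seqNo; when multiple is true, every number up to seqNo is confirmed. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Number of messages the producer cannot yet treat as delivered. */
    int pending() {
        return outstanding.size();
    }
}
```

Anything still pending after a timeout is a candidate for redelivery.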
Return message mechanism
A return listener handles messages that cannot be routed. In some cases a message is published with a routing key that no queue is bound with, so the exchange cannot route it; a return listener lets the producer listen for such unroutable messages. (Publishing to an exchange that does not exist is reported as a channel error instead.)
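The effect of the return mechanism can be sketched with a small in-memory model. `ReturningExchange` is a hypothetical class written for illustration only; in the real client the equivalent pieces are the `mandatory` flag of `basicPublish` and a return listener registered on the channel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory model; it only illustrates the "mandatory" flag.
final class ReturningExchange {
    // binding key -> queue name
    private final Map<String, String> bindings = new HashMap<>();
    // called with (routingKey, body) for every returned message
    private BiConsumer<String, String> returnListener = (key, body) -> { };

    void bind(String queue, String bindingKey) {
        bindings.put(bindingKey, queue);
    }

    void setReturnListener(BiConsumer<String, String> listener) {
        this.returnListener = listener;
    }

    /**
     * Routes by exact key match. An unroutable message is handed back to the
     * return listener when mandatory is true and silently dropped otherwise.
     * Returns the target queue name, or null if the message was not routed.
     */
    String publish(String routingKey, boolean mandatory, String body) {
        String queue = bindings.get(routingKey);
        if (queue == null && mandatory) {
            returnListener.accept(routingKey, body);
        }
        return queue;
    }
}
```

Without the mandatory flag the producer never learns that the message was dropped, which is why the flag and the listener are used together.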
Reliable message delivery (1)
1. Use RabbitMQ's confirm mechanism.
2. Before sending a message, store it in a database and mark its status, for example the delivery status and the update time.
3. Use a scheduled task to redeliver failed messages according to their status, or retry individual records through the internal admin console. The flow is shown in the accompanying diagram.
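The "store first, confirm later, retry on a schedule" scheme can be outlined as follows. This is a sketch under stated assumptions: `MessageLog` stands in for the database table, the status names and the retry limit of 3 are invented for the example, and the timeout check a real scheduled task would perform on the update time is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message log: stands in for the database table that stores sent messages.
final class MessageLog {
    enum Status { SENDING, CONFIRMED, FAILED }

    static final int MAX_RETRIES = 3; // assumed limit, not taken from the article

    private static final class State {
        Status status = Status.SENDING;
        int retries = 0;
    }

    private final Map<String, State> entries = new LinkedHashMap<>();

    /** Store the message and mark its status before publishing. */
    void recordBeforeSend(String messageId) {
        entries.put(messageId, new State());
    }

    /** The broker's confirm marks the message as delivered. */
    void onConfirm(String messageId) {
        State state = entries.get(messageId);
        if (state != null) {
            state.status = Status.CONFIRMED;
        }
    }

    /** One run of the scheduled task; returns the ids that should be published again. */
    List<String> collectForRetry() {
        List<String> toResend = new ArrayList<>();
        for (Map.Entry<String, State> item : entries.entrySet()) {
            State state = item.getValue();
            if (state.status != Status.SENDING) {
                continue;
            }
            if (state.retries >= MAX_RETRIES) {
                state.status = Status.FAILED; // give up and leave it for manual handling
            } else {
                state.retries++;
                toResend.add(item.getKey());
            }
        }
        return toResend;
    }

    Status statusOf(String messageId) {
        return entries.get(messageId).status;
    }
}
```

Messages that end up as FAILED are the ones that would be retried by hand through the admin console.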
Reliable message delivery (2)
1. The first message is sent to the MQ broker and on to the consumer. If consumption succeeds, the consumer sends a confirm message to the callback service, which updates the MSG DB when it receives it.
2. A second, delayed message is also sent to the MQ broker and is delivered to the callback service, which checks whether the first message was consumed successfully. If it was, nothing is done; if the message status has not changed, consumption is considered failed and the producer is notified to resend.
3. The advantage of this flow is that it saves server resources, and the callback logic is managed as a single service, which is easy to maintain. The flow is shown in the accompanying diagram.
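The decision the callback service makes can be reduced to a lookup. The sketch below is hypothetical throughout: `CallbackService` and its method names are invented for illustration, and a set of message ids stands in for the MSG DB.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical callback service; the names are illustrative only.
final class CallbackService {
    // stands in for the MSG DB: ids of messages the consumer reported as processed
    private final Set<String> consumedIds = new HashSet<>();

    /** The consumer's confirm message arrives after successful processing. */
    void onConsumerConfirm(String messageId) {
        consumedIds.add(messageId);
    }

    /**
     * The delayed check message arrives. Returns true when the producer
     * must be asked to resend, i.e. no confirm has been recorded.
     */
    boolean onDelayedCheck(String messageId) {
        return !consumedIds.contains(messageId);
    }
}
```

The delay of the second message has to be longer than the time the consumer normally needs; otherwise the check would ask for resends that are not necessary.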
Consumer-side flow control
RabbitMQ provides a quality-of-service (QoS) feature: with automatic acknowledgement turned off, the broker stops delivering new messages to a consumer or channel once a configured number of messages are still unacknowledged.
// prefetchSize: maximum total size of unacknowledged content; 0 means no limit
// prefetchCount: never push more than N messages to a consumer at once; if N messages are still unacknowledged, the consumer receives no more until it acks
// global: true applies the limit to the whole channel, false applies it to each consumer
void basicQos(int prefetchSize, int prefetchCount, boolean global)
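How the prefetch count throttles delivery can be shown with a small model. `PrefetchWindow` is a hypothetical class for illustration and not part of the client; it assumes a single consumer and a positive prefetch count (in RabbitMQ itself a count of 0 means "no limit").

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a queue feeding one consumer with a prefetch limit.
final class PrefetchWindow {
    private final int prefetchCount;
    private final Deque<String> ready = new ArrayDeque<>();   // waiting in the queue
    private final List<String> unacked = new ArrayList<>();   // delivered, not yet acked

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    /** Pushes messages to the consumer until the unacked count reaches the limit. */
    List<String> deliver() {
        List<String> delivered = new ArrayList<>();
        while (!ready.isEmpty() && unacked.size() < prefetchCount) {
            String message = ready.pollFirst();
            unacked.add(message);
            delivered.add(message);
        }
        return delivered;
    }

    /** Acknowledging a message frees one slot in the window. */
    void ack(String message) {
        unacked.remove(message);
    }
}
```

With a prefetch count of 1, the second message is held back until the first has been acknowledged, which is the behaviour `basicQos(0, 1, false)` asks the broker for.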
// Consumer code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.custom.CustomConsumer;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Optional;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ConsumerAck {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        Optional.ofNullable(channel).ifPresent(x -> {
            try {
                // Declare the exchange
                x.exchangeDeclare(MqConfig.ackExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
                // Declare a durable queue
                x.queueDeclare(MqConfig.ackQueue, true, false, false, null);
                // Bind the queue to the exchange
                x.queueBind(MqConfig.ackQueue, MqConfig.ackExChange, MqConfig.ackRoutingKey);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        // 3. Flow control
        // basicQos(int prefetchSize, int prefetchCount, boolean global)
        channel.basicQos(0, 1, false);
        // 4. Start consuming: basicConsume(String queue, boolean autoAck, Consumer callback)
        // autoAck: true = automatic acknowledgement, false = manual acknowledgement
        channel.basicConsume(MqConfig.ackQueue, false, new AckConsumer(channel));
        log.info("Message consumption success -->" + System.currentTimeMillis());
    }
}
// Producer code
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ProducerAck {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 3. Send data through the channel
        for (int i = 0; i < 5; i++) {
            final String msg = "Hello RabbitMq";
            final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();
            final String routingkey = MqConfig.ackProcuderRoutingKey + UUID.randomUUID().toString().replaceAll("-", "");
            // basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            channel.basicPublish(MqConfig.ackExChange, routingkey, true, properties, msg.getBytes());
        }
        log.info("Message production successful -->" + System.currentTimeMillis());
    }
}
Dead-letter queue
Introduction to the dead-letter exchange (DLX)
When a message becomes a dead letter in a queue, it can be republished to another exchange, the DLX.
A DLX is an ordinary exchange, no different from any other, and it can be specified on any queue; in effect it is just a queue attribute. When a queue has a dead letter, RabbitMQ automatically republishes the message to the configured exchange, which routes it to another queue.
Introduction to TTL for queues and messages
TTL is short for Time To Live. RabbitMQ supports setting an expiration time on a message, which can be specified when the message is sent. The TTL is counted from the moment the message enters the queue.
There are several ways a message can become a dead letter
1. The message is rejected (basic.reject or basic.nack) with requeue=false.
2. The message's TTL expires.
3. The queue reaches its maximum length.
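The dead-letter causes can be modelled in memory to see where each message ends up. The sketch below covers rejection without requeue and TTL expiry, plus queue overflow, which RabbitMQ also treats as a dead-letter cause (by default the oldest message is the one pushed out). `DeadLetteringQueue` is a hypothetical class, not the RabbitMQ API: a plain list stands in for the queue behind the DLX, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a queue with a dead-letter target; not the RabbitMQ API.
final class DeadLetteringQueue {
    private static final class Msg {
        final String body;
        final long expiresAtMillis;

        Msg(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final int maxLength;
    private final Deque<Msg> messages = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>(); // stands in for the queue behind the DLX

    DeadLetteringQueue(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Overflow: when the queue is full, the oldest message is dead-lettered. */
    void publish(String body, long nowMillis, long ttlMillis) {
        if (messages.size() == maxLength) {
            deadLetters.add(messages.pollFirst().body);
        }
        messages.addLast(new Msg(body, nowMillis + ttlMillis));
    }

    /** TTL: expired messages at the head are dead-lettered; returns the next live body or null. */
    String poll(long nowMillis) {
        while (!messages.isEmpty() && messages.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetters.add(messages.pollFirst().body);
        }
        Msg head = messages.pollFirst();
        return head == null ? null : head.body;
    }

    /** Rejection with requeue=false: the consumer hands the message to the DLX. */
    void rejectWithoutRequeue(String body) {
        deadLetters.add(body);
    }
}
```

In RabbitMQ itself the dead-lettered message is republished through the exchange named in the queue's `x-dead-letter-exchange` argument rather than appended to a list.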
// Consumer code
import com.google.common.collect.Maps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ConsumerDlx {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // Queue argument that points the test queue at the dead-letter exchange
        Map<String, Object> arguments = Maps.newHashMap();
        arguments.put("x-dead-letter-exchange", MqConfig.dlxExChange);
        // Declare the exchange used for the DLX test
        channel.exchangeDeclare(MqConfig.dlxTestExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
        // Declare a durable queue that carries the dead-letter argument
        channel.queueDeclare(MqConfig.dlxTestQueue, true, false, false, arguments);
        // Bind the test queue to the test exchange
        channel.queueBind(MqConfig.dlxTestQueue, MqConfig.dlxTestExChange, MqConfig.dlxTestRoutingKey);
        // Declare the DLX
        channel.exchangeDeclare(MqConfig.dlxExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
        // Declare a durable dead-letter queue
        channel.queueDeclare(MqConfig.dlxQueue, true, false, false, null);
        // Bind the dead-letter queue to the DLX
        channel.queueBind(MqConfig.dlxQueue, MqConfig.dlxExChange, MqConfig.dlxRoutingKey);
        // Consume from the test queue with manual acknowledgement
        channel.basicConsume(MqConfig.dlxTestQueue, false, new DlxConsumer(channel));
        log.info("Message received successfully -->" + System.currentTimeMillis());
    }
}
// Producer code
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;

import static com.show.util.MqConnectionUtil.mqFactory;

@Slf4j
public class ProducerDlx {
    public static void main(String[] args) throws Exception {
        // 1. Build a connection through the connection factory
        final Connection connection = mqFactory.newConnection();
        // 2. Create a channel from the connection
        final Channel channel = connection.createChannel();
        // 3. Send data through the channel
        for (int i = 0; i < 5; i++) {
            final String msg = "Hello RabbitMq";
            final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000") // after 10 s without consumption the message becomes a dead letter
                    .build();
            channel.basicPublish(MqConfig.dlxTestExChange, MqConfig.dlxTestProcuderRoutingKey, true, properties, msg.getBytes());
        }
        log.info("Message sent successfully -->" + System.currentTimeMillis());
    }
}
That is the end of this chapter. I will keep updating and sharing my Java growth notes, and I hope we can grow together. If you find this useful, please remember to like and follow; it is the best encouragement for me. Thank you very much! PS: Please credit the source when reposting.