RabbitMQ is an open source message broker and queue server for sharing data between completely different applications over a common protocol. RabbitMQ is written in the Erlang language and is based on the AMQP protocol.
Why RabbitMQ performs well
Use of the Erlang language
Erlang was originally designed as an architecture for telephone switches, which makes RabbitMQ perform very well when brokers exchange data with one another.
Advantage of Erlang: its network latency is comparable to that of native sockets.
AMQP
Advanced Message Queuing Protocol
Definition
AMQP is a binary protocol with modern features. It is an open, application-layer standard that provides a unified messaging service, and it is designed for message-oriented middleware.
AMQP protocol model
AMQP core concepts
Server/Broker: accepts connections from clients and implements the AMQP entity services.
Connection: the network connection between an application and the Broker.
Channel: the network channel in which almost all operations take place. A Channel is used for reading and writing messages. A client can establish multiple Channels, each representing one session task.
Message: the data sent between the server and the application, consisting of Properties and a Body. Properties decorate the message with advanced characteristics such as priority and delay. The Body is the message content itself.
Virtual host: a virtual address used for logical isolation; it is the top layer of message routing. A Virtual host can contain multiple Exchanges and Queues, but within one Virtual host no two Exchanges and no two Queues may share the same name.
Exchange: receives messages and forwards them to the bound Queues according to the routing key.
Binding: a virtual link between an Exchange and a Queue. A Binding can carry a routing key.
Routing key: a routing rule that the Broker uses to determine how to route a particular message.
Queue: the message queue, which stores messages and delivers them to consumers.
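To make the relationship between these concepts more concrete, here is a minimal in-memory sketch of a single virtual host. It is not the RabbitMQ client API: the class `ToyVirtualHost` and all of its method names are hypothetical, and it only models exact routing-key matching, with no connections, channels, or message properties.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of one virtual host. Illustration only: this is not
// the RabbitMQ client API, and all names here are hypothetical.
class ToyVirtualHost {
    // A binding links an exchange to a queue under a routing key.
    private static final class Binding {
        final String queue;
        final String routingKey;

        Binding(String queue, String routingKey) {
            this.queue = queue;
            this.routingKey = routingKey;
        }
    }

    // Exchange name -> its bindings; queue name -> stored message bodies.
    private final Map<String, List<Binding>> exchanges = new HashMap<>();
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareExchange(String name) {
        exchanges.putIfAbsent(name, new ArrayList<>());
    }

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    void bind(String queue, String exchange, String routingKey) {
        if (!queues.containsKey(queue) || !exchanges.containsKey(exchange)) {
            throw new IllegalArgumentException("declare the queue and exchange first");
        }
        exchanges.get(exchange).add(new Binding(queue, routingKey));
    }

    // Exact-match routing: copy the body into every queue whose binding key
    // equals the routing key. Returns how many queues received the message.
    int publish(String exchange, String routingKey, String body) {
        int delivered = 0;
        for (Binding b : exchanges.getOrDefault(exchange, new ArrayList<>())) {
            if (b.routingKey.equals(routingKey)) {
                queues.get(b.queue).add(body);
                delivered++;
            }
        }
        return delivered;
    }

    // Hands the oldest stored message to a consumer, or null if there is none.
    String consume(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        ToyVirtualHost vhost = new ToyVirtualHost();
        vhost.declareExchange("demo.exchange");
        vhost.declareQueue("demo.queue");
        vhost.bind("demo.queue", "demo.exchange", "dog");
        System.out.println(vhost.publish("demo.exchange", "dog", "hello"));
        System.out.println(vhost.publish("demo.exchange", "dog.puppy", "ignored"));
        System.out.println(vhost.consume("demo.queue"));
    }
}
```

The point of the sketch is the shape of the data: producers only ever talk to an exchange, queues only receive what a binding lets through, and consumers only ever read from a queue.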
The RabbitMQ architecture
The RabbitMQ installation
This walkthrough uses RabbitMQ 3.6.5.
Environment setup:
- Official website: www.rabbitmq.com/
- Environment: Linux (CentOS 7 / Red Hat 7)
# 1. First prepare the system on Linux: use yum to install some basic packages
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
# Configure the host name in /etc/hosts and /etc/hostname
# 2. Download the required RabbitMQ packages
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-1.1.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
# 3. Install the packages
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-1.1.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 4. Modify user login and connection heartbeat detection
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# Change 1: in loopback_users, replace <<"guest">> with just guest (for user login)
# Change 2: set heartbeat to 10 (for connection heartbeats)
# 5. Install the management plug-in
# 5.1 Start the service first (the options after each | are stop, status, and restart)
/etc/init.d/rabbitmq-server start | stop | status | restart
lsof -i:5672    # check that the service is listening; 5672 is RabbitMQ's default port
rabbitmq-plugins enable rabbitmq_management
# 5.3 Check whether the management port is open:
lsof -i:15672   # or: netstat -tnlp | grep 15672
# Open http://<your IP address>:15672/ and log in with user name and password guest
# 7. If all goes well, the environment is now installed
The RabbitMQ core
Exchange
Direct Exchange
Direct Exchange: processes routing keys. A queue is bound to the exchange with a routing key, and a message is delivered only when its routing key matches that key exactly. For example, if a queue is bound to the exchange with the routing key “dog”, only messages marked “dog” are forwarded to it; messages marked dog.puppy or dog.guard are not.
Any message sent to a Direct Exchange is forwarded to the queue specified by its routing key.
1. In general you can use the default exchange that ships with RabbitMQ; its name is an empty string.
2. With the default exchange, no explicit binding between exchange and queue is needed.
3. A routing key is required when sending a message; with the default exchange it can be understood simply as the name of the target queue.
4. If the queue named by the routing key does not exist in the vhost, the message is discarded.
Example
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DirectExchangeSender {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // 2. Create a connection
        Connection connection = connectionFactory.newConnection();
        // 3. Create a channel
        Channel channel = connection.createChannel();
        // 4. Declare
        String exchangeName = "test_direct_exchange";
        String routingKey = "test_direct_routingKey";
        // Parameters: queue name, durable, exclusive (this connection only), auto-delete when unused, other arguments
        channel.queueDeclare(routingKey, false, false, false, null);
        // 5. Send
        String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        // Release resources so the program can exit
        channel.close();
        connection.close();
    }
}
Consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class DirectExchangeReceiver {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // Reconnect automatically, retrying every 3000 ms
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        // 2. Create a connection, 3. create a channel
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 4. Declare
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test_direct_routingKey";
        // Parameters: exchange name, type, durable, autoDelete, internal, arguments
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Parameters: queue name, durable, exclusive, autoDelete, arguments
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        // QueueingConsumer belongs to older Java client versions (it was removed in 5.x)
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Parameters: queue name, automatic ACK, consumer
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            // Blocks until a message arrives
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}
Topic Exchange
Topic Exchange: matches routing keys against a pattern, so the queue has to be bound with a pattern. The symbol “#” matches zero or more words, and the symbol “*” matches exactly one word. So “dog.#” matches “dog.jacquesh.boy”, whereas “dog.*” matches “dog.jacquesh” but not “dog.jacquesh.boy”.
Any message sent to a Topic Exchange is forwarded to all queues that care about the topic specified in its routing key.
- In simple terms, each queue has topics it is interested in, every message carries a routing key, and the exchange forwards the message to all queues whose topic of interest fuzzily matches the routing key.
- This mode requires a routing key, and the exchange and queue also need to be bound in advance.
- When binding, provide a topic that the queue cares about, such as “#.log.#” to indicate that the queue cares about all log-related messages (a message whose routing key is “mq.log.error” will be forwarded to the queue).
- “#” stands for zero or more words, and “*” stands for exactly one word. For example, “log.*” matches “log.warn” but not “log.warn.timeout”, while “log.#” matches both.
- Similarly, if the exchange does not find a queue that matches the routing key, the message is discarded.
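The wildcard rules above can be sketched as a small matcher. This is only an illustration of the matching semantics described here, not RabbitMQ's own implementation; the class `TopicMatcher` and its method names are hypothetical.

```java
// Sketch of topic-pattern matching: words are separated by '.',
// '*' stands for exactly one word and '#' for zero or more words.
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    // Does p[i..] match k[j..]?
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;           // pattern used up: key must be too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                   // pattern still needs a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("log.*", "log.warn"));
        System.out.println(matches("log.*", "log.warn.timeout"));
        System.out.println(matches("log.#", "log.warn.timeout"));
        System.out.println(matches("#.log.#", "mq.log.error"));
    }
}
```

The recursive form keeps the rules readable; for long keys with many “#” segments a real broker would use a more efficient structure.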
Example
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicExchangeSender {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 4. Declare
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        // 5. Send
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        // basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        channel.basicPublish(exchangeName, routingKey1, false, false, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, false, false, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, false, false, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}
Consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TopicExchangeReceiver {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 4. Declare
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // "user.*" would receive only user.save and user.update; "user.#" also receives user.delete.abc
        //String routingKey = "user.*";
        String routingKey = "user.#";
        // Declare the exchange
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Declare a queue
        channel.queueDeclare(queueName, false, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(queueName, exchangeName, routingKey);
        // QueueingConsumer belongs to older Java client versions (it was removed in 5.x)
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println(new String(delivery.getBody()));
        }
    }
}
Fanout Exchange
Fanout Exchange: does not process routing keys. You simply bind queues to the exchange, and a message sent to the exchange is forwarded to every queue bound to it. This is much like a subnet broadcast, where each host in the subnet gets a copy of the message. The Fanout exchange is the fastest at forwarding messages.
Any message sent to a Fanout Exchange is forwarded to all queues bound to that exchange.
- You can think of it as a routing table pattern.
- This mode does not require a routing key.
- This mode requires exchanges and queues to be bound in advance. An exchange can be bound to multiple queues, and a queue can be bound to multiple exchanges.
- If the exchange receiving the message is not bound to any queue, the message is discarded.
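Example
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class FanoutExchangeSender {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String msg = "test Fanout";
        String exchange = "fanout_exchange";
        // A fanout exchange ignores the routing key, so an empty string is passed
        channel.basicPublish(exchange, "", null, msg.getBytes());
        // Release resources so the program can exit
        channel.close();
        connection.close();
    }
}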
Consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class FanoutExchangeRecevier {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchange = "fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "fanout_queue";
        // String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
        channel.exchangeDeclare(exchange, exchangeType, true, false, false, null);
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(queueName, true, false, false, null);
        // The routing key is ignored by a fanout exchange
        channel.queueBind(queueName, exchange, "");
        // QueueingConsumer belongs to older Java client versions (it was removed in 5.x)
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            System.out.println(new String(delivery.getBody()));
        }
    }
}
Reliable delivery on the producer side
Persistence scheme
STEP 1, STEP 2: Persist the message to be sent before sending it.
STEP 3: Send the message to the Broker.
STEP 4: The Broker sends a message to the consumer.
STEP 5: Update the message status once the message has been received.
STEP 6, STEP 7, STEP 8: Periodically resend any message whose status has not been updated.
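The bookkeeping behind these steps can be sketched with an in-memory stand-in for the message table. Everything here is an assumption made for illustration: the class `OutboxSketch`, the status names, the fixed retry interval, and the cap on resends (after which a message is marked as failed) are not taken from the steps above, and a real system would keep this state in a database and run the resend check from a scheduled job. Time is passed in explicitly so that the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a persisted message table (hypothetical names).
class OutboxSketch {
    enum Status { SENDING, CONFIRMED, FAILED }

    private static final class Record {
        final String body;
        Status status = Status.SENDING;
        int resends = 0;
        long nextRetryAt;

        Record(String body, long nextRetryAt) {
            this.body = body;
            this.nextRetryAt = nextRetryAt;
        }
    }

    private final Map<String, Record> table = new LinkedHashMap<>();
    private final long retryIntervalMs;
    private final int maxResends;   // assumption: give up after this many resends

    OutboxSketch(long retryIntervalMs, int maxResends) {
        this.retryIntervalMs = retryIntervalMs;
        this.maxResends = maxResends;
    }

    // Steps 1-2: persist the message before it is sent.
    void save(String id, String body, long now) {
        table.put(id, new Record(body, now + retryIntervalMs));
    }

    // Step 5: update the status once the message is known to have arrived.
    void markConfirmed(String id) {
        Record r = table.get(id);
        if (r != null) {
            r.status = Status.CONFIRMED;
        }
    }

    // Steps 6-8: called periodically; returns the ids that should be resent
    // now and marks messages that exhausted their resends as FAILED.
    List<String> dueForResend(long now) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Record> e : table.entrySet()) {
            Record r = e.getValue();
            if (r.status != Status.SENDING || r.nextRetryAt > now) {
                continue;
            }
            if (r.resends >= maxResends) {
                r.status = Status.FAILED;
                continue;
            }
            r.resends++;
            r.nextRetryAt = now + retryIntervalMs;
            due.add(e.getKey());
        }
        return due;
    }

    Status statusOf(String id) {
        return table.get(id).status;
    }

    String bodyOf(String id) {
        return table.get(id).body;
    }

    public static void main(String[] args) {
        OutboxSketch outbox = new OutboxSketch(1000, 2);
        outbox.save("m1", "order created", 0);
        outbox.save("m2", "order paid", 0);
        outbox.markConfirmed("m1");
        System.out.println(outbox.dueForResend(1000)); // only the unconfirmed message
    }
}
```

Persisting before sending is what makes the resend possible: the sender can crash at any point after steps 1 and 2 and the periodic check will still find the unconfirmed record.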
Confirm mechanism
The Confirm mechanism means that after a producer sends a message, the Broker sends a reply to the producer once it has received that message.
The producer receives this reply and uses it to determine whether the message was properly delivered to the Broker. This is the core guarantee for reliable message delivery.
// Enable confirm mode on the channel before sending messages
channel.confirmSelect();
// Add a confirm listener
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        // The Broker received the message
        System.out.println("Sent successfully");
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        // The Broker could not handle the message
        System.out.println("Send failed");
    }
});
// Send a message
channel.basicPublish("", queueName, null, msg.getBytes());
The Return mechanism
In some cases a message cannot be delivered, for example because the exchange does not exist or no queue can be reached with the specified routing key. A Return Listener lets the producer listen for these unreachable messages.
The key setting in the basic API is mandatory: if it is true, an unroutable message is returned to the listener for further handling.
If it is false, the broker discards the message automatically.
// Add a return listener
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange,
                             String routingKey, AMQP.BasicProperties properties,
                             byte[] body) throws IOException {
        System.out.println("Failed to send: " + new String(body));
    }
});
// mandatory is set to true
channel.basicPublish("", "bibi", true, null, msg.getBytes());
Idempotent solutions for consumers
Use a business-unique ID or a fingerprint mechanism, and rely on the database primary key for deduplication.
SELECT COUNT(1) FROM ORDER WHERE ID = <business unique ID or fingerprint>
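The deduplication idea can be sketched in memory as follows. This is an illustration only: the class `IdempotentConsumerSketch` and its methods are hypothetical, and the concurrent set stands in for a database table whose primary key is the business-unique ID, where the insert of a duplicate key would be rejected.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Processes each business ID at most once, however often it is redelivered.
class IdempotentConsumerSketch {
    // Stand-in for a table with a unique primary key on the business ID.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger created = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean onMessage(String businessId, String body) {
        // add() is atomic: only the first caller for a given ID gets true.
        if (!processedIds.add(businessId)) {
            return false;
        }
        created.incrementAndGet();   // the real business side effect goes here
        return true;
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        System.out.println(consumer.onMessage("order-1001", "create order"));
        System.out.println(consumer.onMessage("order-1001", "create order")); // redelivery
        System.out.println(consumer.createdCount());
    }
}
```

Checking and recording the ID in one atomic step matters: a separate "check, then insert" leaves a window in which two concurrent deliveries of the same message both pass the check.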
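Manual ACK by the consumer
// Acknowledge one message (multiple = false)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// Reject one message without requeueing it (multiple = false, requeue = false)
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);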
TTL and dead letter queue
TTL
TTL is short for Time To Live.
RabbitMQ supports an expiration time for messages, which can be specified when a message is sent. The time is counted from the moment the message enters the queue, and the message is removed automatically once the limit is exceeded.
Dead-letter queue
A DLX (dead-letter exchange) is an ordinary exchange, no different from any other. It can be specified on any queue; in effect it is just an attribute set on that queue.
When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, from which it is routed to another queue. Consuming that queue lets you process dead letters, which makes up for the immediate parameter that RabbitMQ supported before 3.0.
Conditions under which a message becomes a dead letter
- The message is rejected (basic.reject / basic.nack) with requeue=false
- The message's TTL expires
- The queue reaches its maximum length
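The three conditions can be illustrated with a toy queue that collects dead letters in a list instead of republishing them to an exchange. This is a sketch under stated assumptions, not RabbitMQ's implementation: the class `DeadLetterSketch` and its methods are hypothetical, expiry is only checked when a message reaches the head of the queue, and on overflow the oldest message is assumed to be the one that is dead-lettered. Time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy queue showing when a message becomes a dead letter (hypothetical names).
class DeadLetterSketch {
    static final class Msg {
        final String body;
        final long expiresAt;

        Msg(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();
    private final int maxLength;

    DeadLetterSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    void publish(String body, long now, long ttlMs) {
        queue.addLast(new Msg(body, now + ttlMs));
        // Condition: queue over its maximum length (oldest message goes, by assumption).
        while (queue.size() > maxLength) {
            deadLetters.add(queue.pollFirst().body);
        }
    }

    // Returns the next live message, or null if there is none.
    Msg poll(long now) {
        Msg m;
        while ((m = queue.pollFirst()) != null) {
            if (m.expiresAt > now) {
                return m;
            }
            // Condition: TTL expired.
            deadLetters.add(m.body);
        }
        return null;
    }

    // Condition: rejected with requeue=false. With requeue=true the
    // message simply goes back to the head of the queue.
    void reject(Msg m, boolean requeue) {
        if (requeue) {
            queue.addFirst(m);
        } else {
            deadLetters.add(m.body);
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        DeadLetterSketch q = new DeadLetterSketch(2);
        q.publish("a", 0, 1000);
        q.publish("b", 0, 5000);
        q.publish("c", 0, 100);      // overflow: "a" is dead-lettered
        Msg b = q.poll(50);
        q.reject(b, false);          // rejected without requeue
        q.poll(200);                 // "c" has expired by now
        System.out.println(q.deadLetters());
    }
}
```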
Producer
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender4DLXExchange {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.71");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // 2. Create a Connection
        Connection connection = connectionFactory.newConnection();
        // 3. Create a Channel
        Channel channel = connection.createChannel();
        // 4. Declare
        String exchangeName = "test_dlx_exchange";
        String routingKey = "group.bfxy";
        // 5. Send
        Map<String, Object> headers = new HashMap<String, Object>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)            // 2 = persistent message
                .contentEncoding("UTF-8")
                // TTL in milliseconds
                .expiration("6000")
                .headers(headers)
                .build();
        String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
        // Release resources so the program can exit
        channel.close();
        connection.close();
    }
}
Consumer
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Receiver4DLXtExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.71");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 4. Declare the normal exchange, queue, and routing rule
        String queueName = "test_dlx_queue";
        String exchangeName = "test_dlx_exchange";
        String exchangeType = "topic";
        String routingKey = "group.*";
        // Declare the exchange
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Note the special queue argument here: x-dead-letter-exchange
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");
        //arguments.put("x-dead-letter-routing-key", "dlx.*");
        //arguments.put("x-message-ttl", 6000);
        channel.queueDeclare(queueName, false, false, false, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);
        // Declare the dead-letter exchange and queue; "#" receives every dead letter
        channel.exchangeDeclare("dlx.exchange", exchangeType, true, false, false, null);
        channel.queueDeclare("dlx.queue", false, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
        // QueueingConsumer belongs to older Java client versions (it was removed in 5.x)
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Parameters: queue name, automatic ACK, consumer
        channel.basicConsume(queueName, true, consumer);
        // Loop to get messages
        while (true) {
            // Get a message; if there is none, this call blocks
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Received a message: " + msg);
        }
    }
}
The RabbitMQ SpringBoot integration
Producer
- pom
<!-- springboot rabbitmq(amqp) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.properties
spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
## Use the message confirm mode
spring.rabbitmq.publisher-confirms=true
## Set the return message mode; note that it must be used together with mandatory
## spring.rabbitmq.publisher-returns=true
## spring.rabbitmq.template.mandatory=true
spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
// In newer Spring AMQP versions CorrelationData lives in org.springframework.amqp.rabbit.connection
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Callback listener used to confirm that the broker has received the message.
     */
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData used as a unique identifier
         * @param ack whether the broker accepted the message
         * @param cause information about the failure, if any
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("Message ACK result: " + ack + ", correlationData: " + correlationData.getId());
        }
    };

    /**
     * Sends a message.
     * @param message the message content
     * @param properties additional properties
     * @throws Exception
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Specify a unique business ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
                    throws AmqpException {
                System.err.println("---> post to do: " + message);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("exchange-1", "springboot.rabbit", msg, mpp, correlationData);
    }
}
Consumer
- application.properties
spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672,192.168.11.71:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
## After a message has been consumed successfully, the consumer must acknowledge it manually. The default is auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1
## It is best not to hard-code configuration in the code; reference it with ${},
## for example ${spring.rabbitmq.listener.order.exchange.name}
spring.rabbitmq.listener.order.exchange.name=order-exchange
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=order.*
spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class RabbitReceive {
    /**
     * Uses the combination @RabbitListener, @QueueBinding, @Queue, @Exchange.
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", durable = "true"),
            exchange = @Exchange(name = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        // 1. After receiving the message, run the business-side consumption logic
        System.err.println("-----------------------");
        System.err.println("Consumed message: " + message.getPayload());
        // 2. After successful processing, obtain the deliveryTag and ACK manually,
        //    because manual acknowledgement is configured in the configuration file:
        //    spring.rabbitmq.listener.simple.acknowledge-mode=manual
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}