Basic introduction
RabbitMQ is open-source message broker software (also known as message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). AMQP is an open application-layer protocol standard designed for message-oriented middleware. Message-oriented middleware is mainly used to decouple components, so that the sender of a message does not need to know whether a consumer exists.
What problems can be solved
Asynchronous processing, application decoupling, traffic peak shaving, log processing…
What similar products are commonly used
RocketMQ, ActiveMQ, Kafka
Environment Installation and Configuration
Installation tutorial
This section describes the client interface
Add a user
A virtual host is comparable to a database in MySQL. Only authorized users can access a virtual host.
Authorize a user
Five kinds of queue
Simple queue
- P is the producer of the message
- The red ones are message queues
- C stands for consumer
A consumer in a simple queue binds to a queue: the producer sends a message to the queue, and the consumer retrieves the message from the queue
Create a Maven project
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.1</version>
</dependency>
The connection information
public static Connection getConn() throws IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("root");
    factory.setPassword("root");
    // Open a TCP connection to the broker
    Connection conn = factory.newConnection();
    return conn;
}
producers
public final static String QUEUE_NAME = "q_test_01";

public static void main(String[] args) throws IOException {
    Connection conn = ConnUtil.getConn();
    // Create a channel from the connection
    Channel channel = conn.createChannel();
    // Declare a queue (not durable, not exclusive, not auto-deleted, no extra arguments)
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Message content
    String message = "HelloWorld2!!!";
    // Publish through the default exchange, using the queue name as the routing key
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    // Close the channel and the connection
    channel.close();
    conn.close();
}
consumers
public static void main(String[] args) throws IOException, InterruptedException {
    Connection conn = ConnUtil.getConn();
    Channel channel = conn.createChannel();
    channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
    // Define the queue consumer
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen on the queue with automatic acknowledgement (autoAck = true)
    channel.basicConsume(Provider.QUEUE_NAME, true, consumer);
    // Get the messages; nextDelivery() blocks until one arrives
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String msg = new String(delivery.getBody());
        System.out.println(" [x] Received '" + msg + "'");
    }
}
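After starting the producer and the consumer, you can see that the messages sent by the producer are consumed by the consumer and printed on the console. Disadvantage: high coupling, because the producer maps one-to-one to the consumer; this model does not work if you want multiple consumers to consume the messages in the queue. Note that QueueingConsumer belongs to older versions of the Java client: it was deprecated in amqp-client 4.x and removed in 5.x.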
The work queue
The producer sends 100 messages
Connection conn = ConnUtil.getConn();
Channel channel = conn.createChannel();
// Declare a queue
channel.queueDeclare(WorkConsumer1.QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
    // Message content
    String message = "" + i;
    channel.basicPublish("", WorkConsumer1.QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    Thread.sleep(10);
}
channel.close();
conn.close();
Consumer 1
public final static String QUEUE_NAME = "test_queue_work4";

public static void main(String[] args) throws IOException, InterruptedException {
    Connection conn = ConnUtil.getConn();
    Channel channel = conn.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // The server sends only one message to the consumer at a time
    channel.basicQos(1);
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen on the queue; second argument: true = automatic ack, false = manual ack
    channel.basicConsume(QUEUE_NAME, true, consumer);
    // Get the messages
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String msg = new String(delivery.getBody());
        System.out.println(" [1] Received '" + msg + "'");
        // Thread.sleep(10);
        // Manual acknowledgement; left commented out because automatic ack is used here
        // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
Consumer 2
public static void main(String[] args) throws IOException, InterruptedException {
    Connection conn = ConnUtil.getConn();
    Channel channel = conn.createChannel();
    // Same queue as consumer 1
    channel.queueDeclare(WorkConsumer1.QUEUE_NAME, false, false, false, null);
    // The server sends only one message to the consumer at a time
    channel.basicQos(1);
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen on the queue; second argument: true = automatic ack, false = manual ack
    channel.basicConsume(WorkConsumer1.QUEUE_NAME, true, consumer);
    // Get the messages
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String msg = new String(delivery.getBody());
        System.out.println(" [2] Received '" + msg + "'");
        // Simulate a slow consumer
        Thread.sleep(1000);
        // Manual acknowledgement; left commented out because automatic ack is used here
        // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
You can then test the results: even though the two consumers take different amounts of time per message, they end up consuming the same number of messages. One consumer receives the odd-numbered messages and the other the even-numbered ones. This default approach, known as round-robin dispatch, makes it easy to process work in parallel: if a backlog builds up, we can add more workers (consumers), so the system scales easily. By default RabbitMQ sends each message to the next consumer in sequence, regardless of how long each task takes; messages are allocated in advance rather than handed out one at a time as consumers become free. On average, every consumer gets the same number of messages.
But this is clearly not reasonable in practice: consumer 1 barely pauses between messages, while consumer 2 sleeps for one second per message, so consumer 1 should receive more messages than consumer 2. In other words, consumers that process messages faster should consume more of them. Fair dispatch can be used to solve this problem.
To use fair dispatch, the message acknowledgement mechanism needs to be changed:
- Automatic acknowledgement: once a message is fetched from the queue, it is considered successfully consumed, whether or not the consumer actually finished processing it.
- Manual acknowledgement: after the consumer receives a message from the queue, the server marks the message as unavailable and waits for the consumer's acknowledgement. If no acknowledgement arrives, the message stays unavailable.
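The bookkeeping behind manual acknowledgement can be sketched with a toy in-memory model. This is only an illustration under simplifying assumptions, not the RabbitMQ client API: the class `AckModel` and its methods are hypothetical names, and the requeue-on-consumer-loss step models the behaviour described for autoAck = false.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of manual acknowledgement (hypothetical, not the RabbitMQ API). */
final class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();          // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, not yet acked
    private long nextTag = 1;

    void publish(String msg) {
        ready.addLast(msg);
    }

    /** Delivers the next ready message; returns its delivery tag, or -1 if none is ready. */
    long deliver() {
        String msg = ready.pollFirst();
        if (msg == null) {
            return -1;
        }
        unacked.put(nextTag, msg);
        return nextTag++;
    }

    /** Manual ack: the message is removed for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** The consumer went away without acking: put its messages back, oldest first. */
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModel queue = new AckModel();
        queue.publish("a");
        queue.publish("b");
        long tag = queue.deliver(); // "a" becomes unavailable to other consumers
        queue.ack(tag);             // "a" is done
        queue.deliver();            // "b" is delivered but never acked
        queue.consumerLost();       // "b" becomes available again
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

With automatic acknowledgement there would be no `unacked` map at all: a message would be forgotten as soon as it was delivered, which is why a crashing consumer can lose it.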
Modify the above code
// The server sends only one unacknowledged message to the consumer at a time
channel.basicQos(1);
// Listen on the queue: false = manual acknowledgement, true = automatic.
// autoAck = false also avoids message loss: if the consumer dies while
// processing a message, the message is delivered to another consumer.
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// In the receive loop, call this after a message has been consumed successfully
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
Restart the programs now and you can see the effect: the more capable consumer does more of the work.
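The difference between round-robin and fair dispatch can be sketched with a small deterministic simulation. It does not talk to a broker; `DispatchSim` and its methods are hypothetical names, all messages are assumed to be queued up front, and the per-message costs of 10 ms and 1000 ms are assumptions loosely based on the sleep values in the consumers above.

```java
import java.util.Arrays;

/** Simulated message distribution between consumers (hypothetical sketch, no broker involved). */
final class DispatchSim {

    /** Round-robin: message i goes to consumer i mod n, regardless of consumer speed. */
    static int[] roundRobin(int messages, int consumers) {
        int[] counts = new int[consumers];
        for (int i = 0; i < messages; i++) {
            counts[i % consumers]++;
        }
        return counts;
    }

    /**
     * Fair dispatch with prefetch = 1 and manual ack: a consumer only gets the next
     * message once it has finished (acked) the previous one, so each message goes
     * to whichever consumer becomes free first.
     */
    static int[] fairDispatch(int messages, long[] costMs) {
        int[] counts = new int[costMs.length];
        long[] freeAt = new long[costMs.length]; // simulated time at which each consumer is idle
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < costMs.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            counts[next]++;
            freeAt[next] += costMs[next];
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + Arrays.toString(roundRobin(100, 2)));
        System.out.println("fair:        " + Arrays.toString(fairDispatch(100, new long[] {10, 1000})));
    }
}
```

Under these assumptions round-robin splits the 100 messages 50/50, while fair dispatch gives almost all of them to the fast consumer.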
Message persistence
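// true makes the queue durable, so the queue survives a broker restart
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
Declaring the queue as durable ensures that it still exists after MQ crashes and restarts. For the messages themselves to survive, they must also be published as persistent, for example by passing MessageProperties.PERSISTENT_TEXT_PLAIN as the properties argument of basicPublish. Note that an existing queue cannot be redeclared with a different durable setting; use a new queue name or delete the old queue first.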
Publish/subscribe mode (publish_subscribe)
- One producer, many consumers
- Each consumer has its own queue
- Instead of sending a message directly to a message queue, the producer sends it to an exchange
- Each queue is bound to the exchange
- A message sent by the producer passes through the exchange to the queues, so it can be consumed by multiple consumers
producers
public final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {
    // Get the connection and the MQ channel
    Connection connection = ConnUtil.getConn();
    Channel channel = connection.createChannel();
    // Declare a fanout exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // Message content
    String message = "Hello World!";
    // A fanout exchange ignores the routing key, so it is left empty
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}
Testing shows the result: a single message can be consumed by multiple consumers. Only consumers whose queues were bound to the exchange before the producer sent the message can consume it. You can also see the bindings between queues and the exchange in the management client.
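Why late subscribers miss earlier messages can be illustrated with a toy in-memory model of a fanout exchange. This is a sketch, not the RabbitMQ client API; `FanoutModel` and its methods are hypothetical names. The key assumption it encodes is that an exchange stores nothing itself and only copies each message into the queues bound at publish time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a fanout exchange (hypothetical, not the RabbitMQ API). */
final class FanoutModel {
    // queue name -> messages currently stored in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; it only receives messages published from now on. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<String>());
    }

    /** Copies the message into every queue that is bound at this moment. */
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("queue_1");
        exchange.publish("first");  // only queue_1 is bound at this point
        exchange.bind("queue_2");
        exchange.publish("second"); // both queues receive a copy
        System.out.println("queue_1: " + exchange.messagesIn("queue_1"));
        System.out.println("queue_2: " + exchange.messagesIn("queue_2"));
    }
}
```

In the real broker the equivalent of `bind` is the consumer declaring its own queue and calling queueBind on the fanout exchange before the producer publishes.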
Routing mode
Processing routing keys
Each queue is bound to the exchange with a specified routing key (routingKey). The exchange then delivers a message only to the queues whose binding key matches the routing key of the message, and the consumers of those queues can consume it.
The provider
public final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {
    // Get the connection and the MQ channel
    Connection connection = ConnUtil.getConn();
    Channel channel = connection.createChannel();
    // Declare the exchange and specify its type
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    // Message content
    String message = "Hello World!";
    String routingKey = "delete";
    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}
Consumer 1
private final static String QUEUE_NAME = "queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {
    Connection connection = ConnUtil.getConn();
    Channel channel = connection.createChannel();
    // Declare a queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with the "insert" key
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
    // The server sends only one message to the consumer at a time
    channel.basicQos(1);
    // Define the consumer of the queue
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen on the queue with manual acknowledgement
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Get the messages
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [Recv1] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
Consumer 2
private final static String QUEUE_NAME = "queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {
    Connection connection = ConnUtil.getConn();
    Channel channel = connection.createChannel();
    // Declare a queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with both the "insert" and "delete" keys
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    // The server sends only one message to the consumer at a time
    channel.basicQos(1);
    // Define the consumer of the queue
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen on the queue with manual acknowledgement
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // Get the messages
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [Recv2] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
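As you can see from the code above, the queue of consumer 2 is bound to the exchange with both the insert and delete routing keys, while the queue of consumer 1 is bound only with insert. When a message is sent with the insert key, both consumers can consume it; the delete message sent by the producer above reaches only consumer 2.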
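The routing decision of a direct exchange amounts to an exact lookup from binding key to queues. The following toy model is a sketch of that idea, not the RabbitMQ client API; `DirectModel`, `bind` and `route` are hypothetical names, while the queue names and keys are the ones used in the consumers above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of direct-exchange routing (hypothetical, not the RabbitMQ API). */
final class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<String>()).add(queueName);
    }

    /** Returns the queues that receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.bind("queue_direct_1", "insert");
        exchange.bind("queue_direct_2", "insert");
        exchange.bind("queue_direct_2", "delete");
        System.out.println("insert -> " + exchange.route("insert"));
        System.out.println("delete -> " + exchange.route("delete"));
        System.out.println("update -> " + exchange.route("update")); // no binding: message is dropped
    }
}
```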
Topic mode (topic)
Matches a routing key to a pattern
'#' matches zero or more words; '*' matches exactly one word. Words are separated by dots. The same message can be received by multiple consumers through different queues. A single queue can have multiple consumer instances, and only one of them will consume a given message.
// Producer: declare a topic exchange and publish with a dotted routing key
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "goods.update";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

// Consumer: this binding only matches the add operation
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
// Consumer: this binding matches every routing key that starts with "goods"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
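The wildcard rules can be made concrete with a small matcher. This is a minimal sketch of the matching semantics as described above ('*' is one word, '#' is zero or more words), not the broker's actual implementation; `TopicMatch` is a hypothetical name.

```java
/** Minimal topic-pattern matcher (hypothetical sketch, not RabbitMQ's implementation). */
final class TopicMatch {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word, but the key has none left
        }
        // '*' matches exactly one arbitrary word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("goods.add", "goods.update"));     // literal words differ
        System.out.println(matches("goods.#", "goods.update"));       // '#' absorbs "update"
        System.out.println(matches("goods.*", "goods.update.price")); // '*' covers one word only
    }
}
```

Applied to the bindings above, a message published with "goods.update" is delivered through the "goods.#" binding but not through "goods.add".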
How to ensure the reliability of messages
The transaction
- channel.txSelect() switches the channel to transaction mode;
- channel.txCommit() commits the transaction;
- channel.txRollback() rolls back the transaction;
The provider
public static final String QUEUE_NAME = "test_queue_tx";

public static void main(String[] args) throws IOException {
    Connection conn = ConnUtil.getConn();
    Channel channel = conn.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String msg = "hello tx";
    try {
        // Start transaction mode
        channel.txSelect();
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // The message only reaches the queue once the transaction is committed
        channel.txCommit();
    } catch (Exception e) {
        channel.txRollback();
        System.out.println("Transaction Rollback");
    } finally {
        channel.close();
        conn.close();
    }
}
consumers
public static void main(String[] args) throws Exception {
    Connection conn = ConnUtil.getConn();
    Channel channel = conn.createChannel();
    channel.queueDeclare(TxSend.QUEUE_NAME, false, false, false, null);
    // Define the queue consumer
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Listen on the queue with automatic acknowledgement
    channel.basicConsume(TxSend.QUEUE_NAME, true, consumer);
    // Get the messages
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String msg = new String(delivery.getBody());
        System.out.println(" [tx] Received '" + msg + "'");
    }
}
Testing shows that if no exception occurs while sending, the message reaches the queue normally and the consumer receives it. If an exception is triggered during sending, for example by adding int i = 10 / 0 between basicPublish and txCommit, the transaction is rolled back and the message never reaches the queue.
This approach ensures the reliability of messages, but it also reduces throughput.
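The commit and rollback behaviour can be sketched with a toy model in which published messages are held back until the transaction is committed. This is an illustration only, not the RabbitMQ client API; `TxModel` and its methods are hypothetical names, and the `fail` flag stands in for the 10 / 0 exception from the test described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transactional channel (hypothetical, not the RabbitMQ API). */
final class TxModel {
    private final List<String> queue = new ArrayList<>();   // messages visible to consumers
    private final List<String> pending = new ArrayList<>(); // published inside the open transaction

    void publish(String msg) {
        pending.add(msg);
    }

    /** Commit: everything published in the transaction becomes visible at once. */
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    /** Rollback: everything published in the transaction is discarded. */
    void rollback() {
        pending.clear();
    }

    List<String> queued() {
        return new ArrayList<>(queue);
    }

    /** Mirrors the producer's try/catch: publish, optionally fail, then commit or roll back. */
    static void send(TxModel channel, String msg, boolean fail) {
        try {
            channel.publish(msg);
            if (fail) {
                throw new IllegalStateException("simulated failure before commit");
            }
            channel.commit();
        } catch (RuntimeException e) {
            channel.rollback();
        }
    }

    public static void main(String[] args) {
        TxModel channel = new TxModel();
        send(channel, "hello tx", false);  // committed
        send(channel, "lost message", true); // rolled back
        System.out.println(channel.queued());
    }
}
```

The throughput cost mentioned above comes from the fact that the real commit is a synchronous round trip to the broker for every transaction.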
Confirm mode
The producer sets the channel to confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID that identifies the message. Once the message has been delivered to the matching queues, the broker sends an acknowledgement containing the ID of the message to the producer, which lets the producer know that the message reached its destination queue. If the message and the queue are persistent, the acknowledgement is sent after the message has been written to disk. The main benefit of confirm mode is that it is asynchronous. It can be used in three ways:
- Normal mode: the waitForConfirms() method is called once for every message sent
- Batch mode: a batch of messages is sent, then waitForConfirms() is called once to determine whether the whole batch has been processed
- Asynchronous mode: the Channel object accepts a ConfirmListener callback, which only receives the deliveryTag (the sequence number of the message on this channel) and a multiple flag. We therefore maintain a set of unconfirmed sequence numbers for each channel: one element is added each time a message is published, and each time handleAck is called, either one record (multiple=false) or all records up to and including the deliveryTag (multiple=true) are removed from the set. For efficiency, the unconfirmed set is best stored in a sorted set (SortedSet).
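The bookkeeping for the asynchronous mode can be isolated from the broker and sketched on its own. `ConfirmTracker` is a hypothetical helper class, not part of the client library; it only shows how a sorted set handles the multiple flag, with `headSet(deliveryTag + 1)` selecting every sequence number up to and including the tag.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/** Tracks unconfirmed publish sequence numbers (hypothetical helper, not a client class). */
final class ConfirmTracker {
    private final SortedSet<Long> unconfirmed =
            Collections.synchronizedSortedSet(new TreeSet<Long>());

    /** Record a sequence number just before the message is published. */
    void published(long seqNo) {
        unconfirmed.add(seqNo);
    }

    /** Apply a broker acknowledgement. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything with a sequence number <= deliveryTag is confirmed
            unconfirmed.headSet(deliveryTag + 1).clear();
        } else {
            // Only this one message is confirmed
            unconfirmed.remove(deliveryTag);
        }
    }

    /** Snapshot of the sequence numbers still waiting for a confirm. */
    SortedSet<Long> pending() {
        synchronized (unconfirmed) {
            return new TreeSet<>(unconfirmed);
        }
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq);
        }
        tracker.handleAck(3, true);  // confirms 1, 2 and 3
        tracker.handleAck(5, false); // confirms only 5
        System.out.println(tracker.pending());
    }
}
```

A sorted set is used because the multiple=true case removes a contiguous prefix, which a TreeSet can cut off without scanning the remaining elements.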
Normal mode:
// The queue name
private static final String QUEUE_NAME = "test_simple_confirm";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    Connection conn = ConnUtil.getConn();
    // Get a channel from the connection
    Channel channel = conn.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // The producer calls confirmSelect() to put the channel into confirm mode
    channel.confirmSelect();
    // The message to send
    String msg = "hello test_simple_confirm";
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    System.out.println("send msg:" + msg);
    // Waiting after every single message can drastically reduce RabbitMQ throughput
    if (!channel.waitForConfirms()) {
        System.out.println("send message failed");
    } else {
        System.out.println("send message ok");
    }
    // Close the channel and the connection
    channel.close();
    conn.close();
}
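Batch mode:
Call basicPublish() several times to send a batch of messages, then call waitForConfirms() once for the whole batch
String msg = "hello test_simple_confirm batch";
// Send the messages in a batch
for (int i = 0; i < 10; i++) {
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
// Wait once until the broker has confirmed every message in the batch
if (!channel.waitForConfirms()) {
    System.out.println("send message failed");
}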
Asynchronous:
// The queue name
private static final String QUEUE_NAME = "test_simple_confirm3";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    Connection conn = ConnUtil.getConn();
    // Get a channel from the connection
    Channel channel = conn.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // The producer calls confirmSelect() to put the channel into confirm mode
    channel.confirmSelect();
    // Sequence numbers of messages that have not been confirmed yet
    final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    channel.addConfirmListener(new ConfirmListener() {
        // The broker confirmed the message(s)
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                System.out.println("handleAck true");
                // Remove every sequence number up to and including deliveryTag
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                System.out.println("handleAck false");
                confirmSet.remove(deliveryTag);
            }
        }

        // The broker could not handle the message(s); a real producer would resend them here
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                System.out.println("handleNack true");
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                System.out.println("handleNack false");
                confirmSet.remove(deliveryTag);
            }
        }
    });
    // The message to send
    String msg = "hello test_simple_confirm batch async";
    while (true) {
        long seqNo = channel.getNextPublishSeqNo();
        // Record the sequence number before publishing, so a fast ack cannot arrive first
        confirmSet.add(seqNo);
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    }
}