RabbitMQ’s first program
The RabbitMQ – producers | consumers
Set up the environment
java client
Producers and consumers are both clients. Java clients for rabbitMQ are as follows
Creating a Maven project
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
Copy the code
Review of the AMQP protocol
Message model supported by RabbitMQ
First model (direct connection)
In the model above, there are the following concepts:
- P: Producer, that is, the program to send the message
- C: Consumer: the recipient of a message who waits for the message to arrive.
- Queue: Queue of messages, shown in red. Like a mailbox, messages can be cached; Producers send messages to them and consumers take messages out of them.
Development producer
/** * Producer * <p> * Direct connection mode **@author mxz
*/
@Component
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Get the channel in the connection
Channel channel = connection.createChannel();
// Channel binding message queue
// Parameter 1 Specifies the name of the queue. If it does not exist, it will be created automatically
// Parameter 2 defines whether the queue needs to be persisted. True will persist the queue (when mq is shut down, it will be saved to disk) false will not persist (when mq is shut down, it will be lost)
Parameter 3 exclusive Specifies whether the queue is exclusive. True Specifies whether the queue is exclusive. False Specifies whether the queue is not exclusive
Parameter 4 autoDelete Indicates whether the queue is automatically deleted after consumption. True Indicates automatically deleted. False Indicates not deleted
// Parameter 5 Additional additional parameter
channel.queueDeclare("hello".false.false.false.null);
// Publish the message
Parameter 1 Switch name
Parameter 2 Queue name
// Parameter 3 passes messages with additional Settings
// Parameter 4 Specifies the message content
channel.basicPublish(""."hello".null."hello rabbitMQ".getBytes()); RabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code
Tap consumers
/** * Consumer **@author mxz
*/
@Component
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Channel binding object
channel.queueDeclare("hello".false.false.false.null);
// Consume the message
// Parameter 1 message queue message, queue name
Parameter 2 Enables the message acknowledgement mechanism
// Parameter 3 message when the callback interface
channel.basicConsume("hello".true.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body)" + newString(body)); }});// channel.close();
// connection.close();}}Copy the code
Utility class
/ * * *@author mxz
*/
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
// Load the heavy resource class once.
static {
// Create a connection factory for MQ
connectionFactory = new ConnectionFactory();
// Set the RabbitMQ host
connectionFactory.setHost("127.0.0.1");
// Set the port number
connectionFactory.setPort(5672);
// Set which virtual host to connect to
connectionFactory.setVirtualHost("/codingce");
// Set the user name and password for accessing the virtual host
connectionFactory.setUsername("codingce");
connectionFactory.setPassword("123456");
}
/** * Define the method that provides the connection object **@return* /
public static Connection getConnection(a) {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/** * Close channel and close connection tool method **@param connection
* @param channel
*/
public static void closeConnectionAndChannel(Channel channel, Connection connection) {
try {
/ / close the channel first
if(channel ! =null)
channel.close();
if(connection ! =null)
connection.close();
} catch(Exception e) { e.printStackTrace(); }}}Copy the code
The second model (Work Quene)
Work queues, also known as Task queues, the Task model. When message processing is time-consuming, messages can be produced much faster than they can be consumed. In the long run, more and more messages can’t be processed in a timely manner. At this point, you can use the Work model: multiple consumers are bound to a queue to consume the messages in the queue. Once consumed, messages in the queue disappear, so tasks are not repeated.
Role:
- P: Producer: publisher of the task
- C1: consumer-1, who receives the task and completes it, assuming the completion speed is slow
- C2: Consumer-2: Take the task and complete it, assuming it is done quickly
Development producer
/** * producer * <p> *@author mxz
*/
@Component
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Declare the queue by channel
channel.queueDeclare("work".true.false.false.null);
for (int i = 0; i < 10; i++) {
// Production message
channel.basicPublish(""."work".null, ("" + i + "work quenue").getBytes());
}
// Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code
Develop consumer-1
/** * autoAck true 12 matching test * <p> * Consumer 1 **@author mxz
*/
@Component
public class CustomerOne {
public static void main(String[] args) throws IOException, TimeoutException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Channel binding object
channel.queueDeclare("work".true.false.false.null);
// Consume the message
// Parameter 1 message queue message, queue name
Parameter 2 Enables the message acknowledgement mechanism
// Parameter 3 message when the callback interface
channel.basicConsume("work".true.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
// The default allocation is equal
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1" + new String(body));
try {
Thread.sleep(1000);
} catch(InterruptedException e) { e.printStackTrace(); }}});// channel.close();
// connection.close();}}Copy the code
Develop consumer-2
/** * autoAck true 12 matching test * <p> * Consumer 2 **@author mxz
*/
@Component
public class CustomerTwo {
public static void main(String[] args) throws IOException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Channel binding object
channel.queueDeclare("work".true.false.false.null);
channel.basicConsume("work".true.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1" + newString(body)); }});// channel.close();
// connection.close();}}Copy the code
The test results
Summary: By default, RabbitMQ will send each message in order to the next user. On average, every consumer receives the same number of messages. This way of distributing messages is called a loop.
Automatic message confirmation mechanism
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
Consumers 3
/** * Can do more than 34 collocation test * <p> * consumer 3 **@author mxz
*/
@Component
public class CustomerThree {
public static void main(String[] args) throws IOException, TimeoutException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Only one message can be consumed at a time
channel.basicQos(1);
// Channel binding object
channel.queueDeclare("work".true.false.false.null);
// Parameter 1 Queue name parameter 2(autoAck) The message is automatically acknowledged true the consumer will automatically consume the message to rabbitMQ false Will not automatically acknowledge the message
// If there is a consumer outage, consumer 3 can make a purchase
channel.basicConsume("work".false.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
// The default allocation is equal
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1" + new String(body));
// Manually confirm parameter 1 in the confirm queue
channel.basicAck(envelope.getDeliveryTag(), false);
try {
Thread.sleep(1000);
} catch(InterruptedException e) { e.printStackTrace(); }}});// channel.close();
// connection.close();}}Copy the code
Consumers 4
/** * Can do more than 34 matching test * <p> * consumer 4 **@author mxz
*/
@Component
public class CustomerFour {
public static void main(String[] args) throws IOException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Only one message can be consumed at a time
channel.basicQos(1);
// Channel binding object
channel.queueDeclare("work".true.false.false.null);
channel.basicConsume("work".false.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1" + new String(body));
// Confirm parameters manually
channel.basicAck(envelope.getDeliveryTag(), false); }});// channel.close();
// connection.close();}}Copy the code
Consumers 3
/** * Can do more than 34 collocation test * <p> * consumer 3 **@author mxz
*/
@Component
public class CustomerThree {
public static void main(String[] args) throws IOException, TimeoutException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Only one message can be consumed at a time
channel.basicQos(1);
// Channel binding object
channel.queueDeclare("work".true.false.false.null);
// Parameter 1 Queue name parameter 2(autoAck) The message is automatically acknowledged true the consumer will automatically consume the message to rabbitMQ false Will not automatically acknowledge the message
// If there is a consumer outage, consumer 3 can make a purchase
channel.basicConsume("work".false.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
// The default allocation is equal
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1" + new String(body));
// Manually confirm parameter 1 in the confirm queue
channel.basicAck(envelope.getDeliveryTag(), false);
try {
Thread.sleep(1000);
} catch(InterruptedException e) { e.printStackTrace(); }}});// channel.close();
// connection.close();}}Copy the code
Consumers 4
/** * Can do more than 34 matching test * <p> * consumer 4 **@author mxz
*/
@Component
public class CustomerFour {
public static void main(String[] args) throws IOException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Only one message can be consumed at a time
channel.basicQos(1);
// Channel binding object
channel.queueDeclare("work".true.false.false.null);
channel.basicConsume("work".false.new DefaultConsumer(channel) {
// Last parameter message queue retrieved message
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1" + new String(body));
// Confirm parameters manually
channel.basicAck(envelope.getDeliveryTag(), false); }});// channel.close();
// connection.close();}}Copy the code
The third model (Fanout)
Fanout is also called broadcast
In broadcast mode, the message sending process looks like this:
- You can have multiple consumers
- Each consumer has its own queue
- Each queue is bound to an Exchange (switch)
- The message sent by the producer can only be sent to the switch. The switch decides which queue to send, not the producer.
- The switch sends messages to all queues that are bound
- All consumers in the queue can get the message. Implement a message consumed by multiple consumers
Development development producer
/** * Producer * <p> * Task model fanout **@author mxz
*/
@Component
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Declare the channel. Specify switch parameter 1 switch name parameter 2 for switch type fanOUT broadcast type
channel.exchangeDeclare("logs"."fanout");
// Send the message
channel.basicPublish("logs"."".null."fanout type message".getBytes());
// Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code
Tap consumers
- Consumer 1
/** * consumer 1 * <p> * Task model fanout **@author mxz
*/
public class CustomerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Bind channels to switches
channel.exchangeDeclare("logs"."fanout");
// Temporary queue
String queue = channel.queueDeclare().getQueue();
// Bind the switch queue
channel.queueBind(queue, "logs"."");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 1" + newString(body)); }}); }}Copy the code
- Consumer 2
/** * consumer 2 * <p> * Task model fanout **@author mxz
*/
public class CustomerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Bind channels to switches
channel.exchangeDeclare("logs"."fanout");
// Temporary queue
String queue = channel.queueDeclare().getQueue();
// Bind the switch queue
channel.queueBind(queue, "logs"."");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 2" + newString(body)); }}); }}Copy the code
- Consumers 3
/** * consumer 3 * <p> * Task model fanout **@author mxz
*/
public class CustomerThree {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Bind channels to switches
channel.exchangeDeclare("logs"."fanout");
// Temporary queue
String queue = channel.queueDeclare().getQueue();
// Bind the switch queue
channel.queueBind(queue, "logs"."");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 3" + newString(body)); }}); }}Copy the code
The test results
Fourth model (Routing)
Routing subscription Model -Direct
In Fanout mode, a message is consumed by all subscribed queues. However, in some scenarios, we want different messages to be consumed by different queues. This is where Direct Exchange is used.
Under the Direct model:
- The binding between the queue and the switch cannot be arbitrary, but one must be specified
RoutingKey
(Route key) - The sender of the message must also specify the message when sending the message to Exchange
RoutingKey
. - Exchange no longer delivers messages to each bound queue, but rather according to the message
Routing Key
To make judgments, only queueRoutingkey
With the message ofRouting key
Is exactly the same, the message will be received
Process:
Illustration:
- P: the producer that sends a message to Exchange. When sending a message, it specifies a routing key.
- X: Exchange, which receives a message from the producer and delivers it to a queue that matches the routing key exactly
- C1: consumer whose queue has a message with a routing key of ERROR
- C2: consumer whose queue has messages with routing keys as INFO, Error, and Warning
Development producer
/ * * *@author mxz
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
Declare the switch through the channel. Parameter 1 Switch name parameter 2 Routing mode
channel.exchangeDeclare("logs_direct"."direct");
// Send the message
String routingKey = "error";
channel.basicPublish("logs_direct", routingKey, null, ("This is direct mode publishing based on route_key [" + routingKey + "]").getBytes());
// Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code
Tap consumers
- Consumer 1
/** * Consumer 1 **@author mxz
*/
@Component
public class CustomerOne {
public static void main(String[] args) throws IOException, TimeoutException {
// Get the connection object
Connection connection = RabbitMQUtils.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Bind a queue switch based on route_key
channel.queueBind(queue, "logs_direct"."error");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 1:" + newString(body)); }});// channel.close();
// connection.close();}}Copy the code
- Consumer 2
/** * Consumer 2 **@author mxz
*/
@Component
public class CustomerTwo {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Declare the switch
channel.exchangeDeclare("logs_direct"."direct");
// Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Temporary queue and bound switch
channel.queueBind(queue, "logs_direct"."info");
channel.queueBind(queue, "logs_direct"."error");
channel.queueBind(queue, "logs_direct"."warning");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code
Routing subscription model -Topic
Compared to Direct, Topic Exchange can route messages to different queues based on RoutingKey. But Topic Exchange allows queues to bind to Routing keys using wildcards! The model Routingkey generally consists of one or more words, with “.” Split, for example: item.insert
# wild-cards
*(star) can substitute for exactly one word. Match exactly 1 word no more, no less# (hash) can substitute for zero or more words. # matches audit such as audit.irs.corporate or audit.irs. * matches only audit.irsCopy the code
Development producer
/** * Producer * <p> **@author mxz
*/
@Component
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Declare the switch and switch type
channel.exchangeDeclare("topics"."topic");
/ / routing key
String routeKey = "user.save";
channel.basicPublish("topics", routeKey, null, ("Here is the topic dynamic routing model, routeKey:[" + routeKey + "]").getBytes());
// Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code
Tap consumers
- consumers
/ * * *@author mxz
*/
public class CustomerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Declare the switch and switch type
channel.exchangeDeclare("topics"."topic");
// Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Bind the queue to the switch dynamic wildcard route key
channel.queueBind(queue, "topics"."user.*");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 1:" + newString(body)); }}); }}Copy the code
- consumers
/ * * *@author mxz
*/
public class CustomerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// Declare the switch and switch type
channel.exchangeDeclare("topics"."topic");
// Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Bind the queue to the switch dynamic wildcard route key
channel.queueBind(queue, "topics"."user.#");
// Consume the message
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code
This article has been uploaded to gitee gitee.com/codingce/he… Project address: github.com/xzMhehe/cod…
This article has been uploaded to gitee gitee.com/codingce/he… Project address: github: github.com/xzMhehe/cod…