RabbitMQ’s first program

The RabbitMQ – producers | consumers

Set up the environment

java client

Producers and consumers are both clients. Java clients for rabbitMQ are as follows

Creating a Maven project

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.10.0</version>
</dependency>
Copy the code

Review of the AMQP protocol

Message model supported by RabbitMQ

First model (direct connection)

In the model above, there are the following concepts:

  • P: Producer, that is, the program to send the message
  • C: Consumer: the recipient of a message who waits for the message to arrive.
  • Queue: Queue of messages, shown in red. Like a mailbox, messages can be cached; Producers send messages to them and consumers take messages out of them.

Development producer

/** * Producer * <p> * Direct connection mode **@author mxz
 */
@Component
public class Provider {

    public static void main(String[] args) throws IOException, TimeoutException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();
        // Get the channel in the connection
        Channel channel = connection.createChannel();

        // Channel binding message queue
        // Parameter 1 Specifies the name of the queue. If it does not exist, it will be created automatically
        // Parameter 2 defines whether the queue needs to be persisted. True will persist the queue (when mq is shut down, it will be saved to disk) false will not persist (when mq is shut down, it will be lost)
        Parameter 3 exclusive Specifies whether the queue is exclusive. True Specifies whether the queue is exclusive. False Specifies whether the queue is not exclusive
        Parameter 4 autoDelete Indicates whether the queue is automatically deleted after consumption. True Indicates automatically deleted. False Indicates not deleted
        // Parameter 5 Additional additional parameter
        channel.queueDeclare("hello".false.false.false.null);

        // Publish the message

        Parameter 1 Switch name
        Parameter 2 Queue name
        // Parameter 3 passes messages with additional Settings
        // Parameter 4 Specifies the message content
        channel.basicPublish(""."hello".null."hello rabbitMQ".getBytes()); RabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code

Tap consumers

/** * Consumer **@author mxz
 */
@Component
public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Channel binding object
        channel.queueDeclare("hello".false.false.false.null);

        // Consume the message
        // Parameter 1 message queue message, queue name
        Parameter 2 Enables the message acknowledgement mechanism
        // Parameter 3 message when the callback interface
        channel.basicConsume("hello".true.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body)" + newString(body)); }});// channel.close();
// connection.close();}}Copy the code

Utility class

/ * * *@author mxz
 */
public class RabbitMQUtils {

    private static ConnectionFactory connectionFactory;

    // Load the heavy resource class once.
    static {
        // Create a connection factory for MQ
        connectionFactory = new ConnectionFactory();
        // Set the RabbitMQ host
        connectionFactory.setHost("127.0.0.1");
        // Set the port number
        connectionFactory.setPort(5672);
        // Set which virtual host to connect to
        connectionFactory.setVirtualHost("/codingce");
        // Set the user name and password for accessing the virtual host
        connectionFactory.setUsername("codingce");
        connectionFactory.setPassword("123456");
    }

    /** * Define the method that provides the connection object **@return* /
    public static Connection getConnection(a) {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }


    /** * Close channel and close connection tool method **@param connection
     * @param channel
     */
    public static void closeConnectionAndChannel(Channel channel, Connection connection) {
        try {
            / / close the channel first
            if(channel ! =null)
                channel.close();
            if(connection ! =null)
                connection.close();
        } catch(Exception e) { e.printStackTrace(); }}}Copy the code

The second model (Work Quene)

Work queues, also known as Task queues, the Task model. When message processing is time-consuming, messages can be produced much faster than they can be consumed. In the long run, more and more messages can’t be processed in a timely manner. At this point, you can use the Work model: multiple consumers are bound to a queue to consume the messages in the queue. Once consumed, messages in the queue disappear, so tasks are not repeated.

Role:

  • P: Producer: publisher of the task
  • C1: consumer-1, who receives the task and completes it, assuming the completion speed is slow
  • C2: Consumer-2: Take the task and complete it, assuming it is done quickly

Development producer

/** * producer * <p> *@author mxz
 */
@Component
public class Provider {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // Declare the queue by channel
        channel.queueDeclare("work".true.false.false.null);

        for (int i = 0; i < 10; i++) {
            // Production message
            channel.basicPublish(""."work".null, ("" + i + "work quenue").getBytes());
        }

        // Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code

Develop consumer-1

/** * autoAck true 12 matching test * <p> * Consumer 1 **@author mxz
 */
@Component
public class CustomerOne {
    public static void main(String[] args) throws IOException, TimeoutException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Channel binding object
        channel.queueDeclare("work".true.false.false.null);

        // Consume the message
        // Parameter 1 message queue message, queue name
        Parameter 2 Enables the message acknowledgement mechanism
        // Parameter 3 message when the callback interface
        channel.basicConsume("work".true.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            // The default allocation is equal
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1" + new String(body));
                try {
                    Thread.sleep(1000);
                } catch(InterruptedException e) { e.printStackTrace(); }}});// channel.close();
// connection.close();}}Copy the code

Develop consumer-2

/** * autoAck true 12 matching test * <p> * Consumer 2 **@author mxz
 */
@Component
public class CustomerTwo {
    public static void main(String[] args) throws IOException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Channel binding object
        channel.queueDeclare("work".true.false.false.null);

        channel.basicConsume("work".true.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1" + newString(body)); }});// channel.close();
// connection.close();}}Copy the code

The test results

Summary: By default, RabbitMQ will send each message in order to the next user. On average, every consumer receives the same number of messages. This way of distributing messages is called a loop.

Automatic message confirmation mechanism

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

Consumers 3

/** * Can do more than 34 collocation test * <p> * consumer 3 **@author mxz
 */
@Component
public class CustomerThree {
    public static void main(String[] args) throws IOException, TimeoutException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Only one message can be consumed at a time
        channel.basicQos(1);
        // Channel binding object
        channel.queueDeclare("work".true.false.false.null);

        // Parameter 1 Queue name parameter 2(autoAck) The message is automatically acknowledged true the consumer will automatically consume the message to rabbitMQ false Will not automatically acknowledge the message
        // If there is a consumer outage, consumer 3 can make a purchase
        channel.basicConsume("work".false.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            // The default allocation is equal
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1" + new String(body));
                // Manually confirm parameter 1 in the confirm queue
                channel.basicAck(envelope.getDeliveryTag(), false);
                try {
                    Thread.sleep(1000);
                } catch(InterruptedException e) { e.printStackTrace(); }}});// channel.close();
// connection.close();}}Copy the code

Consumers 4

/** * Can do more than 34 matching test * <p> * consumer 4 **@author mxz
 */
@Component
public class CustomerFour {
    public static void main(String[] args) throws IOException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Only one message can be consumed at a time
        channel.basicQos(1);

        // Channel binding object
        channel.queueDeclare("work".true.false.false.null);

        channel.basicConsume("work".false.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1" + new String(body));

                // Confirm parameters manually
                channel.basicAck(envelope.getDeliveryTag(), false); }});// channel.close();
// connection.close();}}Copy the code

Consumers 3

/** * Can do more than 34 collocation test * <p> * consumer 3 **@author mxz
 */
@Component
public class CustomerThree {
    public static void main(String[] args) throws IOException, TimeoutException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Only one message can be consumed at a time
        channel.basicQos(1);
        // Channel binding object
        channel.queueDeclare("work".true.false.false.null);

        // Parameter 1 Queue name parameter 2(autoAck) The message is automatically acknowledged true the consumer will automatically consume the message to rabbitMQ false Will not automatically acknowledge the message
        // If there is a consumer outage, consumer 3 can make a purchase
        channel.basicConsume("work".false.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            // The default allocation is equal
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1" + new String(body));
                // Manually confirm parameter 1 in the confirm queue
                channel.basicAck(envelope.getDeliveryTag(), false);
                try {
                    Thread.sleep(1000);
                } catch(InterruptedException e) { e.printStackTrace(); }}});// channel.close();
// connection.close();}}Copy the code

Consumers 4

/** * Can do more than 34 matching test * <p> * consumer 4 **@author mxz
 */
@Component
public class CustomerFour {
    public static void main(String[] args) throws IOException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Only one message can be consumed at a time
        channel.basicQos(1);

        // Channel binding object
        channel.queueDeclare("work".true.false.false.null);

        channel.basicConsume("work".false.new DefaultConsumer(channel) {
            // Last parameter message queue retrieved message
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1" + new String(body));

                // Confirm parameters manually
                channel.basicAck(envelope.getDeliveryTag(), false); }});// channel.close();
// connection.close();}}Copy the code

The third model (Fanout)

Fanout is also called broadcast

In broadcast mode, the message sending process looks like this:

  • You can have multiple consumers
  • Each consumer has its own queue
  • Each queue is bound to an Exchange (switch)
  • The message sent by the producer can only be sent to the switch. The switch decides which queue to send, not the producer.
  • The switch sends messages to all queues that are bound
  • All consumers in the queue can get the message. Implement a message consumed by multiple consumers

Development development producer

/** * Producer * <p> * Task model fanout **@author mxz
 */
@Component
public class Provider {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();


        // Declare the channel. Specify switch parameter 1 switch name parameter 2 for switch type fanOUT broadcast type
        channel.exchangeDeclare("logs"."fanout");

        // Send the message
        channel.basicPublish("logs"."".null."fanout type message".getBytes());

        // Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code

Tap consumers

  • Consumer 1
/** * consumer 1 * <p> * Task model fanout **@author mxz
 */
public class CustomerOne {

    public static void main(String[] args) throws IOException {

        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Bind channels to switches
        channel.exchangeDeclare("logs"."fanout");

        // Temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Bind the switch queue
        channel.queueBind(queue, "logs"."");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1" + newString(body)); }}); }}Copy the code
  • Consumer 2
/** * consumer 2 * <p> * Task model fanout **@author mxz
 */
public class CustomerTwo {

    public static void main(String[] args) throws IOException {

        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Bind channels to switches
        channel.exchangeDeclare("logs"."fanout");

        // Temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Bind the switch queue
        channel.queueBind(queue, "logs"."");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2" + newString(body)); }}); }}Copy the code
  • Consumers 3
/** * consumer 3 * <p> * Task model fanout **@author mxz
 */
public class CustomerThree {

    public static void main(String[] args) throws IOException {

        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Bind channels to switches
        channel.exchangeDeclare("logs"."fanout");

        // Temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Bind the switch queue
        channel.queueBind(queue, "logs"."");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 3" + newString(body)); }}); }}Copy the code

The test results

Fourth model (Routing)

Routing subscription Model -Direct

In Fanout mode, a message is consumed by all subscribed queues. However, in some scenarios, we want different messages to be consumed by different queues. This is where Direct Exchange is used.

Under the Direct model:

  • The binding between the queue and the switch cannot be arbitrary, but one must be specifiedRoutingKey(Route key)
  • The sender of the message must also specify the message when sending the message to ExchangeRoutingKey.
  • Exchange no longer delivers messages to each bound queue, but rather according to the messageRouting KeyTo make judgments, only queueRoutingkeyWith the message ofRouting keyIs exactly the same, the message will be received

Process:

Illustration:

  • P: the producer that sends a message to Exchange. When sending a message, it specifies a routing key.
  • X: Exchange, which receives a message from the producer and delivers it to a queue that matches the routing key exactly
  • C1: consumer whose queue has a message with a routing key of ERROR
  • C2: consumer whose queue has messages with routing keys as INFO, Error, and Warning

Development producer

/ * * *@author mxz
 */
public class Provider {
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        Declare the switch through the channel. Parameter 1 Switch name parameter 2 Routing mode
        channel.exchangeDeclare("logs_direct"."direct");

        // Send the message
        String routingKey = "error";

        channel.basicPublish("logs_direct", routingKey, null, ("This is direct mode publishing based on route_key [" + routingKey + "]").getBytes());

        // Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code

Tap consumers

  • Consumer 1
/** * Consumer 1 **@author mxz
 */
@Component
public class CustomerOne {
    public static void main(String[] args) throws IOException, TimeoutException {

        // Get the connection object
        Connection connection = RabbitMQUtils.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Bind a queue switch based on route_key
        channel.queueBind(queue, "logs_direct"."error");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1:" + newString(body)); }});// channel.close();
// connection.close();}}Copy the code
  • Consumer 2
/** * Consumer 2 **@author mxz
 */
@Component
public class CustomerTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Declare the switch
        channel.exchangeDeclare("logs_direct"."direct");

        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Temporary queue and bound switch
        channel.queueBind(queue, "logs_direct"."info");
        channel.queueBind(queue, "logs_direct"."error");
        channel.queueBind(queue, "logs_direct"."warning");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code

Routing subscription model -Topic

Compared to Direct, Topic Exchange can route messages to different queues based on RoutingKey. But Topic Exchange allows queues to bind to Routing keys using wildcards! The model Routingkey generally consists of one or more words, with “.” Split, for example: item.insert

# wild-cards
		*(star) can substitute for exactly one word. Match exactly 1 word no more, no less# (hash) can substitute for zero or more words. # matches audit such as audit.irs.corporate or audit.irs. * matches only audit.irsCopy the code

Development producer

/** * Producer * <p> **@author mxz
 */
@Component
public class Provider {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();


        // Declare the switch and switch type
        channel.exchangeDeclare("topics"."topic");


        / / routing key
        String routeKey = "user.save";

        channel.basicPublish("topics", routeKey, null, ("Here is the topic dynamic routing model, routeKey:[" + routeKey + "]").getBytes());

        // Close the resourceRabbitMQUtils.closeConnectionAndChannel(channel, connection); }}Copy the code

Tap consumers

  • consumers
/ * * *@author mxz
 */
public class CustomerOne {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Declare the switch and switch type
        channel.exchangeDeclare("topics"."topic");

        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Bind the queue to the switch dynamic wildcard route key
        channel.queueBind(queue, "topics"."user.*");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1:" + newString(body)); }}); }}Copy the code
  • consumers
/ * * *@author mxz
 */
public class CustomerTwo {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Declare the switch and switch type
        channel.exchangeDeclare("topics"."topic");

        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();

        // Bind the queue to the switch dynamic wildcard route key
        channel.queueBind(queue, "topics"."user.#");

        // Consume the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code

This article has been uploaded to gitee gitee.com/codingce/he… Project address: github.com/xzMhehe/cod…

This article has been uploaded to gitee gitee.com/codingce/he… Project address: github: github.com/xzMhehe/cod…