
Get started with RabbitMQ

  1. Create a new Maven project and add the RabbitMQ Java client (the native Java dependency) to it.
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
  2. Before a producer can send a message to RabbitMQ, or a consumer can consume a message from a queue, the program has to connect to RabbitMQ, declare an exchange, and declare a queue. The overall steps are:

    1. Create a connection factory

    2. Create a Connection

    3. Get a Channel through the connection

    4. Through the channel, declare exchanges and queues, bind them with routing keys, and send and receive messages

    5. Prepare message content

    6. Send messages to the queue

    7. Close the Channel

    8. Close the Connection

  • Getting the connection and, at the end, closing the channel and the connection are encapsulated in a utility class:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil {

    // Despite its name, this method returns the Connection; the caller creates the Channel from it.
    public static Connection getChannel(String connectionName) throws IOException, TimeoutException {
        // 1. Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.180.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        // 2. Create a Connection
        Connection connection = connectionFactory.newConnection(connectionName);
        return connection;
    }

    public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {
        // 7. Close the channel
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
        // 8. Close the connection
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
    }
}
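The close method above closes the channel first and the connection second. For any pair of AutoCloseable resources, Java's try-with-resources gives the same ordering automatically, because resources are closed in the reverse order of their declaration. The following is only a minimal sketch of that ordering: FakeConnection and FakeChannel are hypothetical stand-ins invented for this example, not classes from the RabbitMQ client, so it runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {
    // Records what happened, in order, so the close sequence can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for a connection (not the RabbitMQ client class).
    static final class FakeConnection implements AutoCloseable {
        FakeChannel createChannel() {
            return new FakeChannel();
        }

        @Override
        public void close() {
            LOG.add("connection closed");
        }
    }

    // Hypothetical stand-in for a channel opened on that connection.
    static final class FakeChannel implements AutoCloseable {
        @Override
        public void close() {
            LOG.add("channel closed");
        }
    }

    static List<String> run() {
        LOG.clear();
        // Declared connection first, channel second; closed in reverse order.
        try (FakeConnection connection = new FakeConnection();
             FakeChannel channel = connection.createChannel()) {
            LOG.add("publishing");
        }
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whether to use an explicit close helper or try-with-resources is a matter of taste; the helper in this article additionally checks isOpen() before closing.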

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a Connection
        Connection connection = RabbitMQUtil.getChannel("producer");
        // 3. Get a Channel through the connection
        Channel channel = connection.createChannel();
        String queueName = "queue3";
        /*
         * 4. Declare the queue.
         * param 1: queue name
         * param 2 (durable): whether the queue is durable; a non-durable queue
         *          is removed when the MQ server restarts
         * param 3 (exclusive): whether the queue is exclusive to this connection
         * param 4 (autoDelete): whether the queue is deleted automatically once
         *          its last consumer disconnects
         * param 5 (arguments): other queue arguments
         */
        channel.queueDeclare(queueName, true, false, false, null);
        // 5. Prepare the message content
        String message = "does not drink milk tea Programmer";
        /*
         * 6. Send the message to the queue.
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * param 1: exchange name ("" selects the default exchange)
         * param 2: routing key
         * param 3: additional message properties
         * param 4: message body
         */
        channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
        // 7 and 8. Close the channel and the connection
        RabbitMQUtil.close(channel, connection);
    }
}

The two methods mentioned above:

  • queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) declares a queue. Its parameters are:


    • String queue: the name of the queue.
    • boolean durable: whether the queue is durable. With durable=true the queue is saved to disk and survives a restart of the MQ server (its messages survive as well only if they are published as persistent). With durable=false the queue is non-persistent: after the MQ server restarts, the queue is removed and its messages are lost.
    • boolean exclusive: whether the queue is exclusive. With exclusive=true the queue is visible only to the connection that first declared it and is deleted automatically when that connection closes.
    • boolean autoDelete: whether the queue is deleted automatically. With autoDelete=true the queue is deleted once it has had at least one consumer and all of its consumers have disconnected.
    • Map<String, Object> arguments: other optional arguments for the queue.
  • basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) is used by producers to send messages. Its parameters are:

    • String exchange: the name of the exchange. Here we pass an empty string, which selects the default exchange (shown as AMQP default; its type is direct). Every queue is automatically bound to the default exchange with a binding key equal to the queue's name.

    • String routingKey: the routing key. We pass the queue name here. Because the empty exchange name selects the default exchange, the routing key has to match the name under which the target queue is bound to the default exchange, which is the queue's own name.

    • BasicProperties props: additional properties of the message.

    • byte[] body: the body of the message.
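To make the default-exchange behavior described above concrete, here is a small in-memory model of it. It is only a sketch under stated assumptions: DefaultExchangeSketch and its methods are hypothetical names made up for this illustration, and nothing in it talks to a real broker or uses the RabbitMQ client. It models just one rule, namely that a declared queue is bound under its own name, so a routing key equal to the queue name reaches that queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class DefaultExchangeSketch {
    // Binding key -> queue. For the default exchange the binding key is the queue name.
    private final Map<String, Queue<String>> bindings = new HashMap<>();

    // Declaring a queue implicitly binds it to the default exchange under its own name.
    void queueDeclare(String queueName) {
        bindings.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Publishing with an empty exchange name: the routing key selects the queue
    // with the same name. Returns false when no queue matches in this model.
    boolean publishToDefaultExchange(String routingKey, String body) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    // Number of messages currently held by the named queue (0 if it does not exist).
    int depth(String queueName) {
        Queue<String> queue = bindings.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.queueDeclare("queue3");
        System.out.println(broker.publishToDefaultExchange("queue3", "hello"));
        System.out.println(broker.publishToDefaultExchange("no-such-queue", "hello"));
        System.out.println(broker.depth("queue3"));
    }
}
```

In this model a message whose routing key matches no queue is simply reported as unroutable; the sketch makes no claim about how a real broker handles that case.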

Consumer

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtil.getChannel("consumer");
        Channel channel = connection.createChannel();
        channel.basicConsume("queue3", true, new DeliverCallback() {
            // Callback invoked for each message delivered to this consumer
            @Override
            public void handle(String s, Delivery message) throws IOException {
                System.out.println("The received message is: " + new String(message.getBody(), "UTF-8"));
            }
        }, new CancelCallback() {
            // Callback invoked when the subscription is cancelled
            @Override
            public void handle(String s) throws IOException {
                System.out.println("");
            }
        });
    }
}

The methods mentioned above:

  • basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) is used by consumers to receive messages. In RabbitMQ, consumers can receive messages in push mode or in pull mode. basicConsume is the push mode: the server actively pushes messages to the consumer, which processes each one in a callback and then acknowledges it to the server. basicGet is the pull mode: the consumer actively fetches a single message from the queue each time and then acknowledges it. The parameters of basicConsume are:

    • String queue: the name of the queue from which messages are received.

    • boolean autoAck: whether messages are acknowledged automatically. (An ACK confirms that the consumer has consumed a message and tells the server that it can remove the message from the queue.) With autoAck=true, the server treats a message as acknowledged as soon as it has been delivered. With autoAck=false (manual acknowledgment), the consumer must send an ACK to the MQ server for every message it consumes; if a prefetch limit is set with basicQos, the server stops delivering new messages once that many are unacknowledged. A message that is never acknowledged stays in the queue, and when the consumer disconnects it returns to the queue for redelivery, so forgetting the manual ACK causes messages to accumulate. The manual ACK method is 👉 channel.basicAck(deliveryTag, multiple), where deliveryTag is the message's sequence number and multiple says whether to acknowledge several messages at once (all messages up to and including deliveryTag).

    • DeliverCallback deliverCallback: the consumer callback that processes incoming messages.

    • CancelCallback cancelCallback: the callback invoked when the consumer's subscription is cancelled.
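The acknowledgment rules above are easier to follow with a toy model. The sketch below is an in-memory simulation only: ManualAckSketch and its method names are hypothetical, chosen for this example, and it does not use the RabbitMQ client or reproduce every broker detail. It illustrates three things from the description: a delivery with autoAck=true is forgotten immediately, a manual ack with multiple=true settles every delivery tag up to and including the given one, and deliveries that were never acknowledged go back to the queue when the consumer disconnects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

final class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();          // waiting for delivery
    private final TreeMap<Long, String> unacked = new TreeMap<>();   // delivered, not yet acked
    private long nextDeliveryTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Delivers the next ready message and returns its delivery tag, or -1 if none is ready.
    // With autoAck the message is settled at once; otherwise it is tracked as unacked.
    long deliver(boolean autoAck) {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        if (!autoAck) {
            unacked.put(tag, body);
        }
        return tag;
    }

    // multiple=false settles only deliveryTag; multiple=true settles all tags <= deliveryTag.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headMap(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    // The consumer went away: every unacked message returns to the front of the queue,
    // keeping its original order.
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.publish("a");
        queue.publish("b");
        queue.publish("c");
        queue.deliver(false);
        long second = queue.deliver(false);
        queue.deliver(false);
        queue.ack(second, true);          // settles the first two deliveries
        queue.consumerDisconnected();     // the third one is requeued
        System.out.println(queue.readyCount() + " ready, " + queue.unackedCount() + " unacked");
    }
}
```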

  • After completing the producer and consumer code above, start the producer first. Once it has run successfully, the RabbitMQ web management page shows that a queue named queue3 has been created and that our message has been delivered to it. The figure below also shows that the exchange that routed the message to queue3 is the default exchange, AMQP default.

As you can see from the figure below, queue3 is bound to the default exchange (AMQP default) automatically, because we did not declare an exchange ourselves.

  • Then start the consumer, which receives the message from queue3. For each message received, the callback is invoked and prints the message. The program output looks like this:
The received message is: does not drink milk tea Programmer
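The consumer decodes the message body as UTF-8, so the producer has to encode the string with the same charset; String.getBytes() without an argument uses the platform default, which may differ between machines. A minimal sketch of an explicit round trip follows. The class name Utf8BodySketch and its helper methods are made up for this illustration, and it uses only the Java standard library.

```java
import java.nio.charset.StandardCharsets;

final class Utf8BodySketch {
    // What the producer side would do before handing the bytes to basicPublish.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer side would do with the bytes returned by getBody().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "milk tea 奶茶";
        byte[] body = encode(original);
        // Non-ASCII characters take more than one byte, so the lengths differ.
        System.out.println(original.length() + " chars, " + body.length + " bytes");
        System.out.println(decode(body).equals(original));
    }
}
```

Using the StandardCharsets constant instead of the string "UTF-8" also avoids the checked UnsupportedEncodingException.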

🚨🚨 Note: If the following error is encountered, the RabbitMQ server has not opened the corresponding port

Caused by: com.rabbitmq.client.ShutdownSignalException:connection error;
protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'admin', class-id=10, method-id=40)

Don't panic 😋! If the ports are blocked by the firewall, just open them from the terminal:

systemctl status firewalld 
firewall-cmd --list-ports 
firewall-cmd --zone=public --add-port=5672/tcp --permanent 
firewall-cmd --zone=public --add-port=15672/tcp --permanent 
firewall-cmd --zone=public --add-port=15674-15675/tcp --permanent 
firewall-cmd --zone=public --add-port=1883/tcp --permanent 
firewall-cmd --reload

🏁 This is a detailed explanation of RabbitMQ. If there are any errors, please comment and correct them. If you found this article helpful, please like πŸ‘ πŸ˜‹πŸ˜»πŸ˜