6 RabbitMQ working modes
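The examples below connect to RabbitMQ from a plain Java application; integration with Spring Boot is considered later.
The producer, the consumer, and RabbitMQ do not have to run on the same host. An application can be both a producer and a consumer.
Exchange types
Direct: routes a message to the queues whose binding key exactly matches the routing key.
Topic: routes a message to queues based on the routing key, using either an exact or a wildcard match.
Headers: routes a message based on its header attributes rather than the routing key.
Fanout: broadcasts a message to every bound queue.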
POM file

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>
1.1 Work Queues
The competing consumers pattern.
Flow chart
No exchange is specified in this mode. RabbitMQ has a default exchange, and messages are sent to it when no exchange is set.
Characteristics
If a queue has multiple consumers, MQ distributes messages among them in round-robin order, and each message is consumed only once.
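The round-robin behaviour described above can be sketched in plain Java without a broker. This is only a toy model under simplifying assumptions (all consumers are idle and connected, nothing is acknowledged late); the class name RoundRobinDispatch and its dispatch method are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of round-robin dispatch; not RabbitMQ's actual implementation. */
final class RoundRobinDispatch {

    /**
     * Hands out the messages to consumerCount consumers in turn.
     * Returns one list per consumer with the messages that consumer received.
     */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0; // index of the consumer whose turn it is
        for (String message : messages) {
            received.get(next).add(message); // each message goes to exactly one consumer
            next = (next + 1) % consumerCount;
        }
        return received;
    }
}
```

With five messages and two consumers, the first consumer in this model receives the first, third, and fifth message and the second consumer receives the other two; no message is delivered twice.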
Code demo
Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String TEST_QUEUE = "test_queue";

    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set the connection properties
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("XX");
        connectionFactory.setPassword("CC$");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue
            channel.queueDeclare(TEST_QUEUE, true, false, false, null);
            String message = "Hello World!";
            // Send a persistent message through the default exchange
            channel.basicPublish("", TEST_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
The parameters of some of the methods above are explained below.
queue: the queue name.
durable: whether the queue is persistent. If true, the queue still exists after MQ restarts.
exclusive: whether the queue is exclusive. An exclusive queue is visible only to the connection that first declared it and is deleted automatically when that connection is closed. There are three points to note here:
1. Exclusivity is per connection: different channels of the same connection can all access an exclusive queue created on that connection.
2. "First declared" means that once a connection has declared an exclusive queue, other connections are not allowed to declare an exclusive queue with the same name. This differs from an ordinary queue.
3. Even if the queue is durable, an exclusive queue is deleted as soon as the connection is closed or the client exits. Such a queue suits scenarios where a single client both sends and reads messages.
autoDelete: whether the queue is deleted automatically. The queue is deleted once it no longer has any subscribed consumers. This suits temporary queues.

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
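The ownership rules for exclusive queues can be illustrated with a small in-memory model. This is a sketch under assumptions, not the broker's code: ExclusiveQueueModel, the connection ids, and the boolean return value are all hypothetical (a real broker rejects the second declaration with an error rather than returning false).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of exclusive-queue ownership; names are hypothetical. */
final class ExclusiveQueueModel {
    // queue name -> id of the connection that first declared it
    private final Map<String, String> ownerByQueue = new HashMap<>();

    /**
     * Returns true if the queue now belongs to this connection (first declaration
     * or a redeclaration by the owner), false if another connection owns the name.
     */
    boolean declareExclusive(String connectionId, String queueName) {
        String owner = ownerByQueue.putIfAbsent(queueName, connectionId);
        return owner == null || owner.equals(connectionId);
    }

    /** Closing a connection deletes every exclusive queue it owns. */
    void closeConnection(String connectionId) {
        ownerByQueue.values().removeIf(connectionId::equals);
    }

    boolean exists(String queueName) {
        return ownerByQueue.containsKey(queueName);
    }
}
```

In this model a second connection cannot take over the name while the first connection is open, and the queue disappears when its owner closes, regardless of any durability setting.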
mandatory: when set to true, if the exchange cannot find a matching queue based on its own type and the message's routing key, the broker calls basic.return to hand the message back to the producer (basic.return + content-header + content-body). When set to false, the broker simply discards the message in that case. The default is false.
immediate: when set to true, if the exchange routes the message to one or more queues and finds that a queue has no consumers, the message is not placed in that queue. If none of the queues associated with the message's routing key has a consumer, the message is returned to the producer through basic.return. Note that RabbitMQ removed support for the immediate flag in version 3.0.

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);
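The effect of the mandatory flag reduces to a small decision table, sketched here in plain Java. The names MandatoryModel, Outcome, and publish are hypothetical and only model the outcome; in the real client a returned message arrives asynchronously through a return listener.

```java
/** Toy decision table for the mandatory flag; names are hypothetical. */
final class MandatoryModel {

    enum Outcome { ROUTED, RETURNED, DROPPED }

    /**
     * matchingQueues is the number of queues the exchange found for the message.
     * An unroutable message is returned to the producer only when mandatory is true.
     */
    static Outcome publish(int matchingQueues, boolean mandatory) {
        if (matchingQueues > 0) {
            return Outcome.ROUTED;
        }
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }
}
```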
Message persistence is set through deliveryMode: 1 means non-persistent and 2 means persistent. Persistent messages are not lost when MQ restarts. The predefined MessageProperties.PERSISTENT_TEXT_PLAIN uses deliveryMode = 2:

public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null);

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer {
    public static final String TEST_QUEUE = "test_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("XXX");
        factory.setPassword("XXX$");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TEST_QUEUE, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Limit the number of unacknowledged messages this consumer can hold
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
            try {
                // Simulate slow processing
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(" [x] Done");
                // Manually acknowledge this single message
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false, so the broker waits for basicAck
        channel.basicConsume(TEST_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}
Ack mechanism:
RabbitMQ supports manual ack and automatic ack. With automatic ack, MQ deletes the message immediately after sending it. If the consumer then fails while processing the message, the message is not redelivered because it has already been deleted, so the unprocessed messages the consumer had received are lost. With manual ack, MQ waits for the acknowledgement before deleting the message. If the channel is closed, the connection is closed, or the TCP connection is lost before the ack arrives, the message is requeued.
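The difference between the two ack modes can be made concrete with an in-memory model of a single queue. This is a sketch under assumptions rather than broker code: AckModelQueue and its methods are hypothetical names, and the model handles one consumer only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one queue with automatic and manual acknowledgement. */
final class AckModelQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> message delivered with autoAck = false and not yet acknowledged
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextDeliveryTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers the next message and returns its delivery tag, or -1 if the queue is empty. */
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        if (!autoAck) {
            unacked.put(tag, message); // kept until the consumer acknowledges it
        }
        // With autoAck the message is forgotten as soon as it is delivered.
        return tag;
    }

    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    /** Models a closed channel or lost connection: unacknowledged messages are requeued. */
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i)); // put back at the head, preserving order
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }
}
```

In this model a message delivered with autoAck = true is gone even if the consumer disconnects right away, while a message delivered with autoAck = false returns to the queue unless ack was called first.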
Method parameters:
prefetchSize: the limit on message content size, in bytes.
prefetchCount: the limit on the number of unacknowledged messages received from MQ. Suppose there are two consumers: A does not set this parameter and B sets it to 2. When the producer sends 100 messages, B stops receiving messages once it holds two unacknowledged ones, and receives more only after it acknowledges them.
global: whether the limit is shared by the whole channel (true) or applied to each consumer separately (false).

void basicQos(int prefetchSize, int prefetchCount, boolean global);

basicQos must be used together with autoAck = false, as in channel.basicConsume(TEST_QUEUE, false, deliverCallback, consumerTag -> {});. If autoAck is not set to false, the basicQos setting has no effect, because MQ does not track the number of unacknowledged messages per consumer; it simply keeps sending messages to consumers.
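The two-consumer example for prefetchCount can be replayed with a short simulation. It is a sketch under stated assumptions: no consumer ever acknowledges, messages are offered round-robin, and a prefetch value of 0 stands for "no limit". PrefetchModel and dispatch are hypothetical names, not client API.

```java
/** Toy model of prefetchCount with consumers that never acknowledge. */
final class PrefetchModel {

    /**
     * Offers messageCount messages round-robin to the consumers.
     * prefetchCounts[i] is consumer i's limit on unacknowledged messages (0 = no limit).
     * Returns how many messages each consumer ended up holding.
     */
    static int[] dispatch(int messageCount, int[] prefetchCounts) {
        int n = prefetchCounts.length;
        int[] unacked = new int[n];
        int next = 0; // consumer whose turn it is
        for (int m = 0; m < messageCount; m++) {
            int tried = 0;
            // Skip consumers that already hold their maximum of unacknowledged messages.
            while (tried < n && !hasCapacity(unacked[next], prefetchCounts[next])) {
                next = (next + 1) % n;
                tried++;
            }
            if (tried == n) {
                break; // every consumer is at its limit; remaining messages stay queued
            }
            unacked[next]++;
            next = (next + 1) % n;
        }
        return unacked;
    }

    private static boolean hasCapacity(int unacked, int prefetchCount) {
        return prefetchCount == 0 || unacked < prefetchCount;
    }
}
```

Calling dispatch(100, new int[] {0, 2}) models consumer A without a limit and consumer B with a limit of 2: B stops at two messages and A absorbs the remaining 98.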
1.2 Publish/Subscribe
The core idea of RabbitMQ messaging is that the producer never sends a message directly to a queue. In fact, the producer often does not know whether the message will be delivered to any queue at all. The producer can only send messages to an exchange.
Send messages to multiple consumers at once.
1.2.1 Flow chart
1.2.2 Characteristics
Send messages to multiple consumers
If no queue is bound to the exchange, messages sent to it are lost.
The fanout exchange ignores the value of the routing key.
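These three characteristics can be captured in a few lines of plain Java. The sketch below is an in-memory model only; FanoutModel, bindQueue, publish, and messagesIn are hypothetical names and do not correspond to RabbitMQ client methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a fanout exchange with in-memory queues. */
final class FanoutModel {
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bindQueue(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /**
     * Copies the message into every bound queue. The routing key is accepted but ignored.
     * Returns the number of queues that received the message; 0 means it was lost.
     */
    int publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }
}
```

A publish call made before any queue is bound returns 0 in this model, which mirrors a message being lost when the producer starts before the consumer.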
1.2.3 Code demonstration
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Producer {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set the connection properties
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("XX");
        connectionFactory.setPassword("CC$");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "send message 3";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

public class Consumer {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("cc");
        factory.setPassword("cc$");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Make sure the exchange exists
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Declare a server-named temporary queue and bind it to the exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

If we start the Producer first and then start the Consumer, the Consumer does not receive the message. This shows that a message is lost when no queue is bound to the exchange at the time it is published.
If we change the queue declaration to channel.queueDeclare(QUEUE_NAME, false, false, false, null); and then start two identical consumers, we find that they compete for messages, because the messages of a shared queue are distributed to its consumers in round-robin order.
1.3 Routing Mode
1.3.1 Flow chart
1.3.2 Characteristics
Send a message to a queue based on a routing key match.
A routing key can be bound to multiple queues, and a queue can also be bound to multiple routing keys.
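The many-to-many relationship between routing keys and queues can be sketched as a map from binding key to queue names. This is an illustrative in-memory model, not the broker's data structure; DirectModel, bind, and route are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of direct-exchange routing. */
final class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new TreeSet<>()).add(queueName);
    }

    /**
     * Returns the queues whose binding key equals the routing key exactly.
     * An empty result means the message would be lost.
     */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }
}
```

Binding two queues with the key "error" and one of them additionally with "info" shows both directions: one key reaches several queues, and one queue listens on several keys.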
1.3.3 Code demonstration
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Producer {
    private static final String EXCHANGE_NAME = "direct_log";

    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set the connection properties
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("cc");
        connectionFactory.setPassword("cc$");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "send message 4-error";
            channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

public class Consumer {
    private static final String EXCHANGE_NAME = "direct_log";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("cc");
        factory.setPassword("cc$");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Make sure the exchange exists
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Bind a server-named temporary queue with the routing key "error"
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

Change the routing key bound to the consumer's queue and start several consumers, then change the producer's routing key and send multiple messages. Each message is delivered only to the queues whose binding matches its routing key. If no queue is bound to the exchange, the message is lost.
1.4 Wildcard Mode (Topics)
1.4.1 Flow chart
1.4.2 Characteristics
* matches exactly one word and # matches zero or more words; the words of a routing key are separated by ".". For example, the routing key test.log is routed to a queue bound with *.log, and the routing key test.log.user is routed to a queue bound with test.#.
If a binding key contains neither * nor #, the topic exchange behaves like a direct exchange.
Note that the wildcard pattern is the binding key of the consumer's queue; the producer publishes with a concrete routing key.
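The matching rule for topic exchanges can be expressed as a short recursive function over dot-separated words. The sketch below follows the usual AMQP topic semantics (* for exactly one word, # for zero or more words); TopicMatcher and matches are hypothetical names, and a real broker uses its own, more efficient implementation.

```java
/** Toy matcher for topic binding keys; not the broker's implementation. */
final class TopicMatcher {

    /** Returns true if the routing key satisfies the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern exhausted: all words must be consumed too
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs no word, or absorbs one word and stays active
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

For instance, matches("test.*.log", "test.error.log") holds, while matches("*.log", "test.log.user") does not, because * stands for a single word only.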
1.4.3 Code demonstration
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Producer {
    private static final String EXCHANGE_NAME = "topic_log";

    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set the connection properties
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("rr");
        connectionFactory.setPassword("rr$");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String message = "send message 5-error";
            channel.basicPublish(EXCHANGE_NAME, "test.error.log", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

public class Consumer {
    private static final String EXCHANGE_NAME = "topic_log";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("ff");
        factory.setPassword("ff$");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Make sure the exchange exists
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Bind a server-named temporary queue with a wildcard pattern
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "test.*.log");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

The only differences from the direct example are that the exchange type is changed to topic and that the consumer's binding key is a wildcard pattern.
2. Pattern summary
The working modes correspond roughly to the exchange types. Whenever a queue has more than one consumer, those consumers compete for its messages. The exchanges differ in how they match messages to queues: broadcast (fanout), exact routing-key match (direct), and wildcard routing-key match (topic).