takeaway

With the basics of Rabbit in mind (see RabbitMQ in Depth and how it works and how to use RabbitMQ), we’ll focus on the exchanges in RabbitMQ.

Switch classification

RabbitMQ exchanges fall into four categories:

  • Direct (default)
  • headers
  • fanout
  • topic

The Headers exchange allows you to match the header of an AMQP message rather than the routing key, except that the Headers exchange is exactly the same as the Direct exchange, but is so poor that it is rarely needed, so we won’t cover it in this article.

Note: Fanout and Topic exchanges have no historical data, that is, for queues created in the middle of the process, no previous messages can be retrieved.

1. Direct switch

Direct is the default exchange type and is very simple. If the routing key matches, the message is sent to the corresponding queue, as shown in the figure below:

BasicPublish (“”, QueueName, null, message) pushes a Direct exchange message to the appropriate queue, using null characters as the default direct exchange and queue names as routing keys.

Direct exchange code example

The sender:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
Parameter Description: Parameter 1: queue name, parameter 2: persistent, parameter 3: exclusive mode, parameter 4: whether to delete the queue when the consumer disconnects, parameter 5: other parameters of the message
channel.queueDeclare(config.QueueName, false.false.false.null);
String message = String.format("Current time: %s".new Date().getTime());
Parameter description: Parameter 1: switch name; Parameter 2: queue name; Parameter 3: other attributes of the message - headers information of the route; Parameter 4: message body
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
Copy the code

Receiving end, receiving messages continuously:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
Parameter Description: Parameter 1: queue name, parameter 2: persistent, parameter 3: exclusive mode, parameter 4: whether to delete the queue when the consumer disconnects, parameter 5: other parameters of the message
channel.queueDeclare(config.QueueName, false.false.false.null);
Consumer defaultConsumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
			byte[] body) throws IOException {
		String message = new String(body, "utf-8"); // Message body
		System.out.println("Received message =>" + message);
		channel.basicAck(envelope.getDeliveryTag(), false); Parameter description: Parameter 1: index of the message; Parameter 2: whether to batch reply, true: batch confirm the message whose id is smaller than the current ID.}}; channel.basicConsume(config.QueueName,false."", defaultConsumer);
Copy the code

The receiving end gets a single message

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false.false.false.null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // Message confirmation
Copy the code

Continuous message capture uses: basic. Get a single message using: basic.get.

Note: You cannot use a single message consumption for a for loop as an alternative to continuous message consumption, as this is low performance;

The fair scheduling

When there are multiple subscribers on the receiving end, Direct polls and distributes fairly to each subscriber (subscriber message confirmation is normal), as shown in the figure below:

The post forget feature of a message

In this mode, the recipient does not know the source of the message. If you want to specify the sender of the message, you need to include it in the message, just like we mark our name in the letter, so that we can know who the sender is.

Message to confirm

As you can see from the above code, once the message is received, it must be acknowledged manually (in non-automatic acknowledgment deletion mode) using channel.basicack ().

What happens when a message is received unconfirmed?

If an application receives a message and the bug forgets to acknowledge it, the message’s state in the queue changes from “Ready” to “Unacked”.

If the message is received but not acknowledged, Rabbit will not send any more messages to the application because it believes you are not ready to receive the next message.

The message will remain Unacked until you confirm it, or disconnect from Rabbit, and Rabbit will automatically change the message to be Ready for distribution to other subscribers.

You can, of course, take advantage of this by having your application delay acknowledging the message until it has finished processing the business logic, effectively preventing Rabbit from sending you too many messages and causing your application to crash.

Message confirmation Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false.false.false.null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
Copy the code

Channel. basicAck(Long deliveryTag, Boolean multiple) specifies message confirmation, with parameter 1: message ID; Parameter 2: indicates whether to batch reply. True Indicates whether to batch acknowledge the messages whose id is smaller than the number of times.

Conclusion: Every message consumers consume must be confirmed.

Message to refuse

Before a message can be confirmed, it has two options:

Option 1: Disconnect from Rabbit, which redispatches the message to another consumer.

Option 2: Reject the message sent by Rabbit using channel.basicReject(Long deliveryTag, Boolean Requeue). Parameter 2: How messages are processed. If true, Rabbib redistributes the message to other subscribers. If false, Rabbit sends messages to a special “dead letter” queue for rejected messages that are not re-queued.

Message rejection Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false.false.false.null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); // Message rejected
Copy the code

2. Fanout switch — Publish/subscribe

Fanout is different from the Direct exchange. Fanout is a publish/subscribe exchange. When you send a message, the exchange broadcasts the message to all queues attached to the exchange.

Users upload their own face, for example, when the image needs to clear the cache, the user should get bonus points at the same time, you can put the two queue bound to the image upload switch on, so that when a third, and fourth finished uploading pictures to deal with demand, the original code can be the same, you just need to add a subscription, The sender and consumer code is completely decoupled and new functionality can be easily added.

Unlike the direct exchange, we add channel.exchangeDeclare(ExchangeName, “FANout “) when sending a message. This line of code declares the FANout exchange.

The sender:

final String ExchangeName = "fanoutec"; // Switch name
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // Declare the fanout switch
String message = "Time:" + new Date().getTime();
channel.basicPublish(ExchangeName, "".null, message.getBytes("UTF-8"));
Copy the code

Accepting messages is different from Direct in that we need to declare the Fanout router and bind it to the Fanout exchange using the default queue.

The receiver:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // Declare the fanout switch
String queueName = channel.queueDeclare().getQueue(); // Declare a queue
channel.queueBind(queueName, ExchangeName, "");
Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
			byte[] body) throws IOException {
		String message = new String(body, "UTF-8"); }}; channel.basicConsume(queueName,true, consumer);
Copy the code

The biggest difference between FANout and Direct is at the receiving end, where fanout needs to bind queues to the corresponding exchange for subscribing messages.

Where channel.queuedeclare ().getQueue() is a random queue. Rabbit generates a random queue name, which is automatically deleted once the consumer disconnects.

Note: The routingKey is not valid for fanout switches, and this parameter is ignored.

3. Topic switch — Match subscription patterns

Finally, topic switches run like fanout, but can be more flexible in matching the messages they want to subscribe to. This is where the routingKey comes in, using the routingKey for message (rule) matching.

Suppose we now have a logging system that sends logs of all log levels to the switch, warning, log, error, fatal, but we only want to process logs above error. What should we do? This is where the Topic router comes in.

The key to topic routers is to define routing keys. Define the routingKey name to be no longer than 255 bytes, using “. As a separator, for example, com.mq.rabbit.error.

A routingKey can match messages with the following characters when consuming them:

  • “*” matches a section (use “. Segmentation);
  • “#” matches 0 and multiple characters;

For example, a “com.mq.rabbit.error” message is issued:

Matching routing keys:

  • cn.mq.rabbit.*
  • cn.mq.rabbit.#
  • #.error
  • cn.mq.#
  • #

Unmatched routing key:

  • cn.mq.*
  • *.error
  • *

So if you want to subscribe to all messages, you can use a “#” match.

Note: Fanout and Topic exchanges have no historical data, that is, for queues created in the middle of the process, no previous messages can be retrieved.

Publish end:

String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // Declare the topic switch
String message = "Time:" + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));
Copy the code

The receiver:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // Declare the topic switch
String queueName = channel.queueDeclare().getQueue(); // Declare a queue
String routingKey = "#.error";
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
			byte[] body) throws IOException {
		String message = new String(body, "UTF-8");
		System.out.println(routingKey + "| receiving message = >"+ message); }}; channel.basicConsume(queueName,true, consumer);
Copy the code

Extensions – Custom thread pools

If larger control connections are required, the user can set the thread pool as follows:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
Copy the code

Connectionfactory.newconnection has a thread pool by default. ConnectionFactory.

private ExecutorService sharedExecutor;
public Connection newConnection(a) throws IOException, TimeoutException {
		return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}
public void setSharedExecutor(ExecutorService executor) {
		this.sharedExecutor = executor;
}
Copy the code

Where this.sharedexecutor is the default thread pool, the ConnectionFactory thread pool can be set using setSharedExecutor() or null if not.

If a user sets up a thread pool himself, as described in the first section of this section, the user’s custom thread pool is not automatically closed when the connection is closed, so the user must manually shut it down by calling shutdown(), otherwise it may prevent the JVM from terminating.

The official advice is that you should only consider using this feature if your application has serious performance bottlenecks.

The project address

GitHub:github.com/vipstone/ra…