In this chapter we'll focus on the exchange in Rabbit. RabbitMQ exchanges are classified into four types:Copy the code

Direct (default) Headers Fanout topic Where headers switches allow you to match the headers of AMQP messages instead of routing keys. Other than that, headers switches are exactly the same as direct, but perform poorly and are rarely used. So we won’t do it in this article.

Note: Fanout and Topic exchanges have no historical data, that is, for queues created in the middle of the process, no previous messages can be retrieved. Direct is the default exchange type. It is also very simple. If the routing key matches, the message is sent to the corresponding queue, as shown in the figure below:Copy the code

Picture description (50 words Max)

BasicPublish ("", QueueName, null, message) pushes a Direct exchange message to the appropriate queue, using null characters as the default direct exchange and queue names as routing keys. Direct switch code example sender:Copy the code

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); Parameter description: Parameter 1: queue name, parameter 2: Persistent or not, Parameter 3: Exclusive mode or not, parameter 4: whether to delete the queue when the consumer disconnects, parameter 5: Channel. queueDeclare(config.QueueName, false, false, false, null); String message = string.format (” current time: %s”, new Date().getTime()); Parameter description: Parameter 1: switch name; Parameter 2: queue name; Parameter 3: other attributes of the message – headers information of the route; Parameter 4: BasicPublish (“”, config.QueueName, null, message.getBytes(“UTF-8”)); Receiving end, receiving messages continuously:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); Parameter description: Parameter 1: queue name, parameter 2: Persistent or not, Parameter 3: Exclusive mode or not, parameter 4: whether to delete the queue when the consumer disconnects, parameter 5: Channel. queueDeclare(config.QueueName, false, false, false, null); Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, “utf-8”); // Message body system.out. println(workName + “Received message =>” + message); channel.basicAck(envelope.getDeliveryTag(), false); [Parameter description: Parameter 1: index of the message; Parameter 2: whether to batch reply, true: batch confirm the message whose id is smaller than the current ID]}}; channel.basicConsume(config.QueueName, false, “”, defaultConsumer); The receiving end gets a single message

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), “UTF-8”); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // Message acknowledgement continuous message capture uses: basic. Consume; Get a single message using: basic.get.

Note: You cannot use a single message consumption for a for loop as an alternative to continuous message consumption, as this is low performance; If you want to specify the sender of the message, you need to include it in the sending content, just like we mark our name in the mail. Only in this way can we know who the sender is. Message confirmation at the above code we can know that message after receiving channel must be used. The basicAck () method of manual confirmation (not automatically confirm delete mode), then the problem is here. What happens when a message is received unconfirmed? If an application receives a message and the bug forgets to acknowledge it, the message's state in the queue changes from "Ready" to "Unacked".Copy the code

Picture description (50 words Max)

If the message is received but not acknowledged, Rabbit will not send any more messages to the application because it believes you are not ready to receive the next message. The message will remain Unacked until you confirm it, or disconnect from Rabbit, and Rabbit will automatically change the message to be Ready for distribution to other subscribers. You can, of course, take advantage of this by having your application delay acknowledging the message until it has finished processing the business logic, effectively preventing Rabbit from sending you too many messages and causing your application to crash. Message confirmation Demo:Copy the code

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), “UTF-8”); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); Channel. basicAck(Long deliveryTag, Boolean multiple) specifies message confirmation, with parameter 1: message ID; Parameter 2: indicates whether to batch reply. True Indicates whether to batch acknowledge the messages whose id is smaller than the number of times.

Conclusion: Every message consumers consume must be confirmed. Before a message is confirmed, there are two options: Option 1: Disconnect from the Rabbit, so that the Rabbit redispatches the message to another consumer. Option 2: Reject the message sent by Rabbit using channel.basicReject(Long deliveryTag, Boolean Requeue). Parameter 2: How messages are processed. If true, Rabbib redistributes the message to other subscribers. If false, Rabbit sends messages to a special "dead letter" queue for rejected messages that are not re-queued. Message rejection Demo:Copy the code

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), “UTF-8”); channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); 2. Fanout switch — publish/subscribe

Fanout is different from the Direct exchange. Fanout is a publish/subscribe exchange. When you send a message, the exchange broadcasts the message to all queues attached to the exchange. Users upload their own face, for example, when the image needs to clear the cache, the user should get bonus points at the same time, you can put the two queue bound to the image upload switch on, so that when a third, and fourth finished uploading pictures to deal with demand, the original code can be the same, you just need to add a subscription, The sender and consumer code is completely decoupled and new functionality can be easily added. Unlike the direct exchange, we add channel.exchangeDeclare(ExchangeName, "FANout ") when sending a message. This line of code declares the FANout exchange. The sender:Copy the code

final String ExchangeName = “fanoutec”; / / the switch name Connection conn = connectionFactoryUtil. GetRabbitConnection (); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, “fanout”); // Declare the fanout switch String message = “time:” + new Date().getTime(); channel.basicPublish(ExchangeName, “”, null, message.getBytes(“UTF-8”)); Receiving messages is different from Direct in that we need to declare the FANout router and bind it to the Fanout switch using the default queue.

The receiver:Copy the code

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, “fanout”); // Declare the fanout switch String queueName = channel.queueDeclare().getQueue(); // declare a queue channel.queueBind(queueName, ExchangeName, “”); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, “UTF-8”); }}; channel.basicConsume(queueName, true, consumer); The biggest difference between FANout and Direct is at the receiving end, where fanout needs to bind queues to the corresponding exchange for subscribing messages.

Where channel.queuedeclare ().getQueue() is a random queue. Rabbit generates a random queue name, which is automatically deleted once the consumer disconnects. Note: The routingKey is not valid for fanout switches, and this parameter is ignored. Finally, topic switches operate like fanout, but can be more flexible in matching the information they want to subscribe to. This is where the routingKey comes in, using the routingKey for message (rule) matching. Suppose we now have a logging system that sends logs of all log levels to the switch, warning, log, error, fatal, but we only want to process logs above error. What should we do? This is where the Topic router comes in. The key to topic routers is to define routing keys. Define the routingKey name to be no longer than 255 bytes, using ". As a separator, for example, com.mq.rabbit.error. A routingKey can match messages with the following characters when consuming them:Copy the code

“*” matches everything; “#” matches 0 and multiple characters; For example, a “com.mq.rabbit.error” message is issued:

Matching routing keys:Copy the code

* cn.mq.rabbit.# #. Error cn.mq.#

Cn.mq. * *. Error so if you want to subscribe to all messages, you can use the “#” match.

Note: Fanout and Topic exchanges have no historical data, that is, for queues created in the middle of the process, no previous messages can be retrieved. Publish end:Copy the code

String routingKey = “com.mq.rabbit.error”; Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, “topic”); Topic switch String message = “time:” + new Date().getTime(); channel.basicPublish(ExchangeName, routingKey, null, message.getBytes(“UTF-8”)); The receiver:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, “topic”); // Declare topic switch String queueName = channel.queueDeclare().getQueue(); // Declare queue String routingKey = “#.error”; channel.queueBind(queueName, ExchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, “UTF-8”); System. The out. Println (routingKey + “| receiving message = >” + message); }}; channel.basicConsume(queueName, true, consumer); Extensions – Custom thread pools

If larger control connections are required, the user can set the thread pool as follows:Copy the code

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); Connectionfactory.newconnection has a thread pool by default. ConnectionFactory.

private ExecutorService sharedExecutor; public Connection newConnection() throws IOException, TimeoutException { return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); } public void setSharedExecutor(ExecutorService executor) { this.sharedExecutor = executor; } This.sharedexecutor is the default thread pool. The ConnectionFactory thread pool can be set using setSharedExecutor(), or null if not.

If a user sets up a thread pool himself, as described in the first section of this section, the user's custom thread pool is not automatically closed when the connection is closed, so the user must manually shut it down by calling shutdown(), otherwise it may prevent the JVM from terminating. The official advice is that you should only consider using this feature if your application has serious performance bottlenecks.Copy the code

Welcome Java engineers who have worked for one to five years to join Java architecture development: 855835163 Group provides free Java architecture learning materials (which have high availability, high concurrency, high performance and distributed, Jvm performance tuning, Spring source code, MyBatis, Netty, Redis, Kafka, Mysql, Zookeeper, Tomcat, Docker, Dubbo, multiple knowledge architecture data, such as the Nginx) reasonable use their every minute and second time to learn to improve yourself, don’t use “no time” to hide his ideas on the lazy! While young, hard to fight, to the future of their own account!