Life is ultimately a solo journey: before loneliness comes confusion, and after loneliness comes growth.
Preface
I'd like to apologize for not posting over the past week; I had some important things to deal with.
I only finished dealing with them today. I originally said I would write about Spring IoC, but I found that sorting through it again would take a lot of time, so I'm going to write that slowly. I'll write about MQ first and cover the other topics gradually. After all, theory-heavy material is hard to write, whereas MQ is more hands-on: you can clone the code and play with it, which is more convenient.
MQ is also part of the technology stack a Java developer needs in order to advance, so it is something every Java practitioner must understand.
Three message queues are commonly compared on the market: RabbitMQ, RocketMQ, and Kafka.
I'm going to use RabbitMQ as the example for today's introduction to message queues, because Spring Boot's AMQP starter integrates with RabbitMQ by default, which makes it much easier to use, and because it is a time-tested open source component with excellent performance and stability.
I hope you get something out of it.
Code for this article: Gitee address, GitHub address
1. 🔍 What is a message queue?
A message queue (MQ) is a method of application-to-application communication.
A message component is placed between applications, and the two applications communicate through the message component.
Why would you put a component in the middle?
Small systems rarely need a message queue; it is usually distributed systems that introduce one. A distributed system has to withstand high concurrency, needs its subsystems decoupled, and needs fast, user-friendly response times. A message queue offers exactly these properties: it decouples systems naturally, it makes processing asynchronous, and it smooths out traffic peaks (peak shaving) under high concurrency, which addresses all three needs.
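To make decoupling and peak shaving a little more concrete, here is a minimal in-process sketch. It is not RabbitMQ: it uses the JDK's `LinkedBlockingQueue` as a stand-in for the message component, and the class and method names (`InMemoryQueueDemo`, `placeOrder`, `consumeBatch`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy stand-in for a message queue (hypothetical names, JDK only).
class InMemoryQueueDemo {

    // Producer side: hand the message over and return immediately (asynchronous).
    static void placeOrder(BlockingQueue<String> queue, String orderId) {
        queue.add(orderId);
    }

    // Consumer side: take at most maxBatch messages, at the consumer's own pace.
    static List<String> consumeBatch(BlockingQueue<String> queue, int maxBatch) {
        List<String> handled = new ArrayList<>();
        queue.drainTo(handled, maxBatch);
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // A burst of five requests arrives at once.
        for (int i = 1; i <= 5; i++) {
            placeOrder(queue, "order-" + i);
        }

        // The consumer only handles two per round; the rest wait in the queue.
        System.out.println(consumeBatch(queue, 2)); // [order-1, order-2]
        System.out.println(queue.size());           // 3
    }
}
```

The producer never calls the consumer directly (decoupling), it does not wait for the work to finish (asynchronous), and the backlog sits in the queue instead of overwhelming the consumer (peak shaving).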
However, everything has two sides. Adding a piece of middleware between systems increases the system's complexity and raises a number of new questions:
- What if messages are lost?
- What about messages being consumed more than once?
- Some tasks require sequential messages. How can sequential consumption be ensured?
- How is the availability of message queue components guaranteed?
These are all things that you need to think about when you’re using a message queue. Message queues can bring you a lot of benefits, but they can also bring you some problems.
Those benefits and problems deserve a post of their own, so I'll write about them later. Today we're going to set up a message queue environment and get a feel for the basics of sending and consuming messages. The more advanced issues will come later.
2. 📖 RabbitMQ overview
RabbitMQ is a messaging component: an open source implementation of AMQP (Advanced Message Queuing Protocol) written in Erlang.
AMQP, or Advanced Message Queuing Protocol, is an open standard application-layer protocol designed for message-oriented middleware.
RabbitMQ uses the AMQP protocol, but for our purposes what matters is how RabbitMQ is structured and how it is used.
As always, we should get the big picture first: knowing what RabbitMQ looks like as a whole will help us later on.
Let's take a look at the architecture diagram I just drew. Because RabbitMQ implements the AMQP protocol, these concepts are common to AMQP as well.
- Broker: the middleware itself. The application that receives and distributes messages; here, that is the RabbitMQ server.
- Virtual Host: a virtual host. Designed for multi-tenancy and security, it splits the basic AMQP components into virtual groups, similar to the concept of a namespace on a network. When multiple users share one RabbitMQ server, it can be divided into multiple vhosts, and each user creates exchanges and queues in their own vhost.
- Connection: the TCP connection between a publisher/consumer and the broker. Disconnection is initiated only on the client side; the broker does not drop the connection unless there is a network failure or a problem with the broker service.
- Channel: if every access to RabbitMQ opened a new Connection, the cost of establishing TCP connections would be high and inefficient when message volume is large. A Channel is a logical connection established inside a Connection. In a multi-threaded application, each thread usually creates its own Channel to communicate. Each AMQP method carries a channel ID that lets the client and the message broker identify the channel, so channels are completely isolated from one another. As a lightweight connection, a Channel greatly reduces the operating system's overhead of establishing TCP connections.
- Exchange: the router. It matches a message's routing key against the bindings in its lookup table and distributes the message to queues according to its distribution rules.
- Queue: the message queue. Messages end up here to wait for consumption, and one message can be copied to multiple queues at the same time.
- Binding: a virtual connection between an Exchange and a Queue. A binding can contain a routing key. Binding information is stored in the Exchange's lookup table and is used as the basis for message distribution.
After looking at these concepts, let me go through the whole process again:
When a producer sends a message to the Broker (RabbitMQ), the message arrives in the Virtual Host that the producer's connection was opened against. The Exchange in that Virtual Host then distributes the message to its Queues according to the routing key and the type of the Exchange.
The consumer then receives the message over a Channel on its Connection, either pushed by the Broker or pulled by the consumer, and consumes it.
Tip: Which Queues an Exchange delivers to is determined by its Bindings.
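The relationship between Exchange, Binding, and Queue can be sketched with a small in-memory model. This is only a toy that mimics exact-match routing on the routing key; it is not how RabbitMQ is implemented, and the class and method names (`ToyDirectExchange`, `bind`, `publish`, `poll`) are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of an exchange that routes by exact routing-key match (hypothetical names).
class ToyDirectExchange {
    // The "lookup table": routing key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> messages waiting to be consumed.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Bind a queue to this exchange under a routing key, creating the queue if needed.
    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Copy the message into every queue bound with this routing key.
    // Returns how many queues received it (0 means the message was unroutable).
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, List.of());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Take the next message from a queue, or null if the queue is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("emails", "user.created");
        exchange.bind("audit", "user.created");

        System.out.println(exchange.publish("user.created", "hello")); // 2
        System.out.println(exchange.poll("emails"));                   // hello
        System.out.println(exchange.publish("user.deleted", "bye"));   // 0
    }
}
```

One published message ends up in two queues because two bindings share the same routing key, which is the "a message can be copied to multiple queues" behaviour described above.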
3. 💡 RabbitMQ environment
This is the general structure of RabbitMQ and how a message works. With the theory out of the way, it’s time to get ready for a RabbitMQ installation.
Official website to download address: www.rabbitmq.com/download.ht…
Since I don't have a Mac of my own, I'll demonstrate on Windows here, but you're all programmers, so installing something should be no problem 😂
Windows: www.rabbitmq.com/install-win…
Once there, go to Direct Downloads and download the EXE installer.
RabbitMQ is written in Erlang, so the Erlang environment must be installed before RabbitMQ. After downloading the RabbitMQ installer, run it. If Erlang is missing, the installer will tell you and offer to open the Erlang download page in your browser. On that page, download and install the Erlang build that matches your system type, then go back and install RabbitMQ.
For both installers, just keep clicking Next.
When the installation is complete, press the Windows key to see what it looks like:
Tip: Rabbit-command is the command-line console for RabbitMQ.
After installing RabbitMQ, we also need to bring the RabbitMQ-related JARs into our development environment.
For convenience, we can import the Spring Boot starter, which pulls in all the RabbitMQ-related JARs we need.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
Adding spring-boot-starter-amqp directly is all that is needed.
4. ✍ Hello World
Once the environment is set up, we can get started.
This is an introductory article, and many readers may never have worked with RabbitMQ. Starting with auto-configuration can be confusing, because it hides a lot of detail and leaves you with only the wrapped version, which is hard to understand.
So for this Hello World example I'm going to use the raw client connection directly, to show you what it really looks like.
Tip: I've kept the code for this demo under the Prototype package.
4.1 Producer
Let's first look at the producer code, which is the code that pushes our message:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

public class Producer {

    public static final String QUEUE_NAME = "erduo";

    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Connect to the local server
        connectionFactory.setHost("127.0.0.1");
        // Create a connection through the connection factory
        Connection connection = connectionFactory.newConnection();
        // Create a channel through the connection
        Channel channel = connection.createChannel();
        // Declare a queue named erduo. It is non-durable (it disappears when RabbitMQ restarts),
        // non-exclusive (not limited to this connection), and not auto-delete
        // (the server will not delete it when it is no longer in use).
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "Hello, this is the ear. " + LocalDateTime.now().toString();
        // Publish the message.
        // The four parameters are: the exchange, the routing key, the message properties, and the binary body
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("The producer finished sending the following message: " + msg);
        channel.close();
        connection.close();
    }
}
I've commented the code, but let me walk you through it to tie things together.
We use RabbitMQ's ConnectionFactory to configure the server host to connect to, create a new connection, create a Channel from that connection, and use the Channel to declare the queue and send the message.
That seems straightforward enough, but the two steps of creating a queue and sending a message are worth breaking down further.
Create a queue
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
The queueDeclare method has five parameters. The first is the name of the queue, the next three are configuration flags, and the last is a map of extra arguments.
- durable: whether the queue is persisted.
- exclusive: whether the queue is exclusive. An exclusive queue is visible only to the connection that first declared it and is deleted automatically when that connection is closed.
- autoDelete: whether the queue is deleted automatically once it is no longer in use (that is, after its last consumer has unsubscribed).
- arguments: additional parameters.
Tip: durable only persists the queue itself. For the messages in it to survive a restart, they must also be published as persistent.
Send a message
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
The method that sends a message takes four parameters. The first specifies the exchange. In the example code above we passed an empty string, which means the default (nameless) exchange routes the message for us.
The second parameter is the routing key that the exchange uses to route and forward the message. The third parameter holds the message properties, which we will use later to persist messages. The last parameter is the data we are sending, which must be passed in as a byte array.
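Since the body travels as raw bytes, producer and consumer have to agree on the character encoding. The following is a small, self-contained sketch of that round trip using only the JDK; the class name `MessageBodyCodec` and its two helpers are hypothetical, but the `getBytes` and `new String` calls are the same ones used in the producer and consumer code of this article.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the String <-> byte[] round trip for a message body.
class MessageBodyCodec {

    // What the producer does before handing the body to basicPublish.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer does with the body it receives in handleDelivery.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u8033\u6735" is the two-character Chinese word for "ear".
        String original = "Hello, \u8033\u6735";
        byte[] body = encode(original);

        System.out.println(body.length);                    // 13: 7 ASCII bytes + 2 x 3 bytes
        System.out.println(decode(body).equals(original));  // true
    }
}
```

Naming the charset explicitly on both sides avoids depending on each machine's platform default encoding.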
Test
Now that we've covered these APIs, we can test our code. When we run it, the console looks like this:
We can open the RabbitMQ console (mentioned earlier) and run the command rabbitmqctl.bat list_queues to check the state of the queues:
We can see that there is one message in the queue, which means our message was sent successfully. Now we can write a consumer to consume it.
4.2 Consumer
The consumer code is similar to the producer code: both need to establish a connection and then a channel:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Connect to the local server
        connectionFactory.setHost("127.0.0.1");
        // Create a connection through the connection factory
        Connection connection = connectionFactory.newConnection();
        // Create a channel through the connection
        Channel channel = connection.createChannel();
        // Create a consumer; handleDelivery is called for every message delivered to it
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---------------------------------------------");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("exchangeName : " + envelope.getExchange());
                System.out.println("routingKey : " + envelope.getRoutingKey());
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Message content: " + msg);
            }
        };
        // Start the consumer on the specified queue
        channel.basicConsume(Producer.QUEUE_NAME, consumer);
        // Left open on purpose so the consumer keeps listening
        // channel.close();
        // connection.close();
    }
}
After establishing the channel, we need to create a consumer object that will consume the messages in the specified queue.
In this example we create a new consumer and use it to consume messages from the erduo queue.
I commented out the last two lines of code because once we close the connection, our consumer can no longer keep consuming, so we need to keep the connection open and keep listening to the queue.
OK, run this program, and our consumer will fetch the message from the erduo queue. The result looks like this:
- consumerTag: the tag that identifies the consumer this message was delivered to.
- exchangeName: the name of the exchange the message was sent to. We passed an empty string earlier, so this is an empty string too.
- routingKey: the routing key the message was sent with.
Our program is now in a listening state. Run the producer again to send another message, and the consumer will print it on the console in real time.
5. 📌 Message Acknowledgement (ACK)
We have now seen both the producer and the consumer: one producer sent a message and one consumer consumed it. How many messages should RabbitMQ hold at this point?
In theory we sent one and consumed one, so there should be 0. But that's not the case:
There is still one message in the queue. If we restart the consumer, it prints the message we already consumed, and the timestamp in the message shows that it is the very same message. In other words, the message was not deleted after we consumed it.
This happens because of RabbitMQ's acknowledgement mechanism: after a consumer receives a message, an acknowledgement is required before the message can be deleted.
Consumption here uses manual acknowledgement by default. It can also be set to automatic acknowledgement, in which case a message is deleted automatically as soon as the consumer receives it. If an exception then occurs during processing, the message has not been handled but has already been removed. So we will keep using manual acknowledgement here.
The ACK code is as simple as adding one line to the original consumer’s code:
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("---------------------------------------------");
        System.out.println("consumerTag : " + consumerTag);
        System.out.println("exchangeName : " + envelope.getExchange());
        System.out.println("routingKey : " + envelope.getRoutingKey());
        String msg = new String(body, StandardCharsets.UTF_8);
        System.out.println("Message content: " + msg);
        // Acknowledge this single message (multiple = false)
        channel.basicAck(envelope.getDeliveryTag(), false);
        System.out.println("Message acknowledged.");
    }
};
After we change the code to this, we can run the consumer again and see what happens:
Let’s look at queues in RabbitMQ:
From the figure we can see that the message was successfully deleted after consumption. My guess is that in automatic mode the acknowledgement is returned before our code has run, which is what makes it possible for a message to be lost.
With manual acknowledgement, we can put the acknowledgement call on the last line, after the business logic has finished (wrapping the logic in try/catch for possible exceptions). That way, if an exception occurs and the message is never acknowledged, it will be consumed again later.
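The "acknowledge last" idea can be illustrated with a toy queue that only forgets a message once it has been acknowledged. This is a simplified simulation using only the JDK, not the RabbitMQ client: the class `ToyAckQueue` and its methods are made up, and it requeues immediately on failure, whereas a real broker redelivers unacknowledged messages under its own rules (for example after the channel or connection is closed).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy queue with manual acknowledgement (hypothetical names, JDK only).
class ToyAckQueue {
    private final Deque<String> ready = new ArrayDeque<>();     // waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();  // delivered, not yet acknowledged
    private long nextDeliveryTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Deliver one message to the handler. Returns true if it was processed and acknowledged.
    boolean deliverOne(Consumer<String> handler) {
        String message = ready.pollFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        long deliveryTag = nextDeliveryTag++;
        unacked.put(deliveryTag, message);
        try {
            handler.accept(message);      // business logic runs first
            unacked.remove(deliveryTag);  // acknowledge as the very last step
            return true;
        } catch (RuntimeException e) {
            // Never acknowledged: put the message back so it can be consumed again.
            ready.addFirst(unacked.remove(deliveryTag));
            return false;
        }
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ToyAckQueue queue = new ToyAckQueue();
        queue.publish("hello");

        // First attempt fails inside the handler, so the message is kept.
        boolean first = queue.deliverOne(m -> { throw new IllegalStateException("boom"); });
        System.out.println(first + " " + queue.readyCount());  // false 1

        // Second attempt succeeds, so the message is acknowledged and removed.
        boolean second = queue.deliverOne(m -> System.out.println("handled " + m));
        System.out.println(second + " " + queue.readyCount()); // true 0
    }
}
```

Because the acknowledgement only happens after the handler returns normally, a failure in the middle of processing cannot silently lose the message.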
Afterword
That's it for today. In the next installment, we'll leave the manual connection approach behind and send and receive messages more conveniently with Spring's annotations and its RabbitTemplate.
Message queues are all used in much the same way; the open source options simply differ a little in emphasis. We should choose a message queue according to the needs of our own project. That selection is usually made by the development team lead rather than by us, but you may be asked about it in an interview, so it is worth getting to know all of them.
Well, that's all for this episode. Thank you for being here. You're welcome to like and comment on this article 👍; every like is the biggest motivation for my writing.
I am Ear, a pseudo-literary programmer who always wants to share what he knows. See you next time.