This is the 15th day of my participation in Gwen Challenge
The Work model
Introduction (Important)
Queues are called Work Queues. When message processing is time-consuming, messages may be produced faster than they are consumed.
In the long run, messages pile up and can’t be processed in a timely manner. Use Work Queues: Queues that bind multiple consumers to a queue and consume messages together.
Once the messages in the queue are consumed, they disappear, so the task is not repeated.
The main idea behind this is to avoid performing a resource-intensive task immediately and wait for it to complete.
Instead, we plan tasks to be completed later. We encapsulate the task as a message and send it to the queue.
A worker process running in the background pops up the task and eventually executes the job. When many workers are running, tasks are shared among them.
Code implementation
Now I’ll test the model based on the first blog’s utility classes.
As shown in the diagram in the introduction, one producer and two consumers do the job, and let’s see what the result is.
package com.xn2001.workquene;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/2 14:05 * * /
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work".true.false.false.null);
// Production message
for (int i = 0; i < 10; i++) {
channel.basicPublish(""."work".null, (i + "Here comes the message.").getBytes()); } RabbitMQUtil.closeChannelAndConnection(channel, connection); }}Copy the code
package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/2 18:49 * * /
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work".true.false.false.null);
channel.basicConsume("work".true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(newString(body)); }}); }}Copy the code
package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/2 18:56 * * /
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work".true.false.false.null);
channel.basicConsume("work".true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(newString(body)); }}); }}Copy the code
Run two consumers, then turn on producers to produce messages.
The results are shown below:
You can see 02468 in the first consumer, 13579 in the second consumer.
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
By default RabbitMQ sends each message in turn to the next user. On average, each consumer receives the same number of messages.
This way of distributing messages is called a loop. Try to work with three or more consumers.
Cyclic scheduling
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
One of the advantages of using task queues is that you can easily parallelize work. If we’re backlogged, we just need to add more workers, so we can easily scale up.
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
It takes a few seconds to complete a task. You might wonder what happens if a consumer starts a long task and only partially completes it. In our current code, as soon as RabbitMQ passes a message to the consumer, it marks it for deletion. In this case, if you lose a worker, we will lose the message it is processing. We also lose all messages sent to this particular worker that have not yet been processed.
Message to confirm
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
But we don’t want to lose any missions. If one worker quits, we want to hand over the task to another.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
To ensure that messages are not lost, RabbitMQ supports message confirmation. The consumer sends back an acknowledgement message telling RabbitMQ that a particular message has been received and processed, and RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
If the consumer dies without sending an ACK (its channel is closed, the connection is closed, or the TCP connection is lost) RabbitMQ will understand that the message was not fully processed and will re-queue it. If there are other consumers online at the same time, it will be quickly redelivered to another consumer. That way you can be sure that no information is lost, even if workers die occasionally.
We are going to simulate a “do-it-all” scenario to perfectly circumvent this problem.
First, we need to simulate the inefficiency of one consumer, so that another consumer spends more than he does.
To implement this scenario, we simply let the consumer 1 thread wait for 1 second, the consumer 2 thread run normally, and turn off message auto-determination.
package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/ * * *@authorHappy heart lake *@date2020/6/2 18:49 * * /
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Set consumption to one message at a time
channel.basicQos(1);
channel.queueDeclare("work".true.false.false.null);
// Parameter 2: disables automatic message acknowledgement
channel.basicConsume("work".false.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try{
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
System.out.println(e);
}
System.out.println(new String(body));
// Manual message acknowledgement
// Parameter 1: determines the specific message in the queue
// Parameter 2: Whether to enable simultaneous acknowledgement of multiple messages
channel.basicAck(envelope.getDeliveryTag(),false); }}); }}Copy the code
package com.xn2001.workquene;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/2 18:56 * * /
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work".true.false.false.null);
// Parameter 2: false indicates that the message is not automatically determined
channel.basicConsume("work".false.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
// Manual message acknowledgement
// Parameter 1: determines the specific message in the queue
// Parameter 2: Whether to enable simultaneous acknowledgement of multiple messages
channel.basicAck(envelope.getDeliveryTag(),false); }}); }}Copy the code
package com.xn2001.workquene;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/2 14:05 * * /
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work".true.false.false.null);
// Production message
for (int i = 0; i < 20; i++) {
channel.basicPublish(""."work".null, (i + "Here comes the message.").getBytes()); } RabbitMQUtil.closeChannelAndConnection(channel, connection); }}Copy the code
It turned out, as expected, that the best were the worst.
The Fanout model
Introduction (Important)
Let’s quickly go over what we covered in the previous tutorials:
- A Producer producersis a user application that sends messages. Is a user application that sends messages
- A The queue queueis a buffer that stores messages. Is a buffer that stores messages
- A Consumer consumersis a user application that receives messages. Is a user application that receives messages
At the heart of the messaging model in Rabbitmq is the idea that the producer never sends any message directly to the queue. In fact, many times the producer does not even know if the message will be delivered to any queue.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Instead, producers can only send messages to the switch. On the one hand it receives messages from producers and on the other it pushes messages to queues. The switch must know exactly what to do with the messages it receives.
-
Should it be added to a particular queue?
-
Should it be attached to many queues?
-
Or it should be discarded.
-
The rules are defined by the switch type.
There are a few exchange types available: direct, topic, headers and fanout. We’ll focus on the last one — the fanout. Let’s create an exchange of this type, and call it logs:
The types of exchanges available are direct, topic, header, and fan out.
We will focus on the last -> sector swap.
Let’s create an exchange of this type and call it logs:
channel.exchangeDeclare("logs"."fanout");
Copy the code
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that’s exactly what we need for our logger.
Fan-out swapping is very simple. It simply broadcasts all the messages it receives to all the queues it knows.
Fanout model: fanout, we also call broadcast
In broadcast mode, the message sending process looks like this:
- You can have multiple consumers
- Each consumer has its own queue.
- Every queue has to be bound to Exchange
- A message sent by a producer can only be sent to the switch. The switch decides which queue to send the message to, but the producer cannot decide.
- The switch sends messages to all queues that are bound
- All consumers in the queue get the message. Implement a message to be consumed by multiple consumers
Code implementation
How did we get the news out before,
channel.basicPublish(""."hello".null, message.getBytes());
Copy the code
The first argument is the name of the exchange, and the empty string represents the default or nameless exchange: if one exists, the message is routed to the message queue using the name specified by the routingKey.
Let’s create a switch
channel.exchangeDeclare("logs"."fanout");
Copy the code
So now we can publish to the exchange we just created:
channel.basicPublish( "logs"."".null, message.getBytes());
Copy the code
Develop consumers
We need to use at least three consumers and start to see if all of them can accept the broadcast message.
package com.xn2001.fanout;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 0:39 * * /
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Bind the switch
channel.exchangeDeclare("logs"."fanout");
// Get the temporary queue
String queue = channel.queueDeclare().getQueue();
// Channels bind switches and queues
channel.queueBind(queue, "logs"."");
// Consume messages
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 1:" + newString(body)); }}); }}Copy the code
package com.xn2001.fanout;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 0:39 * * /
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Bind the switch
channel.exchangeDeclare("logs"."fanout");
// Get the temporary queue
String queue = channel.queueDeclare().getQueue();
// Channels bind switches and queues
channel.queueBind(queue, "logs"."");
// Consume messages
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code
package com.xn2001.fanout;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 0:39 * * /
public class ConsumerThree {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Bind the switch
channel.exchangeDeclare("logs"."fanout");
// Get the temporary queue
String queue = channel.queueDeclare().getQueue();
// Channels bind switches and queues
channel.queueBind(queue, "logs"."");
// Consume messages
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 3:" + newString(body)); }}); }}Copy the code
Start these three consumers, and then try sending messages using producers. You can see that all three consumers can consume the message. This is our fan out model.
Routing model
The switch type of this model is Direct
Introduction to the
Our logging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity. For example we may want a program which writes log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.
Our logging system broadcasts all messages from the previous model to all consumers. We want to extend it to allow messages to be filtered based on severity. For example, we might want a program to write log messages to disk to receive only critical errors and not waste disk space with warning or informational log messages.
We were using a fanout exchange, which doesn’t give us much flexibility – it’s only capable of mindless broadcasting.
Important: We are using the fan-out swap, which doesn’t give us much flexibility as it can only be broadcast unconsciously.
We will switch to direct exchange.
The routing algorithm behind direct switching is simple.
A message is sent to a queue whose bound key exactly matches the routing key of the message.
Bindings can take additional routingKey arguments. The fan-out swap we used before completely ignored its value.
Code implementation
Let’s implement such a process.
First of all, I need to clarify exactly how this simulated scenario works. We have two consumers, the first one binding error messages (routingKey) and the other binding Info, Error, and Warning messages. We will send an INFO message, presumably only the latter can be consumed.
package com.xn2001.direct;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 16:58 * * /
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Declare switches and switch types
channel.exchangeDeclare("logs_direct"."direct");
Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Temporary queues bind to switches
channel.queueBind(queue,"logs_direct"."error");
// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 1:" + newString(body)); }}); }}Copy the code
package com.xn2001.direct;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 hast judged * * /
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Declare switches and switch types
channel.exchangeDeclare("logs_direct"."direct");
Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Temporary queues bind to switches
channel.queueBind(queue, "logs_direct"."info");
channel.queueBind(queue, "logs_direct"."error");
channel.queueBind(queue, "logs_direct"."warning");
// Consume messages
channel.basicConsume(queue, true.new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code
package com.xn2001.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 17:03 * * /
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Declare a switch
channel.exchangeDeclare("logs_direct"."direct");
// Send a message
String routingKey = "info";
channel.basicPublish("logs_direct",routingKey,null((,"This is published by the Direct model based on routing Key:"+routingKey+"Message sent").getBytes())); RabbitMQUtil.closeChannelAndConnection(channel,connection); }}Copy the code
As expected, only consumer 2 received the message.
Of course, if you send a message other than error, info, or warning, that means no consumer can consume the message.
Switchable viewer model
Introduction to the
In the last model, we improved our logging system. Instead of using fan-out switching, which can only be broadcast virtually, we use direct broadcasting, which gives us the possibility of selectively receiving logs. While using direct switching improved our system, it still had limitations in that it could not route based on multiple criteria. In our logging system, we might want to subscribe not only to logs based on severity, but also to logs based on the source that is issuing the logs.
That would give us a lot of flexibility – we may want to listen to just critical errors coming from ‘cron’ but also all logs from ‘kern’.
To implement that in our logging system we need to learn about a more complex topic exchange.
This would give us a lot of flexibility — we might want to listen for critical errors from “cron”, or we might want to listen for all logs from “Kern”.
To achieve this in our logging system, we need to understand more complex topics.
The idea is to set up a routingKey in your consumer to match the desired message in more detail.
* stands for one word. # stands for everything that follows
Let’s show it in code to make it a little bit clearer.
Code implementation
package com.xn2001.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@date2020/6/4 19:28 * * /
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics"."topic");
String routingKey = "user.hello.hh";
channel.basicPublish("topics",routingKey,null, ("routeKey:"+routingKey).getBytes()); RabbitMQUtil.closeChannelAndConnection(channel,connection); }}Copy the code
package com.xn2001.topic;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@dateThis was the 2020/6/4 * * /
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Declare switches and switch types
channel.exchangeDeclare("topics"."topic");
Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Temporary queues bind to switches
channel.queueBind(queue,"topics"."user.*");
// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 1:" + newString(body)); }}); }}Copy the code
package com.xn2001.topic;
import com.rabbitmq.client.*;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
/ * * *@authorHappy heart lake *@dateBank against 2020/6/4 * * /
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// Declare switches and switch types
channel.exchangeDeclare("topics"."topic");
Create a temporary queue
String queue = channel.queueDeclare().getQueue();
// Temporary queues bind to switches
channel.queueBind(queue,"topics"."user.#");
// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer 2:" + newString(body)); }}); }}Copy the code
ConsumerTwo gets the message because its routingKey says #.
Alternatively, you can test if our producer sends a message with two words, such as user.hello
That means both consumers can get it.