This is the first day of my participation in the Gwen Challenge in November. Check out the details: the last Gwen Challenge in 2021

First, core composition

1.1. Core Components

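Server: also known as the Broker. It accepts client connections and implements the AMQP entity services. This is what installing rabbitmq-server provides.

Connection: the network connection between the application and the Broker, established over TCP/IP (three-way handshake to open, four-way handshake to close).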

Channel: a network channel. Almost all operations are carried out on a Channel, which is the conduit for reading and writing messages.

Message: the data transmitted between the server and the application. It consists of Properties, which decorate the message with advanced features such as priority and delay, and Body, which is the message content.

Virtual Host: a virtual address used for logical isolation; it is the top level of message routing. A Virtual Host can contain several Exchanges and Queues.

Exchange: receives messages and routes them to the bound queues according to the routing key. (An exchange cannot store messages.)

Binding: a virtual link between an Exchange and a Queue. An exchange and a queue can be bound with multiple routing keys.

Routing key: a routing rule that the virtual host uses to determine how to route a particular message.

Queue: also known as a message queue. It stores messages and forwards them to consumers.

1.2 RabbitMQ overall architecture

1.3 Operation process

  • 1. The producer generates the message data
  • 2. After serialization, the producer specifies the exchange and the routing information
  • 3. The message is sent to the Broker
  • 4. The consumer consumes messages from the queue that the routing selected
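
The four steps above can be sketched with a toy in-memory stand-in for the broker. This is only an illustration of the flow, not the RabbitMQ client API: the class `ToyBroker` and the names `orders`, `created` and `orderQueue` are hypothetical, and a real broker does far more (connections, channels, acknowledgements).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of "exchange + routing key -> queue"; not RabbitMQ itself. */
class ToyBroker {
    // exchange name -> (routing key -> queue name)
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    private final Map<String, Queue<byte[]>> queues = new HashMap<>();

    void bind(String exchange, String routingKey, String queue) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>()).put(routingKey, queue);
        queues.computeIfAbsent(queue, q -> new ArrayDeque<>());
    }

    /** Step 3: the broker receives the message and routes it. Returns false if no binding matches. */
    boolean publish(String exchange, String routingKey, byte[] body) {
        Map<String, String> byKey = bindings.getOrDefault(exchange, Collections.emptyMap());
        String queue = byKey.get(routingKey);
        if (queue == null) {
            return false;
        }
        queues.get(queue).add(body);
        return true;
    }

    /** Step 4: the consumer takes the next message from its queue, or null if the queue is empty. */
    String consume(String queue) {
        Queue<byte[]> q = queues.get(queue);
        byte[] body = (q == null) ? null : q.poll();
        return (body == null) ? null : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("orders", "created", "orderQueue");
        // Steps 1 and 2: produce the data, serialize it, and name the exchange and routing key
        byte[] body = "order 1".getBytes(StandardCharsets.UTF_8);
        broker.publish("orders", "created", body);
        System.out.println(broker.consume("orderQueue"));
    }
}
```

Note that the producer never names the queue in `publish`; it only names the exchange and the routing key, and the binding decides where the message lands.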

Second, introduction to message patterns

RabbitMQ provides six messaging models. The sixth is really RPC rather than MQ, which leaves five. Of those, models 3, 4 and 5 are all subscription models that differ only in how messages are routed. The first, the simple pattern, was covered in the initial article; this section covers the other four.

2.1 Work Queues

Work queues, also known as the competing consumers model.

Unlike the simple pattern, several consumers consume from the same queue, and each message is consumed by only one of them.
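
The "each message goes to exactly one consumer" property can be illustrated without a broker. The sketch below uses a plain `BlockingQueue` and two threads as stand-ins; `CompetingConsumersSketch` and the consumer names are hypothetical, and how many messages each worker gets depends on thread scheduling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Several workers drain one shared queue; every message ends up with exactly one of them. */
class CompetingConsumersSketch {
    static Map<String, List<Integer>> run(int messageCount, String... consumerNames) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= messageCount; i++) {
            queue.add(i);
        }

        Map<String, List<Integer>> received = new LinkedHashMap<>();
        List<Thread> workers = new ArrayList<>();
        for (String name : consumerNames) {
            List<Integer> mine = new ArrayList<>(); // only touched by this worker's thread
            received.put(name, mine);
            workers.add(new Thread(() -> {
                Integer message;
                // poll() removes the head atomically, so two workers never get the same message
                while ((message = queue.poll()) != null) {
                    mine.add(message);
                }
            }));
        }
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> received = run(10, "work1", "work2");
        received.forEach((name, messages) -> System.out.println(name + " " + messages));
    }
}
```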

So how does RabbitMQ decide which consumer receives a message? There are two dispatch strategies:

  • Round-robin (polling) dispatch: one message per consumer in turn, so the messages are divided evenly;
  • Fair dispatch: messages are distributed according to each consumer's processing capacity, so that those who can do more, do more.

2.1.1 Round-Robin (Polling) Dispatch

Round-robin dispatch is the default.

Producer code:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author xiaolei
 * @date 2021/10/28 11:21
 **/
public class ProducerRobinTest {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("111111");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("producer");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        /*
         * 5. Declare the queue; it is created if it does not already exist.
         * @params1: queue      queue name
         * @params2: durable    whether the queue is persistent
         * @params3: exclusive  whether the queue is private to this connection
         * @params4: autoDelete whether the queue is deleted automatically
         * @params5: arguments  additional queue arguments
         */
        channel.queueDeclare("queue1", false, false, false, null);
        // 6. Send the messages
        for (int i = 1; i <= 10; i++) {
            // @params1: exchange ("" is the default exchange)
            // @params2: queue name / routing key
            // @params3: message properties
            // @params4: message body
            channel.basicPublish("", "queue1", null, ("message " + i).getBytes());
            Thread.sleep(1000);
        }
        // 7. Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer code (both consumers are the same):

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class RobinWork1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("111111");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("consumer");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        DeliverCallback deliverCallback = (String a, Delivery b) -> {
            String message = new String(b.getBody());
            System.out.println("work1" + message);
        };
        CancelCallback cancelCallback = (String a) -> {
            System.out.println("message consumption is interrupted");
        };
        /*
         * Consume messages.
         * @params1: the queue to consume from
         * @params2: autoAck; true: automatic acknowledgement, false: manual acknowledgement
         * @params3: callback invoked when a message is delivered
         * @params4: callback invoked when the consumer is cancelled
         */
        channel.basicConsume("queue1", true, deliverCallback, cancelCallback);
    }
}

The printed results show that the messages are distributed evenly between the two consumers.
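
The even split can be reproduced with a few lines of plain Java. This is a simulation of the round-robin idea only, not broker code; `RoundRobinSketch` is a hypothetical name, and it assumes both consumers are already connected when the ten messages arrive.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates round-robin dispatch: message m goes to consumer (m - 1) % consumerCount. */
class RoundRobinSketch {
    /** Returns, for each consumer index, the list of message numbers it receives. */
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int m = 1; m <= messageCount; m++) {
            received.get((m - 1) % consumerCount).add(m);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = dispatch(10, 2);
        System.out.println("work1 " + received.get(0)); // work1 [1, 3, 5, 7, 9]
        System.out.println("work2 " + received.get(1)); // work2 [2, 4, 6, 8, 10]
    }
}
```

The split ignores how long each consumer takes per message, which is exactly the weakness that fair dispatch addresses.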

2.1.2 Fair Dispatch

This strategy distributes messages according to each consumer's ability to process them. When one consumer is slow, the faster consumers take on more of the work: those who can do more, do more.

To enable this strategy, the consumer must turn off automatic acknowledgement and acknowledge each message manually, while also limiting itself to one unacknowledged message at a time with channel.basicQos(1).

Code to turn off automatic acknowledgement:

channel.basicConsume("queue1",false,deliverCallback,cancelCallback);

Code for manual acknowledgement:

channel.basicAck(b.getEnvelope().getDeliveryTag(),false);

The idea is that the consumer tells MQ not to send it a new message until the current one has been processed, and it acknowledges each message once it has been consumed.
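
To see why this favours the faster consumer, here is a small deterministic simulation in virtual time. It is a simplified model under stated assumptions, not broker behaviour verbatim: all messages are already waiting in the queue, each consumer holds at most one unacknowledged message, and ties go to the consumer with the lower index. `FairDispatchSketch` is a hypothetical name.

```java
import java.util.Arrays;

/** Simulates prefetch = 1 with manual ack: the next message goes to whichever consumer is free first. */
class FairDispatchSketch {
    /** Returns how many messages each consumer handles, given its per-message processing time. */
    static int[] dispatch(int messageCount, long[] processingMillis) {
        int consumers = processingMillis.length;
        long[] freeAt = new long[consumers]; // virtual time at which each consumer acks its current message
        int[] handled = new int[consumers];
        for (int m = 0; m < messageCount; m++) {
            int next = 0;
            for (int c = 1; c < consumers; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += processingMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Consumer 0 needs 200 ms per message, consumer 1 needs 1000 ms
        System.out.println(Arrays.toString(dispatch(10, new long[] {200, 1000}))); // [8, 2]
    }
}
```

With round-robin dispatch the same two consumers would each receive five messages, and the slow one would become the bottleneck.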

The code is the same for both consumers, except that one of them sleeps longer to simulate slower processing.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class FairWork2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("111111");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("consumer2");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        // Accept at most one unacknowledged message at a time
        channel.basicQos(1);
        DeliverCallback deliverCallback = (String a, Delivery b) -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(b.getBody());
            System.out.println("work2" + message);
            // Manual acknowledgement; false: this message only, true: batch acknowledgement
            channel.basicAck(b.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (String a) -> {
            System.out.println("message consumption is interrupted");
        };
        /*
         * Consume messages.
         * @params1: the queue to consume from
         * @params2: autoAck; true: automatic acknowledgement, false: manual acknowledgement
         * @params3: callback invoked when a message is delivered
         * @params4: callback invoked when the consumer is cancelled
         */
        channel.basicConsume("queue1", false, deliverCallback, cancelCallback);
    }
}

When writing the test classes, if an earlier run did not close its channel and connection properly, you may see extra consumers still attached to the queue. Stop those leftover processes so that only the consumers under test remain.

2.2 Publish and Subscribe Mode (Fanout)

The publish-subscribe model is structured as follows:

The simple mode and the work queue mode are essentially the same: both have only one queue. After a message is sent to the exchange, the exchange sees that it is in direct mode and forwards the message to the bound queue, and each message goes to only one consumer. The publish-subscribe model, by contrast, has multiple queues, and every queue receives the data.

In this mode:

  • 1. The publisher only creates a named exchange whose type is fanout.
  • 2. Each consumer creates its own queue, connects to the exchange created by the publisher, and binds the queue to the exchange. If a queue does not require persistence, a temporary queue can be used.
  • 3. The publisher sends a message. When the message reaches the exchange, the exchange sees that it is in fanout mode and sends the message to every queue bound to it.
  • 4. Each queue delivers the messages to its connected consumers.
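
Step 3 is the heart of the pattern: the exchange copies each message into every bound queue and ignores the routing key. A minimal in-memory sketch follows; `FanoutSketch` is a hypothetical name and the class only models the copying, not the RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Models a fanout exchange: every published message is copied to every bound queue. */
class FanoutSketch {
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Step 2: a consumer binds its own queue to the exchange. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Step 3: no routing key is consulted; all bound queues get a copy. */
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue1");
        exchange.bind("queue2");
        exchange.publish("hello");
        System.out.println(exchange.messagesIn("queue1")); // [hello]
        System.out.println(exchange.messagesIn("queue2")); // [hello]
    }
}
```

A queue bound after a message was published does not receive that message, because the exchange itself stores nothing.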

2.2.1 Four exchange types

Every message sent to RabbitMQ must go through an exchange; if none is specified, the default exchange is used.

  • Direct: routes on the routing key. A queue is bound to the exchange with a routing key, and a message is delivered to the queue only if its routing key matches that key exactly. This is an exact match.

  • Topic: matches the routing key against a pattern, so the queue is bound to the exchange with a pattern. The symbol “#” matches zero or more words and the symbol “*” matches exactly one word. So “abc.#” matches “abc.def.ghi”, but “abc.*” only matches “abc.def”.

  • Headers:

    Routing keys are ignored. Instead, matching is based on the headers attribute of the message. A set of key-value pairs is specified when the queue is bound to the exchange. When a message is sent to RabbitMQ, its headers are compared with the key-value pairs of the binding; if they match, the message is routed to the queue, otherwise it is not. The headers attribute is a set of key-value pairs (for example a Hashtable) whose values can be of any type, whereas the routing keys of fanout, direct, and topic exchanges must be strings.

  • Fanout: routing keys are ignored. You simply bind the queue to the exchange, and a message sent to the exchange is forwarded to every queue bound to it. This is much like a subnet broadcast, where each host in the subnet gets a copy of the message. The fanout exchange is the fastest at forwarding messages.
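
Headers matching is the least familiar of the four, so here is a hedged sketch of the comparison. RabbitMQ lets a binding carry an `x-match` argument set to `all` (every pair must match, the default) or `any` (one matching pair is enough); the class `HeadersMatchSketch` and the sample keys `format` and `type` are hypothetical, and the code is a simplified model rather than the broker's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified model of how a headers exchange compares binding arguments with message headers. */
class HeadersMatchSketch {
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        boolean matchAny = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> pair : bindingArgs.entrySet()) {
            if (pair.getKey().startsWith("x-")) {
                continue; // binding options such as x-match are not match criteria
            }
            boolean equal = messageHeaders.containsKey(pair.getKey())
                    && Objects.equals(pair.getValue(), messageHeaders.get(pair.getKey()));
            if (matchAny && equal) {
                return true;  // "any": one matching pair is enough
            }
            if (!matchAny && !equal) {
                return false; // "all": one mismatch rules the binding out
            }
        }
        return !matchAny;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        System.out.println(matches(binding, headers)); // false
        binding.put("x-match", "any");
        System.out.println(matches(binding, headers)); // true
    }
}
```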

2.2.2 Publish and subscribe model test

Producer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("test");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("111111");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("producer");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        // 5. Declare the exchange: type fanout, name exchange1
        channel.exchangeDeclare("exchange1", "fanout");
        // 6. Send the messages
        for (int i = 1; i <= 20; i++) {
            // @params1: exchange
            // @params2: queue name / routing key (ignored by a fanout exchange)
            // @params3: message properties
            // @params4: message body
            channel.basicPublish("exchange1", "", null, ("message " + i).getBytes());
            System.out.println("send " + i);
        }
        // 7. Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("test");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("consumer2");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        DeliverCallback deliverCallback = (String a, Delivery b) -> {
            String message = new String(b.getBody());
            System.out.println("work2" + message);
        };
        CancelCallback cancelCallback = (String a) -> {
            System.out.println("message consumption is interrupted");
        };
        // Declare this consumer's own auto-delete queue and bind it to the fanout exchange
        channel.queueDeclare("queue2", false, false, true, null);
        channel.queueBind("queue2", "exchange1", "");
        /*
         * Consume messages.
         * @params1: the queue to consume from
         * @params2: autoAck; true: automatic acknowledgement, false: manual acknowledgement
         * @params3: callback invoked when a message is delivered
         * @params4: callback invoked when the consumer is cancelled
         */
        channel.basicConsume("queue2", true, deliverCallback, cancelCallback);
    }
}

The second consumer uses a different queue name but is otherwise identical. After both are started, you can see the two queues bound to exchange1.

2.3 Routing Mode (Direct)

Direct mode builds on fanout mode by adding selection on routing keys.

The structure is as follows:

After setting the routing mode and binding the corresponding key:

  • Q1 is bound with the routing key error, so it only receives messages whose routing key is error;
  • Q2 is bound with the keys info, error, and warning, so it receives messages with any of those keys; messages with other keys are ignored and discarded.
  • The error key is bound by both queues, so for that key the exchange effectively behaves like publish-subscribe: error messages are sent to both consumer1 and consumer2.
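
The three bullets above can be checked with a small in-memory model of a direct exchange. `DirectRoutingSketch` is a hypothetical class that only reproduces the exact-match lookup, under the bindings described above (Q1: error; Q2: info, error, warning).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Models a direct exchange: a message reaches every queue bound with exactly its routing key. */
class DirectRoutingSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new TreeSet<>()).add(queue);
    }

    /** Returns the queues that receive a message with this routing key; empty means it is dropped. */
    Set<String> route(String routingKey) {
        Set<String> queues = bindings.get(routingKey);
        return (queues == null) ? Collections.<String>emptySet() : queues;
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("Q1", "error");
        exchange.bind("Q2", "info");
        exchange.bind("Q2", "error");
        exchange.bind("Q2", "warning");

        System.out.println(exchange.route("error")); // [Q1, Q2]
        System.out.println(exchange.route("info"));  // [Q2]
        System.out.println(exchange.route("debug")); // []
    }
}
```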

Producer-specified routing:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("test");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("producer");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        // 5. Declare the exchange: type direct, name exchange2
        channel.exchangeDeclare("exchange2", "direct");
        // 6. Send the messages: even numbers with key "error", odd numbers with key "info"
        for (int i = 1; i <= 20; i++) {
            // @params1: exchange
            // @params2: routing key
            // @params3: message properties
            // @params4: message body
            if (i % 2 == 0) {
                channel.basicPublish("exchange2", "error", null, ("error message " + i).getBytes());
            } else {
                channel.basicPublish("exchange2", "info", null, ("info message " + i).getBytes());
            }
            System.out.println("send " + i);
        }
        // 7. Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer-specified routing:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("test");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("consumer2");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        DeliverCallback deliverCallback = (String a, Delivery b) -> {
            String message = new String(b.getBody());
            System.out.println("work2" + message);
        };
        CancelCallback cancelCallback = (String a) -> {
            System.out.println("message consumption is interrupted");
        };
        // Bind the queue to the direct exchange with the routing key "error"
        channel.queueDeclare("queue2", false, false, true, null);
        channel.queueBind("queue2", "exchange2", "error");
        /*
         * Consume messages.
         * @params1: the queue to consume from
         * @params2: autoAck; true: automatic acknowledgement, false: manual acknowledgement
         * @params3: callback invoked when a message is delivered
         * @params4: callback invoked when the consumer is cancelled
         */
        channel.basicConsume("queue2", true, deliverCallback, cancelCallback);
    }
}

2.4 Topic Mode

The topic pattern can be seen as an extension of the routing pattern: it adds pattern matching on the routing key, much like a fuzzy match.

A routing key sent to a topic exchange must follow a convention: it is a list of words separated by “.”, where each word usually describes some characteristic of the message, for example “xiaolei.orange”. Its maximum length is 255 bytes.

Topic mode is otherwise similar to direct mode. For binding keys, topic mode has two special characters:

  • “*” (asterisk) stands for exactly one word.
  • “#” stands for zero or more words.
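
The two wildcards can be made concrete with a small word-by-word matcher. This is a sketch of the matching rule only, written from the description above; `TopicMatchSketch` is a hypothetical name and the broker's own implementation is certainly different.

```java
/** Word-wise matcher for topic binding keys: '*' is exactly one word, '#' is zero or more words. */
class TopicMatchSketch {
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: match only if no words are left
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero, one, or several words
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still needs a word, but none are left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("abc.#", "abc.def.ghi")); // true
        System.out.println(matches("abc.*", "abc.def.ghi")); // false
        System.out.println(matches("abc.*", "abc.def"));     // true
        System.out.println(matches("abc.#", "abc"));         // true
    }
}
```

The last line shows the difference that is easiest to forget: “#” also matches when there are no further words at all, while “*” never does.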

For example, a batch of films includes patriotic films, action films, and comedies. Wu Jing appears in the patriotic films and the action films; Shen Teng appears in the patriotic films and the comedies.

  • Fans of Wu Jing want all of his movies, so their binding key is “*.wujing” or “#.wujing”;
  • Some fans only watch Wu Jing in patriotic movies and are not interested in action movies, so their binding key is “patriotic.wujing”.

Producer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("test");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("producer");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        // 5. Declare the exchange: type topic, name exchange3
        channel.exchangeDeclare("exchange3", "topic");
        // 6. Send the messages, cycling through four routing keys
        for (int i = 1; i <= 40; i++) {
            // @params1: exchange
            // @params2: routing key
            // @params3: message properties
            // @params4: message body
            if (i % 4 == 0) {
                channel.basicPublish("exchange3", "patriotic.wujing", null, ("patriotic.wujing, message " + i).getBytes());
            } else if (i % 4 == 1) {
                channel.basicPublish("exchange3", "patriotic.shenteng", null, ("patriotic.shenteng, message " + i).getBytes());
            } else if (i % 4 == 2) {
                System.out.println("ggg");
                channel.basicPublish("exchange3", "action.wujing", null, ("action.wujing, message " + i).getBytes());
            } else if (i % 4 == 3) {
                channel.basicPublish("exchange3", "comedy.shenteng", null, ("comedy.shenteng, message " + i).getBytes());
            }
            System.out.println("send " + i);
        }
        // 7. Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set the connection properties
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("test");
        // 3. Get a connection from the connection factory
        Connection connection = connectionFactory.newConnection("consumer2");
        // 4. Get a channel from the connection
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        DeliverCallback deliverCallback = (String a, Delivery b) -> {
            String message = new String(b.getBody());
            System.out.println("work2" + message);
        };
        CancelCallback cancelCallback = (String a) -> {
            System.out.println("message consumption is interrupted");
        };
        // Bind the queue to the topic exchange with a pattern: every key ending in "wujing"
        channel.queueDeclare("queue2", false, false, true, null);
        channel.queueBind("queue2", "exchange3", "#.wujing");
        /*
         * Consume messages.
         * @params1: the queue to consume from
         * @params2: autoAck; true: automatic acknowledgement, false: manual acknowledgement
         * @params3: callback invoked when a message is delivered
         * @params4: callback invoked when the consumer is cancelled
         */
        channel.basicConsume("queue2", true, deliverCallback, cancelCallback);
    }
}

Third, summary

This article focuses on RabbitMQ’s queue structure and the use of its core routing patterns.

The integration of RabbitMQ with a SpringBoot project will be described in the next section.