1. What is a message queue?

A message is data passed between two applications. It can take many forms, from a plain text string to an embedded object.

A message queue is a container that holds messages while they are in transit. A message queue usually involves two roles: the producer and the consumer. The producer is only responsible for sending data to the message queue and does not care who takes it out for processing. The consumer is only responsible for pulling data from the message queue and does not care who sent it.
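
To make the two roles concrete, here is a minimal in-memory sketch that uses a plain Java queue in place of a real broker. It is not RabbitMQ code: the class name QueueRolesSketch and the message strings are made up for illustration, and everything runs in a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustration only: an in-memory queue stands in for the message broker.
class QueueRolesSketch {

    // The producer only puts messages on the queue; it knows nothing about consumers.
    static void produce(Queue<String> queue, int count) {
        for (int i = 1; i <= count; i++) {
            queue.offer("message-" + i);
        }
    }

    // The consumer only takes messages off the queue; it knows nothing about producers.
    static List<String> consumeAll(Queue<String> queue) {
        List<String> received = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        produce(queue, 3);
        System.out.println(consumeAll(queue)); // prints [message-1, message-2, message-3]
    }
}
```

The only thing the two methods share is the queue itself, which is the point of the pattern.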

2. Why use message queues?

A message queue serves three main purposes:

  • Decoupling. As shown in the figure, suppose systems B, C, and D all need data from system A, so system A calls three methods to send the data to B, C, and D. If system D later stops needing the data, the relevant code in system A has to be deleted. If a new system E needs the data, system A has to add code that calls system E. To reduce this tight coupling, you can use MQ: system A simply sends the data to MQ, and any other system that needs it fetches it from MQ.

  • Asynchronous processing. As shown in the figure, when a client sends a request, system A invokes systems B, C, and D. If the calls are synchronous, the response time is the sum of the time spent in systems A, B, C, and D, that is, 800ms. With MQ, system A sends the data to MQ and can respond to the client right away without waiting for systems B, C, or D, which greatly improves response time. MQ suits non-essential work such as sending SMS messages and emails.

  • Peak clipping. As shown in the figure, this is an important application of MQ. Suppose 5000 requests reach system A within a short period, and system A sends 5000 SQL statements to MySQL for execution. MySQL cannot handle that many requests at once and crashes, bringing the system down. With MQ, system A sends the data to MQ instead of sending SQL directly to the database. A short-lived backlog in MQ is acceptable, and the consumer pulls the data in batches of 2000 to process it, so the system no longer crashes because a burst of requests hits MySQL directly.
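
The peak-clipping case can be sketched in a few lines. This is a simplified, single-threaded illustration that uses an in-memory queue instead of RabbitMQ; the class PeakClippingSketch and the method drainInBatches are hypothetical names, and the numbers 5000 and 2000 are the ones from the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: the queue absorbs a burst, and the consumer drains it at its own pace.
class PeakClippingSketch {

    // Drain the backlog in batches of at most batchSize and return the size of each batch.
    static List<Integer> drainInBatches(BlockingQueue<String> backlog, int batchSize) {
        List<Integer> batchSizes = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (backlog.drainTo(batch, batchSize) > 0) {
            // A real consumer would run the database work for this batch here.
            batchSizes.add(batch.size());
            batch.clear();
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        BlockingQueue<String> backlog = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5000; i++) {
            backlog.add("request-" + i); // the burst lands in the queue, not in the database
        }
        System.out.println(drainInBatches(backlog, 2000)); // prints [2000, 2000, 1000]
    }
}
```

The database only ever sees one batch at a time, however large the burst was.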

3. RabbitMQ features

RabbitMQ is an open source message broker written in Erlang that implements AMQP (Advanced Message Queuing Protocol). First, here are some of its features:

  • Reliability. Persistence, delivery acknowledgements, publisher confirms, and other mechanisms keep messages reliable.
  • Flexible message distribution. This is one of RabbitMQ’s distinguishing features. Messages are routed by an Exchange before they reach a queue. The distribution modes are simple mode, work queue mode, publish/subscribe mode, routing mode, and topic (wildcard) mode.
  • Cluster support. Multiple RabbitMQ servers can form a cluster that acts as one logical Broker.
  • Multiple protocols. RabbitMQ supports several messaging protocols, such as STOMP and MQTT.
  • Clients for many languages. RabbitMQ has clients for almost all common programming languages, including Java, .NET, and Ruby.
  • Visual management interface. RabbitMQ provides an easy-to-use UI for monitoring and managing the message broker.
  • Plug-in mechanism. RabbitMQ ships with a number of plug-ins, and you can also write your own.

4. Getting started with RabbitMQ

4.1 Installing RabbitMQ in Windows 10

Since this is only for learning, I installed it directly on Windows 10 rather than starting a virtual machine. On Linux, I recommend pulling a RabbitMQ image with Docker, which is easier.

4.1.1 Installing Erlang and configuring environment variables

First, download the Windows installer from the official Erlang website.

Once the download finishes, you get this:

Double-click the installer and keep clicking Next. After the installation completes, configure the environment variables.

Open CMD and run erl -version to verify the installation:

4.1.2 Installing the RabbitMQ Server

Download the Windows server installer from the RabbitMQ project on GitHub.

Once the download finishes, you get this:

Double-click the installer and keep clicking Next. After the installation completes, locate the installation directory:

To enable the rabbitmq_management plugin, run the command rabbitmq-plugins enable rabbitmq_management.

Then double-click the rabbitmq-server.bat script to start the server, and you can see RabbitMQ running:

Now open a browser and go to http://localhost:15672. The default username and password are guest/guest.

At this point, the installation is complete!

4.2 The eternal Hello World

With the server set up, we need a client to work with it. Next comes a simple Hello World demonstration in Java.

Since I’m using Spring Boot, I add the corresponding starter dependency on the producer side:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

You typically create a common project that holds shared configuration such as queue names, exchange names, and routing key names.

First add the RabbitMQ configuration to the application.yml file:

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest

On the producer side, add the Maven dependency on the common package, then create a configuration class for the Direct exchange and the queue:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /*
         * 1. name: the queue name
         * 2. durable: whether the queue is persistent
         * 3. exclusive: if true, the queue is exclusive and only its creator can use it, i.e. it is private
         * 4. autoDelete: whether the queue is deleted automatically; such a temporary queue is removed when its last consumer disconnects
         */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        // Direct exchange: durable = true, autoDelete = false
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        // Bind the queue to the exchange and set the routing key
        return BindingBuilder
                // Bind the queue
                .bind(rabbitmqDemoDirectQueue())
                // to the exchange
                .to(rabbitmqDemoDirectExchange())
                // with the routing key
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}

Then create a Service class that sends the message:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    // Format the date
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
            String sendTime = sdf.format(new Date());
            Map<String, Object> map = new HashMap<>();
            map.put("msgId", msgId);
            map.put("sendTime", sendTime);
            map.put("msg", msg);
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Then call it wherever the business needs it, such as a scheduled task or an API endpoint. Here I simply send from the Controller layer:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    /**
     * Send a message.
     *
     * @author Java technology enthusiast
     */
    @PostMapping("/sendMsg")
    public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsg(msg);
    }
}

With the producer done, next comes the consumer code, which is simple. The Maven dependency and the YAML configuration are the same as the producer’s. Just create a class annotated with @RabbitListener that names the queue to listen to, as shown:

There is a small pitfall here: at first, the queue has not been created on the RabbitMQ server:

If the consumer is started at this point, an error is reported:

So start the producer first and send a message:

Finally, start the consumer:

From now on, the consumer keeps listening to the queue, and each time the producer sends a message to MQ, the consumer consumes it. Here I send four:

Back to the problem from the start, where the consumer reported an error because the queue did not exist. The best approach is for both the producer and the consumer to try to create the queue. How do you write that? There are many ways; I’ll use a fairly simple one:

Add the following to the producer configuration class:

// Implement BeanPostProcessor to hook into the bean lifecycle
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    // RabbitAdmin is used to create exchanges and queues
    @Resource
    private RabbitAdmin rabbitAdmin;

    // Initialize the RabbitAdmin object
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // Spring loads RabbitAdmin only if autoStartup is set to true
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // Bean post-processor callback, invoked after a bean has been initialized
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Create the exchange
        rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
        // Create the queue
        rabbitAdmin.declareQueue(rabbitmqDemoDirectQueue());
        // Return the bean itself; returning null would stop later post-processors from running
        return bean;
    }
}

This creates the exchange and queue automatically at startup, without waiting for a message to be sent.

The consumer also needs a small change:

@Component
// queuesToDeclare declares the queue, creating it if it does not exist
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
    // ... omitted
}

This way, it does not matter whether the producer or the consumer starts first.

Code address: github.com/yehongzhi/m…

5. Components of RabbitMQ

The Hello World example above gives a rough picture of how RabbitMQ is put together. It has these parts:

  • Broker: the message queue service process. It consists of two parts: Exchange and Queue.
  • Exchange: the message exchange. It routes each message to a queue according to certain rules.
  • Queue: the message queue, which stores messages.
  • Producer: the message producer. The producer client sends messages to the exchange, which routes them to queues.
  • Consumer: the message consumer. It consumes the messages stored in a queue.

These components work together roughly as follows:

  • The message producer connects to the RabbitMQ Broker, creating a connection and opening a channel.
  • The producer declares the exchange: its type, name, whether it is durable, and so on.
  • The producer sends the message, specifying the routing key and properties such as whether the message is persistent.
  • The Exchange receives the message and, based on the routing key, routes it to the matching queue bound to that exchange.
  • The consumer’s listener receives the message and begins business processing.
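
The steps above can be modeled with a toy in-memory broker. This is only a sketch of the flow under simplifying assumptions (one exchange, exact routing-key matching, at most one queue per routing key); MiniBrokerSketch and its methods are hypothetical and are not the RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustration only: a toy broker holding one exchange and its queues in memory.
class MiniBrokerSketch {
    // queue name -> stored messages
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // routing key -> queue name (one binding per key keeps the sketch short)
    private final Map<String, String> bindings = new HashMap<>();

    void declareQueue(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    void bind(String queueName, String routingKey) {
        declareQueue(queueName); // make sure the queue exists before binding it
        bindings.put(routingKey, queueName);
    }

    // The exchange looks up the binding for the routing key and stores the message in that queue.
    // Returns false when no binding matches; this sketch simply drops such messages.
    boolean publish(String routingKey, String message) {
        String queueName = bindings.get(routingKey);
        if (queueName == null) {
            return false;
        }
        return queues.get(queueName).offer(message);
    }

    // The consumer takes the next message from a queue, or gets null if there is none.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        MiniBrokerSketch broker = new MiniBrokerSketch();
        broker.declareQueue("demo.queue");
        broker.bind("demo.queue", "demo.key");
        broker.publish("demo.key", "hello");
        System.out.println(broker.consume("demo.queue")); // prints hello
    }
}
```

The producer only ever names a routing key and the consumer only ever names a queue; the binding table is what connects the two.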

6. The four types of Exchange

As the workflow above shows, the Exchange is a key component, because every message sent to RabbitMQ must first pass through an Exchange, which routes it to the corresponding Queue.

There are four types of Exchange, and each works differently. The Hello World example uses the relatively simple Direct Exchange. The other three are Fanout Exchange, Topic Exchange, and Headers Exchange.

6.1 Direct Exchange

With a direct exchange, a queue is bound to the exchange with a routing key, and a message is delivered to that queue only when its routing key matches exactly. Put simply, it is one-to-one, point-to-point delivery.

The complete code is the Hello World example above, so it is not repeated here.

6.2 Fanout Exchange

With this type, queues only need to be bound to the exchange. A message sent to the exchange is forwarded to every queue bound to it. This is much like a subnet broadcast, where every host in the subnet gets a copy of the message. Put simply, it is publish/subscribe.
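
The broadcast behavior can be illustrated with a few lines of plain Java before turning to the Spring code. This is a simplified in-memory sketch, not RabbitMQ itself; the class FanoutSketch and its methods are made-up names, and the queue names are borrowed from the demo in this section.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: an in-memory stand-in for a fanout exchange.
class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Every bound queue receives its own copy of the message; no routing key is involved.
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.publish("hello");
        System.out.println(exchange.messagesIn("fanout.A")); // prints [hello]
        System.out.println(exchange.messagesIn("fanout.B")); // prints [hello]
    }
}
```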

Here is how to write the code.

First, configure the exchange and queue names:

public class RabbitMQConfig {
    /** Name of queue A bound to the fanout exchange */
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";

    /** Name of queue B bound to the fanout exchange */
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";

    /** Name of the fanout exchange */
    public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";

}

Configure a FanoutExchange and two queues, A and B, and bind them. This type does not need a routing key:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Resource
    private RabbitAdmin rabbitAdmin;

    @Bean
    public Queue fanoutExchangeQueueA() {
        // Queue A
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
    }

    @Bean
    public Queue fanoutExchangeQueueB() {
        // Queue B
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        // Create the FanoutExchange
        return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindFanoutA() {
        // Bind queue A to the FanoutExchange
        return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
    }

    @Bean
    public Binding bindFanoutB() {
        // Bind queue B to the FanoutExchange
        return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Create the exchange and queues when the project starts
        rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
        rabbitAdmin.declareQueue(fanoutExchangeQueueB());
        rabbitAdmin.declareQueue(fanoutExchangeQueueA());
        // Return the bean itself; returning null would stop later post-processors from running
        return bean;
    }
}

Create a service to publish a message:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    // Publish the message
    @Override
    public String sendMsgByFanoutExchange(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }

    // Assemble the message body
    private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        return map;
    }
}

The Controller interface:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;

    /**
     * Publish a message.
     *
     * @author Java technology enthusiast
     */
    @PostMapping("/publish")
    public String publish(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsgByFanoutExchange(msg);
    }
}

Next, on the consumer side, create a listener class for each of the two queues:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("Queue A received a message: " + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("Queue B received a message: " + map.toString());
    }
}

Then start the producer and consumer projects. The management interface shows that a FanoutExchange and two queues were created and bound:

Use POSTMAN to send a message as a test:

The console then shows both queues receiving the same message at the same time, which is the publish/subscribe effect:

6.3 Topic Exchange

Translated literally this is a topic exchange, but judging by how it is used, it could be called a wildcard exchange. The exchange matches routing keys against wildcard patterns and routes each message to the corresponding queues. There are two wildcards: * and #. Note that a routing key is a list of words separated by “.”, so a wildcard must be separated from its neighbors by “.”.

* symbol: matches exactly one word. For example, “a.*” matches “a.b” and “a.c”, but not “a.b.c”.

# symbol: matches zero or more words. For example, “rabbit.#” matches “rabbit.a.b”, “rabbit.a”, and “rabbit.a.b.c”.
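
The matching rules can be expressed as a small recursive function. The sketch below is an illustration of the rules, not RabbitMQ’s actual implementation; the class and method names are made up. It follows the RabbitMQ documentation in letting # match zero or more words, so “rabbit.#” also matches the bare key “rabbit”.

```java
// Illustration only: word-by-word matching of a routing key against a topic pattern.
class TopicMatchSketch {

    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // p = pattern words, k = routing key words; i and j are the current positions in each.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more key word (stay on it).
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            // Key exhausted but the pattern still expects a word.
            return false;
        }
        // '*' accepts any single word; otherwise the words must be equal.
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("a.*", "a.b"));               // prints true
        System.out.println(matches("a.*", "a.b.c"));             // prints false
        System.out.println(matches("rabbit.#", "rabbit.a.b.c")); // prints true
    }
}
```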

Without further ado, here is the code.

As before, configure the TopicExchange name and the names of the three queues:

    /** Name of the topic exchange */
    public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";

    /** Name of queue A bound to the topic exchange */
    public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";

    /** Name of queue B bound to the topic exchange */
    public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";

    /** Name of queue C bound to the topic exchange */
    public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";

Then, as before, configure the exchange and queues, bind them, and create them:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    // ... omitted

    @Bean
    public TopicExchange rabbitmqDemoTopicExchange() {
        // Configure the TopicExchange
        return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Queue topicExchangeQueueA() {
        // Create queue A
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueB() {
        // Create queue B
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueC() {
        // Create queue C
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
    }

    @Bean
    public Binding bindTopicA() {
        // Bind queue B to the TopicExchange with the pattern a.*
        return BindingBuilder.bind(topicExchangeQueueB())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicB() {
        // Bind queue C to the TopicExchange with the pattern a.*
        return BindingBuilder.bind(topicExchangeQueueC())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicC() {
        // Bind queue A to the TopicExchange with the pattern rabbit.#
        return BindingBuilder.bind(topicExchangeQueueA())
                .to(rabbitmqDemoTopicExchange())
                .with("rabbit.#");
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
        rabbitAdmin.declareQueue(topicExchangeQueueA());
        rabbitAdmin.declareQueue(topicExchangeQueueB());
        rabbitAdmin.declareQueue(topicExchangeQueueC());
        // Return the bean itself; returning null would stop later post-processors from running
        return bean;
    }
}

Then write a service method that sends the message:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Override
    public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            // Send a message
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Write a Controller interface:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    /**
     * Send a message through the topic (wildcard) exchange.
     *
     * @author Java technology enthusiast
     */
    @PostMapping("/topicSend")
    public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception {
        return rabbitMQService.sendMsgByTopicExchange(msg, routingKey);
    }
}

That completes the producer side. The consumer side is simpler: write three listener classes:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A))
public class TopicExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("The queue [" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A + "] received a message: " + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B))
public class TopicExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("The queue [" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B + "] received a message: " + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C))
public class TopicExchangeConsumerC {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("The queue [" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C + "] received a message: " + map.toString());
    }
}

Once that is done, start the project and begin testing. The bindings between the queues and routing patterns are visible:

Use POSTMAN to send a message whose routing key matches rabbit.#:

The test succeeds, and queue A consumes the message:

Next, test the a.* pattern by sending routingKey = a.b:

These three are the most commonly used: direct (DirectExchange), publish/subscribe (FanoutExchange), and wildcard (TopicExchange). Used well, these three exchange types cover most business scenarios.

In fact, with a little thought you can see that the wildcard mode (TopicExchange) can achieve the effect of both direct (DirectExchange) and publish/subscribe (FanoutExchange).

However, FanoutExchange does not need to match a routingKey, so it performs better than TopicExchange.

6.4 Headers Exchange

This type of exchange is used less often. It differs from the three above in that it does not route by routingKey. Instead, it routes by matching the key-value pairs carried in the message headers. As shown in the figure:

When binding a queue, you set the header key-value pairs for the binding. There are two matching modes: full match and partial match. As shown in the figure above, the exchange compares the key-value pairs in the headers of the message sent by the producer with those bound to each queue, and routes the message to the queues that match. The demo code that follows shows how to do this.
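
Before the Spring code, the two matching modes can be illustrated with plain maps. This is a hedged sketch of the comparison logic only, with made-up names (HeadersMatchSketch, matchesAll, matchesAny); the header keys and values are the ones used in the binding configuration later in this section.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustration only: comparing message headers with the key-value pairs of a binding.
class HeadersMatchSketch {

    // True when the headers contain the bound key with an equal value.
    private static boolean entryMatches(Map.Entry<String, Object> bound, Map<String, Object> headers) {
        return headers.containsKey(bound.getKey())
                && Objects.equals(bound.getValue(), headers.get(bound.getKey()));
    }

    // Full match: every key-value pair of the binding must appear in the message headers.
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        return binding.entrySet().stream().allMatch(e -> entryMatches(e, headers));
    }

    // Partial match: at least one key-value pair of the binding must appear in the message headers.
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        return binding.entrySet().stream().anyMatch(e -> entryMatches(e, headers));
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("key_one", "java");
        binding.put("key_two", "rabbit");

        Map<String, Object> headers = new HashMap<>();
        headers.put("key_one", "java");

        System.out.println(matchesAll(binding, headers)); // prints false: key_two is missing
        System.out.println(matchesAny(binding, headers)); // prints true: key_one matches
    }
}
```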

First, define the exchange name and queue names:

    /** Name of the headers exchange */
    public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";

    /** Name of queue A bound to the headers exchange */
    public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";

    /** Name of queue B bound to the headers exchange */
    public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";

Then configure the exchange and queues, and bind them:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Bean
    public Queue headersQueueA() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue headersQueueB() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public HeadersExchange rabbitmqDemoHeadersExchange() {
        return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindHeadersA() {
        Map<String, Object> map = new HashMap<>();
        map.put("key_one", "java");
        map.put("key_two", "rabbit");
        // Full match: all key-value pairs must match
        return BindingBuilder.bind(headersQueueA())
                .to(rabbitmqDemoHeadersExchange())
                .whereAll(map).match();
    }

    @Bean
    public Binding bindHeadersB() {
        Map<String, Object> map = new HashMap<>();
        map.put("headers_A", "coke");
        map.put("headers_B", "sky");
        // Partial match: any one key-value pair is enough
        return BindingBuilder.bind(headersQueueB())
                .to(rabbitmqDemoHeadersExchange())
                .whereAny(map).match();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
        rabbitAdmin.declareQueue(headersQueueA());
        rabbitAdmin.declareQueue(headersQueueB());
        // Return the bean itself; returning null would stop later post-processors from running
        return bean;
    }
}

Write another Service method to send the message:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Override
    public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
        try {
            MessageProperties messageProperties = new MessageProperties();
            // Make the message persistent
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // The contentType field carries the charset name here; the consumer reads it back to decode the body
            messageProperties.setContentType("UTF-8");
            // Add the key-value pairs to the message headers
            messageProperties.getHeaders().putAll(map);
            Message message = new Message(msg.getBytes(), messageProperties);
            rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Write another Controller interface:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    @PostMapping("/headersSend")
    @SuppressWarnings("unchecked")
    public String headersSend(@RequestParam(name = "msg") String msg,
                              @RequestParam(name = "json") String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> map = mapper.readValue(json, Map.class);
        return rabbitMQService.sendMsgByHeadersExchange(msg, map);
    }
}

With the producer finished, write listener classes for the two queues:

@Component
public class HeadersExchangeConsumerA {
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A))
    public void process(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        // The producer stored the charset name in the contentType field
        String contentType = messageProperties.getContentType();
        System.out.println("The queue [" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "] received a message: " + new String(message.getBody(), contentType));
    }
}

@Component
public class HeadersExchangeConsumerB {
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
    public void process(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        // The producer stored the charset name in the contentType field
        String contentType = messageProperties.getContentType();
        System.out.println("The queue [" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "] received a message: " + new String(message.getBody(), contentType));
    }
}

Start the project and open the management interface to see the queues bound to the exchange:

The bindings match the diagram above, so everything is as expected. Use POSTMAN to send a message and test the fully matched queue A:

Then test the partially matched queue B:

Conclusion

That’s all for this article. To review what was covered:

  • What a message queue is and why to use one
  • RabbitMQ’s features, components, and workflow
  • Installing RabbitMQ and completing a small Hello World example
  • The four types of RabbitMQ exchange and how to use them

RabbitMQ also has transactions and load balancing, which I haven’t covered because this article is already long, around 5,000 words. I’ll cover them in the next article, so stay tuned.

The code for all of the above examples has been uploaded to GitHub:

Github.com/yehongzhi/m…

My knowledge is limited. If anything here is wrong or inappropriate, please point it out so we can learn together!