Getting started with MQ — Use of RabbitMQ

Message queue

A Message Queue (MQ for short) is a container for storing messages. It is essentially a Queue.

Usage scenarios

The classic usage scenarios for message queues are asynchronous processing, decoupling, and peak clipping.

Asynchronous processing

For example, in a payment scenario, a payment also triggers a coupon deduction, a points adjustment, and an SMS notification. If the whole process is synchronous, the payment request takes a long time. In fact, once the user has paid, the interface should return right away instead of waiting for the coupon deduction, the points adjustment, and the other follow-up operations, which can be handled asynchronously through MQ.
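
The idea can be sketched without a broker. The following is a minimal in-memory illustration, not RabbitMQ code: a `BlockingQueue` stands in for MQ, and the class name `AsyncPaymentSketch`, the method `pay`, and the task names are all hypothetical. It only shows that the payment call returns as soon as the follow-up work is queued.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory stand-in for a message queue; a real system would publish to RabbitMQ instead. */
final class AsyncPaymentSketch {
    // Hypothetical queue of follow-up tasks (coupon, points, SMS).
    static final BlockingQueue<String> FOLLOW_UPS = new LinkedBlockingQueue<>();

    /** Records the payment, queues the follow-up work, and returns without waiting for it. */
    static String pay(String orderId) {
        // ... charging the user is omitted in this sketch ...
        FOLLOW_UPS.offer("deduct-coupon:" + orderId);
        FOLLOW_UPS.offer("update-points:" + orderId);
        FOLLOW_UPS.offer("send-sms:" + orderId);
        return "paid:" + orderId;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pay("order-1"));
        // Separate worker services would normally consume these tasks;
        // they are drained inline here only to show what was queued.
        while (!FOLLOW_UPS.isEmpty()) {
            System.out.println("processing " + FOLLOW_UPS.take());
        }
    }
}
```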

Decoupling

In the same payment scenario, the payment system sends a message via MQ once the user has paid. The other systems, such as SMS notification, points, and coupon deduction, only listen to MQ, so their code is not coupled to the payment system.

Peak clipping

For example, when the request volume is so high that the server cannot handle all the requests at once, the requests are put into MQ and the server consumes them at a rate it can sustain.
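
As a rough illustration of consuming by capacity, the sketch below buffers a burst of requests in an in-memory queue and drains it in fixed-size batches. It is an assumption-laden toy rather than RabbitMQ code: `PeakClippingSketch`, `drain`, and the per-tick capacity are hypothetical names and values.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of peak clipping: a burst is buffered and drained at a fixed rate. */
final class PeakClippingSketch {
    /** Returns how many requests are handled in each tick when at most capacityPerTick are taken. */
    static List<Integer> drain(Queue<Integer> backlog, int capacityPerTick) {
        List<Integer> handledPerTick = new ArrayList<>();
        while (!backlog.isEmpty()) {
            int handled = 0;
            while (handled < capacityPerTick && !backlog.isEmpty()) {
                backlog.poll(); // the server processes one buffered request
                handled++;
            }
            handledPerTick.add(handled);
        }
        return handledPerTick;
    }

    public static void main(String[] args) {
        Queue<Integer> backlog = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            backlog.add(i); // a burst of 10 requests arrives at once
        }
        System.out.println(drain(backlog, 4)); // prints [4, 4, 2]
    }
}
```

The burst never exceeds what the server takes per tick; the queue absorbs the difference.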

Relevant concepts

Broker

The RabbitMQ server entity.

Vhost

Virtual host. Multiple vhosts can be set up within a broker to isolate different users from each other. By default, a vhost named “/” is available when connecting to RabbitMQ.

Exchange

The message exchange. It routes each message to a queue according to certain rules.

Direct Exchange

A direct exchange is bound to a queue with a specific routing key, and a message is delivered only when its routing key matches that key exactly. Put simply, it is one-to-one, point-to-point.
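
The exact-match rule can be illustrated with a small in-memory model. This is only a sketch of the routing behavior, not the RabbitMQ client API; the class `DirectRoutingSketch` and the keys `pay.sms` and `pay.email` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of direct-exchange routing; it is not the RabbitMQ client API. */
final class DirectRoutingSketch {
    // Binding key -> queues bound with that key (each queue is modeled as a list of messages).
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    void bind(List<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Delivers the message only to queues whose binding key equals the routing key; returns the count. */
    int publish(String routingKey, String message) {
        List<List<String>> targets = bindings.getOrDefault(routingKey, List.of());
        for (List<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        List<String> smsQueue = new ArrayList<>();
        exchange.bind(smsQueue, "pay.sms");
        System.out.println(exchange.publish("pay.sms", "hello"));   // prints 1
        System.out.println(exchange.publish("pay.email", "hello")); // prints 0
    }
}
```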

In the provider

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /*
         * 1. name: the queue name
         * 2. durable: whether the queue is persistent
         * 3. exclusive: if true, the queue is exclusive (private); only its creator can use it
         * 4. autoDelete: if true, the queue is temporary and is deleted automatically
         *    when its last consumer disconnects
         */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        // Direct exchange: durable, not auto-deleted
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        // Bind the queue to the exchange and set the matching key
        return BindingBuilder
                // Bind the queue
                .bind(rabbitmqDemoDirectQueue())
                // to the exchange
                .to(rabbitmqDemoDirectExchange())
                // with a specific routing key
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}

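import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    // Date format for the send time (DateTimeFormatter is thread-safe, unlike a shared SimpleDateFormat)
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            Map<String, Object> map = getMessage(msg);
            // Publish to the direct exchange with the routing key the queue is bound with
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }

    // Wrap the text in a map together with a message id and the send time
    private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        String sendTime = LocalDateTime.now().format(FORMATTER);
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        return map;
    }
}
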
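import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;

    // POST /rabbitmq/sendMsg?msg=... sends the message through the direct exchange
    @PostMapping("/sendMsg")
    public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsg(msg);
    }
}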

In the consumer

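import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// The name of the queue to listen on
@RabbitListener(queues = {RabbitMQConfig.RABBITMQ_DEMO_TOPIC})
public class RabbitMQConsumer {
    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("Queue [" + RabbitMQConfig.RABBITMQ_DEMO_TOPIC + "] received a message: " + map.toString());
    }
}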
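
The snippets in this article refer to constants on a `RabbitMQConfig` class that is not shown. For readers who want to compile the examples, here is one possible shape for it. Only the constant names come from the article; every string value below is a placeholder assumed for illustration, and the original project may define them differently.

```java
/** Hypothetical constants holder; all values are placeholders chosen for illustration. */
final class RabbitMQConfig {
    private RabbitMQConfig() {
        // constants only, no instances
    }

    // Direct exchange example
    public static final String RABBITMQ_DEMO_TOPIC = "rabbitmqDemoTopic";
    public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbitmqDemoDirectExchange";
    public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmqDemoDirectRouting";

    // Fanout exchange example
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";
    public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";

    // Topic exchange example
    public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";
    public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";
    public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";
    public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";

    // Headers exchange example
    public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";
    public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";
    public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";
}
```

The fields are compile-time constants, so they can be used in annotation attributes such as `@RabbitListener(queues = ...)`.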

Fanout exchange

A fanout exchange has queues bound to it, and a message sent to the exchange is forwarded to every queue bound to it. It works much like a subnet broadcast, in which each host on the subnet gets a copy of the message. Simply put, it is publish and subscribe.
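
The broadcast behavior can be modeled in a few lines. This is an in-memory sketch under stated assumptions, not the RabbitMQ client API; `FanoutRoutingSketch` is a hypothetical name and each queue is just a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of fanout routing; it is not the RabbitMQ client API. */
final class FanoutRoutingSketch {
    // Every bound queue, each modeled as a plain list of messages.
    private final List<List<String>> boundQueues = new ArrayList<>();

    void bind(List<String> queue) {
        boundQueues.add(queue);
    }

    /** Copies the message to every bound queue; the routing key is accepted but ignored. */
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        List<String> queueA = new ArrayList<>();
        List<String> queueB = new ArrayList<>();
        exchange.bind(queueA);
        exchange.bind(queueB);
        exchange.publish("", "hello");
        System.out.println(queueA + " " + queueB); // prints [hello] [hello]
    }
}
```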

In the provider

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue fanoutExchangeQueueA() {
        // Queue A
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
    }

    @Bean
    public Queue fanoutExchangeQueueB() {
        // Queue B
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        // Create the FanoutExchange
        return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindFanoutA() {
        // Queue A is bound to the FanoutExchange
        return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
    }

    @Bean
    public Binding bindFanoutB() {
        // Queue B is bound to the FanoutExchange. No routing key is configured, because it would have no effect
        return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
    }
}

    /**
     * Sends a message through the fanout exchange.
     *
     * @param msg the message content
     * @return whether the message was sent successfully
     * @throws Exception e
     */
    @PostMapping("/publish")
    public String publish(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsgByFanoutExchange(msg);
    }

    @Override
    public String sendMsgByFanoutExchange(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            // The routing key is left empty because a fanout exchange ignores it
            rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }

Topic Exchange

Literally translated this is a topic exchange, but judging by its usage it could be called a wildcard exchange. The exchange matches routing keys against wildcard patterns and routes messages to the corresponding queues. There are two wildcards: * and #. Note that the words in a routing key are separated by the “.” symbol, and each wildcard stands in for whole words.

* symbol: matches exactly one word. For example, “a.*” matches “a.b” and “a.c”, but not “a.b.c”.

# symbol: matches zero or more words. For example, “rabbit.#” matches “rabbit.a”, “rabbit.a.b”, and “rabbit.a.b.c”.
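
To make the two wildcards concrete, here is a small matcher written for illustration. It is a sketch and not RabbitMQ's own implementation; `TopicMatchSketch` and `matches` are hypothetical names. It assumes the documented AMQP topic semantics, in which * stands for exactly one word and # for zero or more words.

```java
/** Illustrative topic-pattern matcher; not the broker's implementation. */
final class TopicMatchSketch {
    /** Returns true if the routing key matches the binding pattern, word by word. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word but the key has none left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("a.*", "a.b"));               // prints true
        System.out.println(matches("a.*", "a.b.c"));             // prints false
        System.out.println(matches("rabbit.#", "rabbit.a.b.c")); // prints true
    }
}
```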

In the provider

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {
    @Bean
    public Queue topicExchangeQueueA() {
        // Create queue A
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueB() {
        // Create queue B
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueC() {
        // Create queue C
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
    }

    @Bean
    public TopicExchange rabbitmqDemoTopicExchange() {
        // Configure the TopicExchange
        return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindTopicA() {
        // Queue A is bound to the TopicExchange with the routing key rabbit.#,
        // so messages whose routing key starts with "rabbit." are routed to queue A
        return BindingBuilder.bind(topicExchangeQueueA())
                .to(rabbitmqDemoTopicExchange())
                .with("rabbit.#");
    }

    @Bean
    public Binding bindTopicB() {
        // Queue B is bound to the TopicExchange with the routing key a.*
        return BindingBuilder.bind(topicExchangeQueueB())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicC() {
        // Queue C is bound to the TopicExchange
        return BindingBuilder.bind(topicExchangeQueueC())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }
}

    /**
     * Sends a message through the topic exchange.
     *
     * @param msg
     * @param routingKey
     * @return
     * @throws Exception
     */
    @PostMapping("/topicSend")
    public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception {
        return rabbitMQService.sendMsgByTopicExchange(msg, routingKey);
    }

    @Override
    public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            // Send the message
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }

Headers Exchange

This type of exchange is used less often. It differs from the three above in that it does not route by routing key; instead, it routes by matching the key-value pairs carried in the message headers.
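
Header matching comes in two flavors, all pairs or any pair, which the following in-memory sketch tries to illustrate. It is not the broker's implementation; `HeadersMatchSketch`, `matchesAll`, and `matchesAny` are hypothetical names, and the header keys and values are the ones used in this article's configuration.

```java
import java.util.Map;
import java.util.Objects;

/** Illustrative headers matching; not the broker's implementation. */
final class HeadersMatchSketch {
    /** "All" matching: every key-value pair in the binding must be present in the message headers. */
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        return binding.entrySet().stream()
                .allMatch(e -> headers.containsKey(e.getKey())
                        && Objects.equals(e.getValue(), headers.get(e.getKey())));
    }

    /** "Any" matching: at least one key-value pair in the binding must be present in the message headers. */
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        return binding.entrySet().stream()
                .anyMatch(e -> headers.containsKey(e.getKey())
                        && Objects.equals(e.getValue(), headers.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, Object> bindingA = Map.of("key_one", "java", "key_two", "rabbit");
        Map<String, Object> bindingB = Map.of("headers_A", "coke", "headers_B", "sky");

        System.out.println(matchesAll(bindingA, Map.of("key_one", "java")));   // prints false
        System.out.println(matchesAny(bindingB, Map.of("headers_A", "coke"))); // prints true
    }
}
```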

In the provider

@Configuration
public class HeadersRabbitConfig {

    @Bean
    public Queue headersQueueA() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue headersQueueB() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public HeadersExchange rabbitmqDemoHeadersExchange() {
        return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindHeadersA() {
        Map<String, Object> map = new HashMap<>();
        map.put("key_one", "java");
        map.put("key_two", "rabbit");
        // Full match: only messages whose headers contain
        // {"key_one": "java", "key_two": "rabbit"} are routed to queue A
        return BindingBuilder.bind(headersQueueA())
                .to(rabbitmqDemoHeadersExchange())
                .whereAll(map).match();
    }

    @Bean
    public Binding bindHeadersB() {
        Map<String, Object> map = new HashMap<>();
        map.put("headers_A", "coke");
        map.put("headers_B", "sky");
        // Partial match: any one of the pairs is enough to route to queue B
        return BindingBuilder.bind(headersQueueB())
                .to(rabbitmqDemoHeadersExchange())
                .whereAny(map).match();
    }
}

    @Override
    public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
        try {
            MessageProperties messageProperties = new MessageProperties();
            // Message persistence
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // The content type field carries the charset name; the consumer uses it to decode the body
            messageProperties.setContentType("UTF-8");
            // Add the headers that the exchange matches on
            messageProperties.getHeaders().putAll(map);
            // Encode with the same charset the consumer decodes with
            Message message = new Message(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }

    @PostMapping("/headersSend")
    public String headersSend(@RequestParam(name = "msg") String msg,
                              @RequestParam(name = "json") String json) throws Exception {
        // Parse the JSON string into the header map used for matching
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> map = mapper.readValue(json, Map.class);
        return rabbitMQService.sendMsgByHeadersExchange(msg, map);
    }

In the consumer

@Component
public class HeadersExchangeConsumer {

    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A))
    public void processA(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        System.out.println("Queue [" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "] received a message: " + new String(message.getBody(), contentType));
    }

    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
    public void processB(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        System.out.println("Queue [" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "] received a message: " + new String(message.getBody(), contentType));
    }
}

Routing key

The key that the exchange compares against its bindings to decide which queue receives the current message.

Binding

A RabbitMQ queue is bound to an exchange using a routing key

Source code

The source code for the examples above is at github.com/Saul-Zhang/…

References

www.zhihu.com/question/54…

blog.csdn.net/qq_35387940…