Application scenarios of message-oriented middleware

Asynchronous processing

Scenario: After user registration information is written into the database, a successful email is sent to the user and then a successful email is sent.

1. Synchronous invocation: After the registration is successful, the system sends emails and SMS messages in sequence, and responds to the user

2. Parallel call: After successful registration, email and SMS will be executed concurrently in a multi-threaded way, and finally respond to the user

3. Message queue: After successful registration, the message to be sent is written into the message queue in a short time, and then responds to the user. The service that sends the mail and the service that sends the SMS can be read asynchronously from the message queue and then send the task.

The application of decoupling

Scenario: After placing a shopping order, call the inventory system to update the inventory.

1. Coupling method: order system, write and call the logic of inventory system.

2. Decoupling mode: The order system writes the message to the message queue, and the inventory system reads the message from the message queue to update the inventory.

Traffic peak clipping

In the second kill scenario, we can set up a fixed length message queue, the second kill starts, who is the fastest to enter the queue first, and then quickly return to the user whether the second is arrived, after the smooth processing of the second kill after the business.

Overview of message service middleware

    1. In most applications, message service middleware can be used to improve the asynchronous communication and extend the decoupling capability of the system
    1. When a message is sent by a sender, it is taken over by a message broker. The message broker ensures that the message is delivered to a specified destination.
    1. Message queues have two main forms of destination
    1. Queue: Point-to-point message communication
    1. Topic: Publish/subscribe message communication
    1. Point-to-point:
    1. The message sender sends the message, the message broker places it in a queue, the message receiver retrieves the message content from the queue, and the message is read and removed from the queue
    1. Messages have only one sender and receiver, but it is not true that there can only be one receiver
    1. Publish and subscribe:
  • A sender (publisher) sends a message to a topic, multiple receivers (subscribers) listen to (subscribe to) the topic, and receive the message simultaneously when it arrives
    1. JMS Java Message Service (JMS) :
  • Specification based on JVM message broker. ActiveMQ and HornetMQ are JMS implementations
    1. AMQP(Advanced Message Queuing Protocol)
  • Advanced Message Queue protocol, also a message broker specification, compatible with JMS
  • RabbitMQ is an implementation of AMQP

    1. Spring supports
  • Spring-jms provides support for JMS
  • Spring-rabbit provides support for AMQP
  • An implementation of ConnectionFactory is required to connect to the message broker
  • Provide JmsTemplate, RabbitTemplate to send messages
  • The @jMSListener (JMS), @RabbitListener(AMQP) annotation listens for messages sent by the message broker on the method
  • @enableJMS, @enablerabbit Enable support
    1. Spring Boot is automatically configured
  • JmsAutoConfiguration
  • RabbitAutoConfiguration

3. Introduction to RabbitMQ

RabbitMQ is an open source implementation of the Advanved Message Queue Protocol (AMQP) developed by Erlang.

1. Core concepts

  • “Message” : a Message is not named. It consists of a header and a body. The body of the message is opaque, while the header is composed of a set of optional attributes, including routing-key, priority(priority over other messages), delivery-mode(indicating that the message may require persistent storage), and so on.
  • Publisher: Producer of messages and a client application that publishes messages to the exchange.
  • Exchange: An Exchange that receives messages sent by producers and routes them to queues in the server. There are four types of Exchange: Direct (default), Fanout, Topic, and headers. Different types of Exchange have different policies for forwarding messages
  • Queue: Message Queue, used to hold messages until they are sent to consumers. It is the container and destination of the message. A message can be put into one or more queues. The message remains in the queue, waiting for the consumer to connect to the queue to pick it up.
  • Binding: Binding for the association between message queues and exchanges. A binding is a routing rule that connects a switch to a message queue based on a routing key, so a switch can be thought of as a routing table made up of bindings. Exchange and Queue bindings can be many-to-many.
  • Connection: a network Connection, such as a TCP Connection.
  • Channel: a separate two-way data Channel in a multiplexing connection. A channel is a dummy connection established within a real TCP connection. AMQP commands are sent through a channel. No matter publishing messages, subscribing to queues or receiving messages, these actions are completed through a channel. Because it is very expensive for an operating system to establish and destroy TCP, the concept of a channel was introduced to reuse a TCP connection.
  • Consumer: The Consumer of a message, representing a client application that retrieves a message from a message queue.
  • Virtual Host: a Virtual Host that represents a batch of switches, message queues, and related objects. Virtual hosts are separate server domains that share the same authentication and encryption environment. Each Vhost is essentially a mini RabbitMQ server with its own queue, switch, binding and permission mechanism. Vhost is the basis of the AMQP concept and must be specified at connection time. The default vhost for RabbitMQ is /.
  • Broker: represents the message queue server entity.

4. How RabbitMQ works

Message routing in AMQP

The routing of messages in AMQP differs a bit from the JMS process familiar to Java developers, with the addition of Exchange and Binding roles. The producer publishes the message to the Exchange, the message eventually reaches the queue and is received by the consumer, and the Binding determines which queue the Exchange’s message should be sent to.

Exchange type

Exchange distributes messages according to different distribution policies. Currently, there are four types: Direct, FANout, Topic, and headers. Headers matches the header of an AMQP message rather than a routing key. The Headers exchange is exactly the same as the Direct exchange, but performs much worse than the direct exchange.

Install RabbitMQ

We use Docker to install RabbitMQ. We choose the latest version of the official Docker Hub with management interface.

#Obtain the RabbitMQ image
docker pull rabbitmq:3-management
#Start the RabbitMQ image. 5672 is the MQ communication port and 15672 is the WEB management interface port of MQRun -d -p 5672:5672 -p 15672:15672 --name MyrabbitMQ mirror IDCopy the code

To access 127.0.0.1:15672, log in to 127.0.0.1:15672 using account: guest and password: guest. The following interface is displayed:

The detailed use of RabbitMQ will not be covered here, but the focus of this section is to integrate RabbitMQ.

Integrate RabbitMQ

Create a project to import rabbitMQ dependencies.

1. pom.xml

<?xml version="1.0" encoding="UTF-8"? >
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.gf</groupId>
	<artifactId>springboot-rabbitmq</artifactId>
	<version>0.0.1 - the SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>springboot-rabbitmq</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>At 2.0.5. RELEASE</version>
		<relativePath/> <! -- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Copy the code

2. MyAMQPConfig

package com.gf.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;

/** * custom message converter, default is JDK serialization converter, we custom json */
@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter(a) {
        return newJackson2JsonMessageConverter(); }}Copy the code

3. Springboot test class

We tested creating administrative configurations, sending messages, and receiving messages

package com.gf;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {

	@Autowired
	RabbitTemplate rabbitTemplate;

	@Autowired
	AmqpAdmin amqpAdmin;

	@Test
	public void contextLoads(a) {}@Test
	public void create(a){
		/ / create Exchange
		amqpAdmin.declareExchange( new DirectExchange( "exchange.direct")); amqpAdmin.declareExchange(new FanoutExchange( "exchange.fanout")); amqpAdmin.declareExchange(new TopicExchange( "exchange.topic"));/ / create the Queue
		amqpAdmin.declareQueue( new Queue( "direct.queue" , true)); amqpAdmin.declareQueue(new Queue( "fanout.queue" , true));/ / bind the Queue
		amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.direct" , "direct.queue" , null)); amqpAdmin.declareBinding(new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.direct" , "fanout.queue" , null)); amqpAdmin.declareBinding(new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.fanout" , "" , null)); amqpAdmin.declareBinding(new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.fanout" , "" , null)); amqpAdmin.declareBinding(new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.topic" , "direct.#" , null)); amqpAdmin.declareBinding(new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.topic" , "direct.*" , null)); }@Test
	public void send2Direct(a) {
		Map<String , Object> map = new HashMap<>();
		map.put( "msg" , "This is a peer-to-peer message." );
		map.put( "data" , Arrays.asList("helloworld" , 123 , true)); rabbitTemplate.convertAndSend("exchange.direct" , "direct.queue" , map );

	}

	@Test
	public void send2Topic(a) {
		Map<String , Object> map = new HashMap<>();
		map.put( "msg" , "This is a broadcast message" );
		map.put( "data" , Arrays.asList("Topic news" , 123 , true)); rabbitTemplate.convertAndSend("exchange.fanout" , "", map );

	}

	@Test
	public void receive(a) {
		Object o = rabbitTemplate.receiveAndConvert( "direct.queue"); o.getClass(); System.out.println(o.getClass()); System.out.println(o); }}Copy the code

Listen to the message

4. Start the class

package com.gf;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/** * Automatic configuration * 1. RabbitAutoConfiguration * 2. 3. RabbitProperties encapsulates the RabbitMQ configuration * 4. RabbitTemplate: sends and receives messages to RabbitMQ * 5. RabbitMQ System management components * 6.@EnableRabbit + @RabbitListener* /
@EnableRabbit
@SpringBootApplication
public class SpringbootRabbitmqApplication {

	public static void main(String[] args) { SpringApplication.run(SpringbootRabbitmqApplication.class, args); }}Copy the code

5. MQService

package com.gf.service;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQService {

    @RabbitListener(queues = "fanout.queue")
    public void receive(Message message) {
        System.out.println("Received a message:" + newString(message.getBody())); }}Copy the code

Download the source code: github.com/gf-huanchup…