Introduction to JMS and AMQP


1. The role of messaging middleware

In most applications, asynchronous communication and application decoupling capabilities (asynchronous processing, application decoupling, traffic peak cutting) can be improved through message service middleware

2. Two important concepts in messaging services

  • Message Broker
  • Destination (destination)

When the message sender sends the message, it is taken over by the message broker, which ensures that the message is delivered to the specified destination.

3. Two forms of destination in the message queue

  • Queue: Point-to-point message communication
  • Topic: Publish/subscribe message communication

4. Point-to-point

  • The message sender sends the message, the message broker places it in a queue, the message receiver retrieves the message content from the queue, and the message is read and removed from the queue
  • Messages have only one sender and receiver, but it is not true that there can only be one receiver

5. Publish and subscribe

A sender (publisher) sends a message to a topic, multiple receivers (subscribers) listen to (subscribe to) the topic, and receive the message simultaneously when it arrives

Java Message Service (JMS)

Based on the specification of JVM message broker, ActiveMQ and HornetMQ are JMS implementations

7, AMQP(Advanced Message Queuing Protocol)

  • Advanced Message Queue protocol, also a message broker specification, compatible with JMS
  • RabbitMQ is an implementation of AMQP

Introduction to RabbitMQ


1. RabbitMQ:

RabbitMQ is an open source implementation of the Advanved Message Queue Protocol (AMQP) developed by Erlang.

2. RabbitMQ core concepts

1) Message

A message is anonymous and consists of a header and a body. The body of the message is opaque, and the header consists of a set of optional attributes, including routing-key, priority(priority over other messages), delivery-mode(indicating that the message may require persistent storage), and so on

2) Publisher (Producer)

The producer of the message is a client application that publishes the message to the exchange.

3) Exchange

Switches are used to receive messages sent by producers and route them to queues in the server.

There are four types of Exchange: Direct (default), Fanout, Topic, and headers. Different types of Exchange have different policies for forwarding messages

4) Queue Message Queue

It is used to hold messages until they are sent to the consumer, and it is the container and destination of the message. A message can be put into one or more queues. The message remains in the queue, waiting for the consumer to connect to the queue to pick it up

5) Binding

Used for association between message queues and exchanges. A binding is a routing rule that connects a switch to a message queue based on a routing key, so a switch can be thought of as a routing table made up of bindings.

Exchange and Queue bindings can be many-to-many.

Network Connection

Like a TCP connection

7) Channel

An independent two-way data channel in a multiplexing connection. A channel is a virtual connection established in a real TCP connection. AMQP commands are sent through the channel. No matter publishing messages, subscribing to queues or receiving messages, these actions are completed through the channel. Because it is very expensive for an operating system to establish and destroy TCP, the concept of a channel was introduced to reuse a TCP connection

8) Subscriber message subscriber (or Consumer message Consumer)

Represents a client application that retrieves a message from a message queue.

9)Virtual Host

Represents a batch of exchanges, message queues, and related objects. A virtual host is a separate server domain that shares the same authentication and encryption environment. Each Vhost is essentially a mini RabbitMQ server with its own queue, switch, binding and permission mechanism. Vhost is the basis of the AMQP concept and must be specified at connection time. The default vhost for RabbitMQ is /

10)Message Broker

Represents the message queue server entity

3. Interaction principle of RabbitMQ

  • A message producer or publisher sends a message to a Virtual Host within a message broker or server
  • Virtual hosts have a lot of exchanges and queues. Messages are sent to the virtual host, and the virtual host sends messages to the specified switch
  • The switch determines which message queue to route the message to according to the routing key of the message. The routing rule is expressed by binding
  • When the message arrives in the message queue, the consumer can retrieve the message from the message queue
  • The consumer establishes a connection with the message queue, and in order to save resource multiplexing, there are many channels in each TCP connection through which information is exchanged

3. How RabbitMQ works


1. Message routing in AMQP

  • Exchange and Binding roles have been added to AMQP.
  • The producer publishes the message to the Exchange, the message eventually reaches the queue and is received by the consumer, and the Binding determines which queue the Exchange’s message should be sent to.
  • Producers are message, sent to the broker (message broker, server), server receives news will give the message to the switch, switch is bound to the message queue, there are so many in the server switches and message queue, the routing based on message carried over the switches keys to determine which messages delivered to queue, After the switch puts the message to the queue according to the routing rule (Binding), the consumer can connect to the queue to retrieve the corresponding message

2, Exchange type

Exchange distributes messages according to different distribution policies. Currently, there are four types: Direct, FANout, Topic, and headers. Headers matches the header of an AMQP message rather than a routing key. The Headers exchange is exactly the same as the Direct exchange, but performs much worse and is now almost useless

1) Direct Exchange

If the routing key in the message matches the Binding key in Binding, the exchange sends the message to the corresponding queue. It is a perfectly matched, unicast pattern

2) Fanout Exchange

Each message sent to a FANout type exchange is sent to all bound queues. The FANout exchange does not handle routing keys, but simply binds queues to the exchange, and every message sent to the exchange is forwarded to all queues bound to the exchange. Fanout type forwarding messages is the fastest.

3) Topic Exchange

Topic exchange through the pattern matching allocation of message routing key attributes, and the key to the routing a pattern matching queue need to bind to a model at this time, it will be the routing and binding keys string into words, with a point between these words, it will also identify two wildcards (such as #, *), # match zero or more words, * matches a word

Install RabbitMQ


Docker install rabbitMQ

docker search rabbitmq
docker pull rabbitmq:management
docker run -d --name="myrabbitmq" -p 5672:5672 -p 15672:15672 rabbitmq:management
Copy the code

Docker command

  • Run: Creates a new container and runs a command
  • -d: Runs the container in the background and returns the container ID
  • -p: indicates port mapping. The format is host (host) port: container port
  • –name=” rabbitMQ “: Specify a name for the container

RabbitMQ creates a guest user by default, and the password is guest. If you cannot access RabbitMQ, check the firewall port or security group of the cloud server

The address of the management background is http://ip:port

5. Rabbit Quick Start


Package:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
Copy the code

1. Do not specify switch code examples

1) Message producers

public static void main(String[] args) throws Exception {
    // create a link factory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //todo 2
    connectionFactory.setHost("47.93.60.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("cyan");
    connectionFactory.setUsername("cyan");
    connectionFactory.setPassword("cyan");

    //todo 3. Create a connection object from the connection factory
    Connection connection = connectionFactory.newConnection();
    4. Create a channel through a connection
    Channel channel = connection.createChannel();

    //todo 5. Send messages via channel
    for(int i=0; i<5; i++) { String message ="hello--"+i;
        
    // The toDO message will be sent to the exchange, if we do not specify the specific switch when the message is sent,
    //todo then the message will be sent to the default switch specified by the RabBIMTQ, which will use the routing_key,
    // Todo looks up the corresponding queueName and sends it to the queue
        channel.basicPublish(""."cyan-queue-01".null,message.getBytes());
    }
    Todo 6. Close the connection
    channel.close();
    connection.close();
}
Copy the code

2) Message consumers

public static void main(String[] args) throws Exception {
    // create a link factory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //todo 2
    connectionFactory.setHost("47.93.60.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("cyan");
    connectionFactory.setUsername("cyan");
    connectionFactory.setPassword("cyan");

    //todo 3. Create a connection object from the connection factory
    Connection connection = connectionFactory.newConnection();
    4. Create a channel through a connection
    Channel channel = connection.createChannel();

    //todo 5
    String queueName = "cyan-queue-01";
    channel.queueDeclare(queueName,true.false.false.null);

    //todo 6. Receive messages through channels
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName,true,queueingConsumer);
    while (true) {
        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
        String reserveMsg = new String(delivery.getBody());
        System.out.println("Consumer News:+reserveMsg); }}Copy the code

3) Main API parameter description

/** * Todo Creates a switch * Todo Exchange: Switch name * Todo Type: Switch type * Todo durable: Whether to persist * Todo autoDelete: Whether to automatically delete a switch when the last consumer disconnects * Todo arguments: arguments */
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments);

/** * TODO Create queue * TOdo Queue: Queue name * TOdo durable: Persistent * TODO EXCLUSIVE: Whether the queue will be automatically deleted when the connection is closed AutoDelete: Whether the queue is automatically deleted after the last consumer disconnects * Todo Arguments: arguments */
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments);

/** * todo sends messages through channels * todo exchange: exchange name * todo routingKey: routingKey * todo props: message properties * todo body: message content */
channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);

/** * todo creates consumer * todo channel: channel */
QueueingConsumer queueingConsumer = new QueueingConsumer(Channel channel);

/** * Todo props:callback: consumer */ Todo props:callback: consumer */
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
Copy the code

2. Direct switch code examples

1) Message producers

public static void main(String[] args) throws IOException, TimeoutException { //... . Omitted create channel code // TODO Defines the switch name String exchangeName ="cyan.directchange"; // Todo defines routingKey String routingKey ="cyan.directchange.key"; //todo messageBody content String messageBody ="hello cyan ";
    channel.basicPublish(exchangeName,routingKey,null,messageBody.getBytes());
}
Copy the code

2) Message consumers

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //... . Omitted create channel code //todo declares exchangeName, type, queue name, bind key (routing key) String exchangeName ="cyan.directchange";
    String exchangeType = "direct";
    String queueName = "cyan.directqueue";
    String routingKey = "cyan.directchange.key";
    
    channel.exchangeDeclare(exchangeName,exchangeType,true.false,null);
    channel.queueDeclare(queueName,true.false.false,null); / / todo queue and switches binding channel. QueueBind (queueName, exchangeName routingKey); //todo creates a QueueingConsumer QueueingConsumer = new QueueingConsumer(channel); // Todo start consuming channel.basicConsume(queueName,true,queueingConsumer);

    while (true) {
        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
        String reciverMessage = new String(delivery.getBody());
        System.out.println("Consumer News :-----"+reciverMessage); }}Copy the code

3. Fan switch

1) Message producers

public static void main(String[] args) throws IOException, TimeoutException { //... . Omitted create channel code // TODO Defines the switch name String exchangeName ="cyan.fanoutexchange"; // Todo defines routingKey String routingKey1 ="123";
    String routingKey2 = "456";
    String routingKey3 = "789"; / / todo the body of the message content channel. BasicPublish (exchangeName routingKey1, null,"I am the first message.".getBytes());
    channel.basicPublish(exchangeName,routingKey2,null,"I am the second message.".getBytes());
    channel.basicPublish(exchangeName,routingKey3,null,"I'm the third message.".getBytes());
}
Copy the code

2) Message consumers

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //... . Omitted create channel code //todo declares exchangeName, type, queue name, bind key (routing key) String exchangeName ="cyan.fanoutexchange";
    String exchangeType = "fanout";
    String quequName = "cyan.fanoutqueue";
    String routingKey = "cyan";
    
    channel.exchangeDeclare(exchangeName,exchangeType,true.true,null);
    channel.queueDeclare(quequName,true.false.false,null);
    channel.queueBind(quequName,exchangeName,routingKey);

    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(quequName,true,queueingConsumer);
    while (true) {
        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
        System.out.println("Received the message :"+new String(delivery.getBody())); }}Copy the code

4. Topic exchanger

1) Message producers

public static void main(String[] args) throws IOException, TimeoutException { //... . Omit the code for creating a channel String exchangeName ="cyan.topicexchange";
    String routingKey1 = "cyan.key1";
    String routingKey2 = "cyan.key2";
    String routingKey3 = "cyan.key.key3";

    channel.basicPublish(exchangeName,routingKey1,null,"I am the first message.".getBytes());
    channel.basicPublish(exchangeName,routingKey2,null,"I am the second message.".getBytes());
    channel.basicPublish(exchangeName,routingKey3,null,"I'm the third message.".getBytes());
}
Copy the code

2) Message consumers

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //... . Omitted create channel code //todo declares exchangeName, type, queue name, bind key (routing key) String exchangeName ="cyan.topicexchange";
    String exchangeType = "topic";
    String quequName = "cyan.topicqueue";
    String routingKey = "cyan.*";
    
    channel.exchangeDeclare(exchangeName,exchangeType,true.true,null);
    channel.queueDeclare(quequName,true.false.false,null); // Todo declares the binding * to match a word# Match multiple words
    channel.queueBind(quequName,exchangeName,routingKey);

    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(quequName,true,queueingConsumer);
    while (true) {
        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
        System.out.println("Received the message :"+new String(delivery.getBody())); }}Copy the code

5. Message properties

1) Message producers

public static void main(String[] args) throws IOException, TimeoutException { //... . Map<String,Object> headsMap = new HashMap<>(); headsMap.put("company"."cyan");
    headsMap.put("name"."Green"); BasicProperties BasicProperties = new amqp.basicProperties ().builder().deliveryMode(2)//2 .expiration("10000")// Message expiration time 10s.contentEncoding ("utf-8") .correlationId(UUID.randomUUID().toString()) .headers(headsMap) .build(); // Todo sends messages through a channelfor(int i=0; i<5; i++) { String message ="hello--"+i;
        channel.basicPublish("cyan.directchange"."cyan.directchange.key",basicProperties,message.getBytes()); }}Copy the code

2) Message consumers

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //... . Omitted create channel code //todo declares exchangeName, type, queue name, bind key (routing key) String exchangeName ="cyan.directchange";
    String exchangeType = "direct";
    String queueName = "cyan.directqueue";
    String routingKey = "cyan.directchange.key";
    
    channel.exchangeDeclare(exchangeName,exchangeType,true.false,null);
    channel.queueDeclare(queueName,true.false.false,null); channel.queueBind(queueName,exchangeName,routingKey); //todo creates a QueueingConsumer QueueingConsumer = new QueueingConsumer(channel); // Todo start consuming channel.basicConsume(queueName,true,queueingConsumer);
    while (true) {
        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
        String reserveMsg = new String(delivery.getBody());
        System.out.println("Consumer News :-----"+reserveMsg); // Todo reads message attributes system.out.println ("encoding:"+delivery.getProperties().getContentEncoding());
        System.out.println("company:"+delivery.getProperties().getHeaders().get("company"));
        System.out.println("name:"+delivery.getProperties().getHeaders().get("name"));
        System.out.println("correlationId:"+delivery.getProperties().getCorrelationId()); }}Copy the code

5, Spring Boot RabbitMQ integration


1. Automatic configuration

1) Automatic configuration classes

public class RabbitAutoConfiguration {}
Copy the code

2) The connection factory is automatically configured

public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {}
Copy the code

3)RabbitProperties encapsulates the RabbitMQ configuration

Spring. The rabbitmq. Host = 106.13.125.89 spring. The rabbitmq. Port = 5672 spring. The rabbitmq. Username = guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=Copy the code

4)RabbitTemplate sends and receives messages to RabbitMQ

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {}
Copy the code

5)AmqpAdmin is a RabbitMQ system management component (Queue, Exchange, Binding).

public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}
Copy the code

6) @enablerabbit and @RabbitListener listen to the contents of the message queue

2. Customize message serialization

@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter() {returnnew Jackson2JsonMessageConverter(); }}Copy the code

Define the entity class that sends the message

public class Book implements Serializable { private String bookName; private String author; . }Copy the code

4. Send and receive messages

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void contextLoads() {// Create a Message of your own, // RabbitTemplate. send(String exchange,String routingKey,Message Message) // Object is used as the Message body by default. Automatic serialization to send to the rabbitmq / / rabbitTemplate convertAndSend (String exchange, String routingKey, Object Object) / / Object has been sent out after the default serialization rabbitTemplate.convertAndSend("exchange.direct"."qingzi",new Book(Journey to the West."Cheng 'en Wu"));
    }

    @Test
    public void receive(){
        Object object = rabbitTemplate.receiveAndConvert("qingzi");
        if(object ! = null) { Book book = (Book) object; } } @Test public voidsendMsg(){
        rabbitTemplate.convertAndSend("exchange.fanout"."",new Book(Romance of The Three Kingdoms."Luo Guanzhong")); }}Copy the code

5. Monitoring mechanism

@SpringBootApplication @EnableRabbit public class RabbitmqApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqApplication.class, args); }}Copy the code
@Service
public class BookService {
    @RabbitListener(queues = {"qingzi"})
    public void receive(Book book){
        System.out.println("Received a message"+book); }}Copy the code

6. Use of amqpAdmin management components

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue"));
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue",
                Binding.DestinationType.QUEUE,"amqpadmin.exchange"."amqpadmin.routingkey",null)); }}Copy the code