RabbitMQ is an open source message broker and queue server for sharing data between completely different applications over a common protocol. RabbitMQ is written in the Erlang language and is based on the AMQP protocol.

Why RabbitMQ performs well

It is written in Erlang

Erlang was originally designed for telecom switching systems, which makes RabbitMQ well suited to exchanging data between brokers with high concurrency and low latency.

Advantage of Erlang: its network I/O latency is close to that of native sockets.

AMQP

Advanced Message Queuing Protocol

Definition

AMQP is a binary, application-layer protocol with modern features. It is an open standard designed for message-oriented middleware, and it gives applications a unified messaging service.

AMQP protocol model

AMQP core concepts

Server/Broker: accepts client connections and implements the AMQP entity services.

Connection: the network connection between an application and the Broker.

Channel: the network channel in which almost all operations take place. Messages are read and written over a channel. A client can open multiple channels, each representing one session.

Message: the data passed between the server and the application, made up of Properties and a Body. Properties decorate the message with advanced features such as priority and delay. The Body is the message content.

Virtual host: a virtual address used for logical isolation; it is the top layer of message routing. A virtual host can contain multiple exchanges and queues, but no two exchanges and no two queues in the same virtual host can share a name.

Exchange: receives messages and forwards them to bound queues according to the routing key.

Binding: a virtual link between an exchange and a queue. A binding can carry a routing key.

Routing key: a routing rule that the broker uses to decide how to route a particular message.

Queue: a message queue, which stores messages and delivers them to consumers.
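To make the relationship between these concepts concrete, here is a minimal in-memory sketch of one virtual host. It is only a toy model: the class MiniVirtualHost and all of its methods are hypothetical names invented for illustration, not part of the RabbitMQ client, and it routes by exact key match only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of one virtual host. Not the RabbitMQ client API. */
class MiniVirtualHost {
    /** Binding: links an exchange to a queue under a routing key. */
    static final class Binding {
        final String queueName;
        final String routingKey;

        Binding(String queueName, String routingKey) {
            this.queueName = queueName;
            this.routingKey = routingKey;
        }
    }

    // Names are unique inside a virtual host, so maps keyed by name are enough.
    private final Map<String, List<Binding>> exchanges = new HashMap<>();
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareExchange(String name) {
        exchanges.putIfAbsent(name, new ArrayList<>());
    }

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    void bind(String exchange, String queue, String routingKey) {
        if (!exchanges.containsKey(exchange) || !queues.containsKey(queue)) {
            throw new IllegalArgumentException("declare the exchange and the queue first");
        }
        exchanges.get(exchange).add(new Binding(queue, routingKey));
    }

    /** Routes by exact key match and returns how many queues received the message. */
    int publish(String exchange, String routingKey, String body) {
        List<Binding> bindings = exchanges.get(exchange);
        if (bindings == null) {
            return 0;
        }
        int delivered = 0;
        for (Binding b : bindings) {
            if (b.routingKey.equals(routingKey)) {
                queues.get(b.queueName).add(body);
                delivered++;
            }
        }
        return delivered;
    }

    /** A consumer takes the oldest message, or null when the queue is empty. */
    String consume(String queue) {
        return queues.get(queue).poll();
    }

    public static void main(String[] args) {
        MiniVirtualHost vhost = new MiniVirtualHost();
        vhost.declareExchange("orders");
        vhost.declareQueue("billing");
        vhost.bind("orders", "billing", "order.created");
        System.out.println(vhost.publish("orders", "order.created", "order #1"));
        System.out.println(vhost.consume("billing"));
    }
}
```

The producer only ever talks to an exchange and the consumer only ever talks to a queue; the binding is what connects the two.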

The RabbitMQ architecture

The RabbitMQ installation

This article uses RabbitMQ 3.6.5:

  • Environment setup:

  • Official website: www.rabbitmq.com/

  • Environment: Linux (CentOS 7 / Red Hat 7)

# 1. First, prepare the system: install some basic packages with yum
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

# 2. Configure the host name in /etc/hosts and /etc/hostname

# 3. Download the packages RabbitMQ needs
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-1.1.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

# 4. Install the packages
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-1.1.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

# 5. Modify user login and connection heartbeat detection
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# Change 1: in loopback_users, change <<"guest">> so that only guest is kept (for user login)
# Change 2: set heartbeat to 10 (for connection heartbeats)

# 6. Install the management plug-in

# 6.1 Start the service first (the alternatives after | stop it, check its status and restart it)
/etc/init.d/rabbitmq-server start | stop | status | restart

# 6.2 Check that the service is running: lsof -i:5672 (5672 is the default RabbitMQ port)
rabbitmq-plugins enable rabbitmq_management

# 6.3 Check whether the management port is open:
lsof -i:15672 or netstat -tnlp | grep 15672

# 7. Open the management page and enter guest as both the user name and the password:
# http://your IP address:15672/

# 8. If all goes well, the environment is now installed

The RabbitMQ core

Exchange

Direct Exchange

Direct Exchange: processes routing keys. A queue is bound to the exchange with a routing key, and a message is delivered only if its routing key matches that key exactly. If a queue is bound with the routing key “dog”, only messages marked “dog” are forwarded to it; “dog.puppy” and “dog.guard” are not.

Any message sent to a Direct Exchange is forwarded to the queue whose binding matches the RouteKey.

1. Generally, you can use the default exchange that ships with RabbitMQ. Its name is the empty string.

2. With the default exchange no explicit binding is needed, because every queue is automatically bound to it under the queue's own name. A named direct exchange still needs a binding.

3. A RouteKey is required when sending a message. With the default exchange it can simply be understood as the name of the target queue.

4. If the queue named in the RouteKey does not exist in the vhost, the message is discarded.

Example

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DirectExchangeSender {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // 2. Create a connection
        Connection connection = connectionFactory.newConnection();
        // 3. Create a channel
        Channel channel = connection.createChannel();
        // 4. Declare the names. The exchange must already exist,
        //    so start the consumer (which declares it) first.
        String exchangeName = "test_direct_exchange";
        String routingKey = "test_direct_routingKey";
        // Parameters: queue name, durable, exclusive (this connection only),
        // auto-delete when unused, extra arguments
        channel.queueDeclare(routingKey, false, false, false, null);
        // 5. Send
        String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        // 6. Release resources
        channel.close();
        connection.close();
    }
}

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class DirectExchangeReceiver {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // Reconnect automatically, retrying every 3000 ms
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        // 2. Create a connection
        Connection connection = connectionFactory.newConnection();
        // 3. Create a channel
        Channel channel = connection.createChannel();
        // 4. Declare the names
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test_direct_routingKey";

        // Parameters: exchange name, type, durable, autoDelete, internal, extra arguments
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Parameters: queue name, durable, exclusive, autoDelete, extra arguments
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // QueueingConsumer ships with the 3.x/4.x Java client used here; it was removed in 5.x
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Parameters: queue name, automatic ACK, consumer
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            // Blocks until a message arrives
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}

Topic Exchange

Topic Exchange: matches routing keys against a pattern, so the queue must be bound with a pattern. The symbol “#” matches zero or more words, and the symbol “*” matches exactly one word. So “dog.#” matches “dog.jacquesh.boy”, but “dog.*” only matches keys such as “dog.jacquesh”.

Any message sent to a Topic Exchange is forwarded to all queues that are interested in the topic given by the RouteKey.

  1. In simple terms, each queue has topics it is interested in, every message carries a RouteKey, and the exchange forwards the message to every queue whose topic pattern matches the RouteKey.

  2. This mode requires a RouteKey, and the exchange and queue also need to be bound in advance.

  3. When binding, provide the topic that the queue cares about. For example, “#.log.#” means the queue cares about every log-related message (a message whose RouteKey is “mq.log.error” is forwarded to the queue).

  4. “#” stands for zero or more words, and “*” stands for exactly one word. For example, “log.*” matches “log.warn” but not “log.warn.timeout”, whereas “log.#” matches both.

  5. As before, if the exchange finds no queue that matches the RouteKey, the message is discarded.
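The matching rules above can be sketched as a small recursive function. This is an illustration of the rules as described here, not the broker's actual implementation (which is written in Erlang); the class name TopicMatcher is an assumption made for this example.

```java
/** Sketch of topic matching: words are separated by '.', '*' is one word, '#' is zero or more words. */
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing (move on in the pattern)
            // or absorbs one more word (stay on '#', advance in the key).
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            // Key exhausted but the pattern still needs a word.
            return false;
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("log.*", "log.warn"));          // true
        System.out.println(matches("log.*", "log.warn.timeout"));  // false
        System.out.println(matches("log.#", "log.warn.timeout"));  // true
        System.out.println(matches("#.log.#", "mq.log.error"));    // true
    }
}
```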

Example

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicExchangeSender {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";

        // Send
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        // basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        channel.basicPublish(exchangeName, routingKey1, false, false, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, false, false, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, false, false, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TopicExchangeReceiver {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the names
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // "user.*" would match user.save and user.update only;
        // "user.#" also matches user.delete.abc
        //String routingKey = "user.*";
        String routingKey = "user.#";
        // Declare the exchange
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Declare a queue
        channel.queueDeclare(queueName, false, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Parameters: queue name, automatic ACK, consumer
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println(new String(delivery.getBody()));
        }
    }
}

Fanout Exchange

Fanout Exchange: does not process routing keys. You simply bind queues to the exchange, and a message sent to the exchange is forwarded to every queue bound to it. It works much like a subnet broadcast, where each host in the subnet gets a copy of the message. Fanout is the fastest exchange type at forwarding messages, because no routing key has to be matched.

Any message sent to a Fanout Exchange is forwarded to all queues bound to that exchange.

  1. You can think of it as a routing table.

  2. This mode does not need a RouteKey.

  3. This mode requires exchanges and queues to be bound in advance. One exchange can be bound to multiple queues, and one queue can be bound to multiple exchanges.

  4. If the exchange that receives the message is not bound to any queue, the message is discarded.
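As a rough illustration of the broadcast behaviour, the sketch below copies each message to every bound queue and ignores the routing key. FanoutSketch and its methods are hypothetical names for this example only; queues are plain in-memory lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy fanout exchange: every bound queue gets its own copy of each message. */
class FanoutSketch {
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** The routing key is accepted but never inspected. Returns the number of copies delivered. */
    int publish(String ignoredRoutingKey, String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("audit");
        exchange.bind("email");
        System.out.println(exchange.publish("anything", "user registered"));
        System.out.println(exchange.messagesIn("audit"));
    }
}
```

A return value of 0 corresponds to the last point in the list: with no bound queue, the message simply goes nowhere.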

Example

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class FanoutExchangeSender {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String msg = "test Fanout";
        String exchange = "fanout_exchange";
        // A fanout exchange ignores the routing key, so an empty string is enough
        channel.basicPublish(exchange, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class FanoutExchangeReceiver {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.123.171");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchange = "fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "fanout_queue";

        // String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments
        channel.exchangeDeclare(exchange, exchangeType, true, false, false, null);
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(queueName, true, false, false, null);
        // The routing key is ignored for fanout, so bind with an empty string
        channel.queueBind(queueName, exchange, "");

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            System.out.println(new String(delivery.getBody()));
        }
    }
}

Reliable delivery on the producer side

Persistence scheme

STEP 1, STEP 2: Persist the message (together with a status) before sending it

STEP 3: Send the message to the Broker

STEP 4: The Broker sends a confirmation back to the producer

STEP 5: Update the message status once the confirmation is received

STEP 6, STEP 7, STEP 8: A scheduled task periodically resends messages whose status has not been updated
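The steps above can be sketched as follows. This is a simplified in-memory model under stated assumptions: a map stands in for the database table, a list stands in for the Broker, time is passed in explicitly, and the retry limit is an added assumption. ReliableSenderSketch and all of its members are hypothetical names, not part of any library.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of "persist first, send, mark on confirm, resend when overdue". */
class ReliableSenderSketch {
    enum Status { SENDING, CONFIRMED, FAILED }

    static final class Record {
        final String id;
        final String body;
        Status status = Status.SENDING;
        int attempts = 0;
        long nextRetryAtMillis = 0;

        Record(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    private final Map<String, Record> messageLog = new LinkedHashMap<>(); // stands in for a DB table
    private final List<String> wire = new ArrayList<>();                  // stands in for the Broker
    private final long retryDelayMillis;
    private final int maxAttempts;

    ReliableSenderSketch(long retryDelayMillis, int maxAttempts) {
        this.retryDelayMillis = retryDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    void send(String id, String body, long nowMillis) {
        Record record = new Record(id, body);
        messageLog.put(id, record);   // STEP 1-2: persist before sending
        transmit(record, nowMillis);  // STEP 3: send to the Broker
    }

    /** STEP 4-5: the Broker confirmed, so update the stored status. */
    void onConfirm(String id) {
        Record record = messageLog.get(id);
        if (record != null) {
            record.status = Status.CONFIRMED;
        }
    }

    /** STEP 6-8: called by a scheduled task; returns how many messages were resent. */
    int resendOverdue(long nowMillis) {
        int resent = 0;
        for (Record record : messageLog.values()) {
            if (record.status != Status.SENDING || nowMillis < record.nextRetryAtMillis) {
                continue;
            }
            if (record.attempts >= maxAttempts) {
                record.status = Status.FAILED; // give up and leave it for manual handling
                continue;
            }
            transmit(record, nowMillis);
            resent++;
        }
        return resent;
    }

    private void transmit(Record record, long nowMillis) {
        wire.add(record.id);
        record.attempts++;
        record.nextRetryAtMillis = nowMillis + retryDelayMillis;
    }

    Status statusOf(String id) {
        return messageLog.get(id).status;
    }

    int transmissions() {
        return wire.size();
    }
}
```

Because an unconfirmed message may be sent more than once, this scheme only works if the consumer side is idempotent.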

Confirm mechanism

With the Confirm mechanism, once a producer has sent a message and the Broker has received it, the Broker sends an acknowledgement back to the producer.

The producer uses this acknowledgement to determine whether the message actually reached the Broker. This is the core guarantee behind reliable message delivery.

// Enable confirm mode before sending messages
channel.confirmSelect();
// Add a confirm listener
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Sent successfully");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Send failed");
    }
});
// Send a message
channel.basicPublish("", queueName, null, msg.getBytes());
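The two callback parameters are a delivery tag and a "multiple" flag: when the flag is true, the Broker is confirming every message up to and including that tag. A minimal sketch of how a producer might track unconfirmed messages is shown below. The class OutstandingConfirms is a hypothetical helper written with the JDK only; in a real producer the sequence number would come from channel.getNextPublishSeqNo(), read just before each publish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Keeps published-but-unconfirmed messages, ordered by publish sequence number. */
class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Call right before publishing, with the sequence number of the next publish. */
    void published(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    /** Would be called from handleAck: the messages are safe, forget them. */
    void handleAck(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple);
    }

    /** Would be called from handleNack: returns the bodies that should be resent. */
    List<String> handleNack(long deliveryTag, boolean multiple) {
        return settle(deliveryTag, multiple);
    }

    private List<String> settle(long deliveryTag, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            // Everything up to and including deliveryTag is covered by this callback.
            ConcurrentNavigableMap<Long, String> upTo = pending.headMap(deliveryTag, true);
            settled.addAll(upTo.values());
            upTo.clear(); // clearing the view removes the entries from 'pending'
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                settled.add(body);
            }
        }
        return settled;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Anything still pending after a reasonable timeout can be treated the same way as a nack and handed to the resend logic.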

The Return mechanism

Some messages cannot be routed when they are published, for example because no queue is bound with the specified routing key. To be notified about such unreachable messages, register a Return Listener. (Publishing to an exchange that does not exist is a different case: the broker closes the channel with an error instead of returning the message.)

The key setting in the base API is mandatory. If it is true, an unreachable message is handed back to the listener for further processing.

If it is false, the broker silently discards the message.

// Add a return listener
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange,
                             String routingKey, AMQP.BasicProperties properties,
                             byte[] body) throws IOException {
        System.out.println("Failed to send: " + new String(body));
    }
});
// mandatory is set to true
channel.basicPublish("", "bibi", true, null, msg.getBytes());
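The effect of the mandatory flag can be summarized as a small decision function. The sketch below is only a model of the behaviour described above; MandatorySketch, Outcome and the set of bound keys are illustrative assumptions, with exact-match routing for simplicity.

```java
import java.util.HashSet;
import java.util.Set;

/** Models what happens to a published message depending on routability and the mandatory flag. */
class MandatorySketch {
    enum Outcome { ROUTED, RETURNED, DROPPED }

    private final Set<String> boundKeys = new HashSet<>();

    void bind(String routingKey) {
        boundKeys.add(routingKey);
    }

    Outcome publish(String routingKey, boolean mandatory) {
        if (boundKeys.contains(routingKey)) {
            return Outcome.ROUTED;       // at least one queue receives the message
        }
        // Unroutable: hand it back to the return listener only if mandatory is set.
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        MandatorySketch exchange = new MandatorySketch();
        exchange.bind("order.created");
        System.out.println(exchange.publish("order.created", true));
        System.out.println(exchange.publish("bibi", true));
        System.out.println(exchange.publish("bibi", false));
    }
}
```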

Idempotent consumption on the consumer side

Use a unique business ID or a fingerprint code, and rely on the database primary key for deduplication

SELECT COUNT(1) FROM ORDER WHERE ID = <unique business ID or fingerprint code>
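A minimal sketch of the deduplication idea, assuming a concurrent set stands in for the primary-key constraint. IdempotentConsumer and its methods are hypothetical names. Note that the set's add() checks and inserts in one atomic step; a separate "SELECT COUNT, then INSERT" can let two concurrent duplicates both pass the check, so with a real database it is usually safer to attempt the insert and treat a duplicate-key error as "already processed".

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Processes each business ID at most once, even if the same message is delivered repeatedly. */
class IdempotentConsumer {
    // Stands in for a table whose primary key is the business ID / fingerprint.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String businessId, String body) {
        if (!processedIds.add(businessId)) {
            return false; // already seen: skip the business logic
        }
        sideEffects.incrementAndGet(); // placeholder for the real business logic
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        System.out.println(consumer.handle("order-1", "pay"));
        System.out.println(consumer.handle("order-1", "pay")); // redelivery of the same message
        System.out.println(consumer.sideEffectCount());
    }
}
```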

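Manual ACK on the consumer side

// Acknowledge. Parameters: deliveryTag, multiple
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

// Negative acknowledge. Parameters: deliveryTag, multiple, requeue
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);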

TTL and dead letter queue

TTL

TTL is short for Time To Live.

RabbitMQ supports an expiration time for messages, which can be specified when a message is sent. The time is counted from the moment the message enters the queue; once it is exceeded, the message is removed automatically.

Dead-letter queue

A DLX (dead-letter exchange) is an ordinary exchange, no different from any other. It can be specified on any queue; in effect it is just an attribute of that queue.

When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, from which it is routed to another queue. Consuming from that queue lets you process dead letters, which makes up for the immediate parameter that RabbitMQ supported before 3.0.

Conditions under which a message becomes a dead letter

  • The message is rejected (basic.reject or basic.nack) with requeue=false

  • The message's TTL expires

  • The queue reaches its maximum length
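The three conditions can be illustrated with a small in-memory model. It is a sketch under assumptions, not broker code: DeadLetterSketch and its methods are hypothetical, a list stands in for the queue bound to the DLX, time is passed in explicitly, expiry is only checked at the head of the queue, and on overflow the oldest message is the one that is dead-lettered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy queue that dead-letters messages on reject, TTL expiry, or overflow. */
class DeadLetterSketch {
    static final class Msg {
        final String body;
        final long expiresAtMillis;

        Msg(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>(); // stands in for the DLX's queue
    private final int maxLength;

    DeadLetterSketch(int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.maxLength = maxLength;
    }

    void enqueue(String body, long nowMillis, long ttlMillis) {
        if (queue.size() == maxLength) {
            // Condition 3: the queue is full, so the oldest message is dead-lettered.
            deadLetters.add(queue.pollFirst().body);
        }
        queue.addLast(new Msg(body, nowMillis + ttlMillis));
    }

    /** Condition 2: dead-letter expired messages, checking from the head of the queue. */
    void expire(long nowMillis) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetters.add(queue.pollFirst().body);
        }
    }

    /** Condition 1: the consumer rejects the head message; requeue=false dead-letters it. */
    void rejectHead(boolean requeue) {
        Msg head = queue.pollFirst();
        if (head == null) {
            return;
        }
        if (requeue) {
            queue.addFirst(head);
        } else {
            deadLetters.add(head.body);
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    int size() {
        return queue.size();
    }
}
```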

Producer

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender4DLXExchange {

  public static void main(String[] args) throws Exception {
    // 1. Create a ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.71");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    // 2. Create a Connection
    Connection connection = connectionFactory.newConnection();
    // 3. Create a Channel
    Channel channel = connection.createChannel();
    // 4. Declare the names
    String exchangeName = "test_dlx_exchange";
    String routingKey = "group.bfxy";

    // 5. Send
    Map<String, Object> headers = new HashMap<String, Object>();

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)            // 2 = persistent
        .contentEncoding("UTF-8")
        .expiration("6000")         // TTL in milliseconds
        .headers(headers)
        .build();

    String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
    channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
  }
}

Consumer

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Receiver4DLXtExchange {

  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.71");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    connectionFactory.setAutomaticRecoveryEnabled(true);
    connectionFactory.setNetworkRecoveryInterval(3000);
    Connection connection = connectionFactory.newConnection();

    Channel channel = connection.createChannel();
    // 4. Declare the normal exchange, queue and routing rule
    String queueName = "test_dlx_queue";
    String exchangeName = "test_dlx_exchange";
    String exchangeType = "topic";
    String routingKey = "group.*";
    // Declare the exchange
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

    // Note the special queue argument here: x-dead-letter-exchange
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-dead-letter-exchange", "dlx.exchange");
    //arguments.put("x-dead-letter-routing-key", "dlx.*");
    //arguments.put("x-message-ttl", 6000);
    channel.queueDeclare(queueName, false, false, false, arguments);
    channel.queueBind(queueName, exchangeName, routingKey);

    // Declare the dead-letter exchange and its queue
    channel.exchangeDeclare("dlx.exchange", exchangeType, true, false, false, null);
    channel.queueDeclare("dlx.queue", false, false, false, null);
    channel.queueBind("dlx.queue", "dlx.exchange", "#");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Parameters: queue name, automatic ACK, consumer
    channel.basicConsume(queueName, true, consumer);
    // Loop to get messages
    while (true) {
      // Get the next message; this call blocks until one is available
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String msg = new String(delivery.getBody());
      System.out.println("Received a message: " + msg);
    }
  }
}

The RabbitMQ SpringBoot integration

Producer

  1. pom
<!-- springboot rabbitmq(amqp) -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  2. application.properties
spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

## Enable the publisher confirm mode
spring.rabbitmq.publisher-confirms=true

## Enable the return message mode; note that it must be used together with mandatory
## spring.rabbitmq.publisher-returns=true
## spring.rabbitmq.template.mandatory=true

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
// In Spring AMQP 2.1 and later this class lives in org.springframework.amqp.rabbit.connection
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  /** Callback that reports whether the broker has received the message. */
  final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    /**
     * @param correlationData unique identifier of the message
     * @param ack whether the broker accepted the message
     * @param cause failure details, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      System.err.println("Message ACK result: " + ack + ", correlationData: " + correlationData.getId());
    }
  };

  /**
   * Sends a message.
   * @param message the message content
   * @param properties additional properties
   * @throws Exception
   */
  public void send(Object message, Map<String, Object> properties) throws Exception {
    MessageHeaders mhs = new MessageHeaders(properties);
    Message<?> msg = MessageBuilder.createMessage(message, mhs);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    // Specify a unique business ID
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

    MessagePostProcessor mpp = new MessagePostProcessor() {
      @Override
      public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
          throws AmqpException {
        System.err.println("---> post to do: " + message);
        return message;
      }
    };
    rabbitTemplate.convertAndSend("exchange-1", "springboot.rabbit", msg, mpp, correlationData);
  }
}

Consumer

  1. application.properties
spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672,192.168.11.71:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

## After a message is consumed successfully, the consumer must acknowledge it manually. The default is auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1

## It is best not to hard-code configuration in the code; reference it with ${...} placeholders,
## for example ${spring.rabbitmq.listener.order.exchange.name}
spring.rabbitmq.listener.order.exchange.name=order-exchange
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=order.*

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class RabbitReceive {

  /**
   * Combines @RabbitListener, @QueueBinding, @Queue and @Exchange.
   * @param message
   * @param channel
   * @throws Exception
   */
  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "queue-1", durable = "true"),
      exchange = @Exchange(name = "exchange-1", durable = "true", type = "topic",
          ignoreDeclarationExceptions = "true"),
      key = "springboot.*"
  ))
  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    // 1. After receiving the message, run the business logic that consumes it
    System.err.println("-----------------------");
    System.err.println("Consumed message: " + message.getPayload());

    // 2. After successful processing, get the deliveryTag and ACK manually,
    //    because manual acknowledgement is configured in the configuration file:
    //    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
  }
}