The message queue

  • The decoupling
  • asynchronous
  • Peak clipping

Docker to deploy the RabbitMQ

Download the RabbitMQ imageDocker pull the rabbitmq: 3.7.7 - managementCreate a rabbitMQ container with port 15672 and default username and password guestDocker run -d --name myRabbitMQ -p 5672:5672 -p 15672:15672 Rabbitmq :3.7.7- ManagementCopy the code

The RabbitMQ components

  • producers

  • consumers

  • switches

  • Virtual host

  • The queue

Building simple Patterns

// Turn on or off the connection utility class
public class ConnectionUtil {
    public static Connection openConnection(a) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("zhangsan");
        factory.setPassword("333");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        return connection;
    }
    public static void closeConnection(Connection connection, Channel channel) { connection.close(); channel.close(); }}// Create a queue and switch
{
    Connection connection = ConnectionUtil.openConnection();
    Channel channel = connection.createChannel();
    String queueName = "myQueue";
    String exchangeType = "direct";
    // Create a queue, name the queue, whether to persist, whether to be unique to the channel, whether to automatically delete the queue, with accessary parameters
    channel.queueDeclare(queueName, false.false.false.null);
    // Make the switch persistent
    channel.exchangeDeclare(exchangeName,  exchangeType, true);
    // Bind queues and switches
    channel.queueBind(queueName, exchangeName, routingKey);
    // Close the connection
    ConnectionUtil.closeConnection(connection, channel);
}

// Producer build
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.openConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "myExchange";
        String routingKey = "myRoutingKey";
        String msg = "Hello World";
        / / production
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        // Close the connectionConnectionUtil.closeConnection(connection, channel); }}// Consumer build
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.openConnection();
        Channel channel = connection.createChannel();
        String queueName = "myQueue";
        / / consumption
        channel.basicConsume(queueName, true.new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println("Received a message" + new String(delivery.getBody(), "UTF-8")); }},new CancelCallback() {
            public void handle(String s) throws IOException {
                System.out.println("Failed to receive message"); }}); System.out.println("Receiving...");
        System.in.read();
        // Close the connectionConnectionUtil.closeConnection(connection, channel); }}Copy the code

Consumption patterns

The average way

// Set autoAck to true
channel.basicConsumer("queue".true, deliverCallback);
Copy the code

Fair way

// Set only one message to be consumed at a time
channel.basicQos(1);

// Set autoAck to false
channel.basicConsumer("queue".false, deliverCallback);

// Message consumption is confirmed in the callback function
channel.basicAck(delivery.getEnvelop().getDeliveryTag(), false);
Copy the code

Production mode

Fanout

// The switch sends messages to all queues bound to it
channel.basicPublish(exchangeName, "".null, msg.getBytes());
Copy the code

Direct

// The switch sends messages to the queue bound to the RoutingKey
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
Copy the code

Topic

// Switch binding sets a fuzzy routingkey, # for n words and * for 1 word
channel.queueBind(queueName, exchangeName, routingKey);

// The switch sends messages to a vaguely matched queue
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
Copy the code

Integrated SpringBoot

# configuration application. Yaml
server:
    port:
        8080
spring:
    rabbitmq:
        username: zhangsan
        password: 333
        virtual-host: /
        host: 127.0. 01.
        port: 5672
Copy the code
@Configuration
public class RabbitMqConfiguration {
    // Create a switch
    @Bean
    public FanoutExchange fanoutExchange(a) {
        return new FanoutExchange("myExchange".true.false);
    }
    // Create a queue
    @Bean
    public Queue myQueue(a) {
        return new Queue("myQueue".true);
    }
    // Complete the binding
    public Binding myBind(a) {
        returnBindingBuilder.bind(myQueue()).to(fanoutExchange()); }}Copy the code
// The producer sends the message
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void product(a) {
        String exchangeName = "myExchange";
        String routingKey = "";
        String message = "message"; rabbitTemplate.convertAndSend(exchangeName, routingKey, message); }}Copy the code
// The consumer receives the message
@Component
@RabbitListener(queues = {"myQueue"})
public class Consumer {
    @RabbitHandler
    public void consume(String message) { System.out.println(message); }}Copy the code

Expiration time TTL

Setting the QUEUE TTL

@Configuration
public class TTLRabbitMQConfiguration {
    @Bean
    public DirectExchange ttlDirectExchange(a) {
        return new DirectExchange("ttlDirectExchange".true.false);
    }
    @Bean
    public Queue ttlDirectQueue(a) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl".5000);
        return new Queue("ttlDirectionQueue".true.false.false, args);
    }
    @Bean
    public Binding ttlBinding(a) {
        return BindingBuilder.bin(ttlDirectQueue()).to(ttlDirectExchange()).with("routingKey"); }}Copy the code

Setting the message TTL

public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void product(a) {
        String exchangeName = "ttlDirectExchange";
        String routingKey = "ttl";
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            public Message postProcessMessage(Message message) {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                returnmessage; }}; rabbitTemplate.convertAndSend(exchangeName, routingKey, message, messagePostProcessor); }}Copy the code

Dead-letter queue

The message expired, the message was rejected, and the queue reached its maximum length

@Configuration
public class RabbitConfiguration {
    @Bean
    public DirectExchanges deadDirect(a) {
        return new DirectExchage("deadExchange".true.false);
    }
    @Bean
    public Queue deadQueue(a) {
        return new Queue("deadQueue".true);
    }
    @Bean
    public Binding deadBinding(a) {
        return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
    }
    @Bean
    public Queue ttlDirectQueue(a) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl".5000);
        args.put("x-dead-letter-exchange"."dead_direct_exchange");
        args.put("x-dead-letter-routing-key"."dead");
        return new Queue("ttlDirectionQueue".true.false.false, args).with("routingKey")}}Copy the code

The cluster

The main for the cluster

Configure the host for each host
echo "ip hostName" >> /etc/hosts

# synchronizing each node/var/lib/rabbitmq /. Erlang. Cookies

Start each node
rabbitmq-server -detached

Add the node to the cluster
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@mainHostName
rabbitmqctl start_app
Copy the code

High availability cluster

# add policy on node, all queue become mirror queue
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
Copy the code