The message queue
- Decoupling
- Asynchronous processing
- Peak shaving (smoothing traffic spikes)
Deploying RabbitMQ with Docker
Download the RabbitMQ image:
docker pull rabbitmq:3.7.7-management
Create a RabbitMQ container exposing the management UI on port 15672 (default username and password: guest):
docker run -d --name myRabbitMQ -p 5672:5672 -p 15672:15672 rabbitmq:3.7.7-management
The RabbitMQ components
- Producers
- Consumers
- Exchanges
- Virtual hosts
- Queues
Building simple Patterns
// Turn on or off the connection utility class
public class ConnectionUtil {
public static Connection openConnection(a) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("zhangsan");
factory.setPassword("333");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
return connection;
}
public static void closeConnection(Connection connection, Channel channel) { connection.close(); channel.close(); }}// Create a queue and switch
{
    Connection connection = ConnectionUtil.openConnection();
    Channel channel = connection.createChannel();
    String queueName = "myQueue";
    // Same exchange name and routing key that the Producer publishes with
    String exchangeName = "myExchange";
    String exchangeType = "direct";
    String routingKey = "myRoutingKey";
    // Declare the queue: name, durable, exclusive (private to this connection),
    // auto-delete, additional arguments
    channel.queueDeclare(queueName, false, false, false, null);
    // Declare the exchange as durable
    channel.exchangeDeclare(exchangeName, exchangeType, true);
    // Bind the queue to the exchange
    channel.queueBind(queueName, exchangeName, routingKey);
    // Close the connection
    ConnectionUtil.closeConnection(connection, channel);
}
// Producer build
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
String exchangeName = "myExchange";
String routingKey = "myRoutingKey";
String msg = "Hello World";
/ / production
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// Close the connectionConnectionUtil.closeConnection(connection, channel); }}// Consumer build
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
String queueName = "myQueue";
/ / consumption
channel.basicConsume(queueName, true.new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("Received a message" + new String(delivery.getBody(), "UTF-8")); }},new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("Failed to receive message"); }}); System.out.println("Receiving...");
System.in.read();
// Close the connectionConnectionUtil.closeConnection(connection, channel); }}Copy the code
Consumption patterns
Round-robin (even) dispatch
// Set autoAck to true so deliveries are acknowledged automatically.
// The last argument is a no-op cancel callback; basicConsume has no
// (queue, autoAck, DeliverCallback) overload.
channel.basicConsume("queue", true, deliverCallback, consumerTag -> { });
Copy the code
Fair way
// Allow at most one unacknowledged message per consumer at a time
channel.basicQos(1);
// Set autoAck to false so messages must be acknowledged manually
// (the last argument is a no-op cancel callback)
channel.basicConsume("queue", false, deliverCallback, consumerTag -> { });
// Inside the deliver callback, acknowledge the message once it has been processed
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
Copy the code
Production mode
Fanout
// The exchange sends the message to every queue bound to it; the routing key is left empty
channel.basicPublish(exchangeName, "", null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
Copy the code
Direct
// The exchange sends the message to the queue(s) bound with this routing key.
// The body is encoded explicitly as UTF-8 to match the consumer's decoding.
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
Copy the code
Topic
// Bind the queue with a pattern routing key: '#' matches zero or more words, '*' matches exactly one word
channel.queueBind(queueName, exchangeName, routingKey);
// The exchange sends the message to every queue whose binding pattern matches the routing key
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
Copy the code
Integrated SpringBoot
# application.yaml configuration
server:
  port: 8080
spring:
  rabbitmq:
    username: zhangsan
    password: 333
    virtual-host: /
    host: 127.0.0.1
    port: 5672
Copy the code
@Configuration
public class RabbitMqConfiguration {
// Create a switch
@Bean
public FanoutExchange fanoutExchange(a) {
return new FanoutExchange("myExchange".true.false);
}
// Create a queue
@Bean
public Queue myQueue(a) {
return new Queue("myQueue".true);
}
// Complete the binding
public Binding myBind(a) {
returnBindingBuilder.bind(myQueue()).to(fanoutExchange()); }}Copy the code
// The producer sends the message
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void product(a) {
String exchangeName = "myExchange";
String routingKey = "";
String message = "message"; rabbitTemplate.convertAndSend(exchangeName, routingKey, message); }}Copy the code
// The consumer receives the message
@Component
@RabbitListener(queues = {"myQueue"})
public class Consumer {
@RabbitHandler
public void consume(String message) { System.out.println(message); }}Copy the code
Expiration time TTL
Setting the QUEUE TTL
@Configuration
public class TTLRabbitMQConfiguration {
@Bean
public DirectExchange ttlDirectExchange(a) {
return new DirectExchange("ttlDirectExchange".true.false);
}
@Bean
public Queue ttlDirectQueue(a) {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl".5000);
return new Queue("ttlDirectionQueue".true.false.false, args);
}
@Bean
public Binding ttlBinding(a) {
return BindingBuilder.bin(ttlDirectQueue()).to(ttlDirectExchange()).with("routingKey"); }}Copy the code
Setting the message TTL
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void product(a) {
String exchangeName = "ttlDirectExchange";
String routingKey = "ttl";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
public Message postProcessMessage(Message message) {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
returnmessage; }}; rabbitTemplate.convertAndSend(exchangeName, routingKey, message, messagePostProcessor); }}Copy the code
Dead-letter queue
A message is dead-lettered when it expires, when it is rejected, or when the queue reaches its maximum length.
@Configuration
public class RabbitConfiguration {
@Bean
public DirectExchanges deadDirect(a) {
return new DirectExchage("deadExchange".true.false);
}
@Bean
public Queue deadQueue(a) {
return new Queue("deadQueue".true);
}
@Bean
public Binding deadBinding(a) {
return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
}
@Bean
public Queue ttlDirectQueue(a) {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl".5000);
args.put("x-dead-letter-exchange"."dead_direct_exchange");
args.put("x-dead-letter-routing-key"."dead");
return new Queue("ttlDirectionQueue".true.false.false, args).with("routingKey")}}Copy the code
The cluster
The main for the cluster
# Configure the hosts file on each host
echo "ip hostName" >> /etc/hosts
# Synchronize /var/lib/rabbitmq/.erlang.cookie across all nodes
# Start each node
rabbitmq-server -detached
# Add the node to the cluster
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@mainHostName
rabbitmqctl start_app
Copy the code
High availability cluster
# Add a policy named "ha-all" on a node: the pattern "^" matches every queue name,
# so all queues become mirrored queues
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
Copy the code