Expiration time TTL
TTL (time to live) indicates how long a message remains available to consumers.
After that time, the message is deleted automatically.
RabbitMQ can set a TTL on both messages and queues. There are currently two ways to do this:
- The first is through a queue property, in which case every message in the queue shares the same expiration time.
- The second is to set it on each message individually, in which case each message can have a different TTL.
If both methods are used together, the smaller of the two TTL values applies. Once a message outlives its TTL it is sent to the dead-letter queue (if one is configured) and consumers can no longer receive it.
Setting the queue TTL
// Set the related configuration in code (it can also be set in the web UI)
package com.zwt.springbootfanout.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class TTLRabbitMqConfiguration {

    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, true);
    }

    @Bean
    public Queue ttlDirectQueue() {
        HashMap<String, Object> map = new HashMap<>();
        // Every message in this queue expires after 2000 ms
        map.put("x-message-ttl", 2000);
        // The arguments map must be passed when declaring the queue
        return new Queue("ttl.direct.queue", true, false, false, map);
    }

    @Bean
    public Binding ttlDirectBinding() {
        return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl");
    }
}
For a per-message TTL, the expiration field carries the TTL value in milliseconds.
It has the same constraints as x-message-ttl.
Because the expiration field must be a string, the broker only accepts numbers expressed as strings.
When both queue and message TTL values are specified, the smaller of the two takes effect.
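In Spring AMQP the per-message TTL is set via MessageProperties#setExpiration (a string of milliseconds); the broker applies the "smaller value wins" rule internally. The helper below is only a plain-Java illustration of that rule, not broker code:

```java
public class TtlRule {
    // Returns the TTL (in milliseconds) that actually applies to a message,
    // given an optional queue-level x-message-ttl and an optional per-message
    // expiration. null means "no TTL set at that level".
    static Integer effectiveTtl(Integer queueTtl, Integer messageTtl) {
        if (queueTtl == null) return messageTtl;
        if (messageTtl == null) return queueTtl;
        return Math.min(queueTtl, messageTtl);
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtl(2000, 5000)); // queue TTL is smaller
        System.out.println(effectiveTtl(2000, 1000)); // message TTL is smaller
        System.out.println(effectiveTtl(null, 3000)); // only the message TTL is set
    }
}
```

So with the 2000 ms queue TTL configured above, a message sent with expiration "5000" still expires after 2000 ms.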
Configuring the publisher confirm mechanism
The publisher confirm type has three values:
- NONE disables publisher confirm mode and is the default.
- CORRELATED triggers a callback method when a message is successfully published to the exchange.
- SIMPLE has two effects. The first is the same as CORRELATED. The second is that after publishing, you can call waitForConfirms or waitForConfirmsOrDie on the channel to block until the broker returns the confirm result, and then decide the next step. Note that if the broker nacks a message, waitForConfirmsOrDie closes the channel, after which no further messages can be sent to the broker.
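The bookkeeping behind a CORRELATED-style confirm callback can be simulated without a broker. The sketch below uses invented class and method names (it is not the Spring AMQP ConfirmCallback API) and only shows the "record pending message, clear it on ack, mark for resend on nack" idea:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConfirmDemo {
    // Pending messages keyed by correlation id, kept until the broker confirms them
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private int resendCount = 0;

    void send(String correlationId, String payload) {
        pending.put(correlationId, payload);
        // ...hand the payload to the broker here...
    }

    // Mirrors the shape of ConfirmCallback#confirm(correlationData, ack, cause)
    void onConfirm(String correlationId, boolean ack) {
        if (ack) {
            pending.remove(correlationId);  // delivered to the exchange, forget it
        } else {
            resendCount++;                  // nack: keep it pending and resend later
        }
    }

    int pendingCount() { return pending.size(); }
    int resends() { return resendCount; }
}
```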
Dead-letter queue
DLX stands for dead-letter exchange; think of it as a dead-letter mailbox.
When a message becomes a dead letter in one queue, it can be republished to another exchange, the DLX. Any queue bound to the DLX is called a dead-letter queue.
Messages become dead-letter for the following reasons:
- The message is rejected (with requeue disabled)
- The message's TTL expires
- The queue reaches its maximum length
The DLX is an ordinary exchange, no different from any other. It can be specified on any queue, which in effect just sets an attribute of that queue.
When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, which then routes it to another queue: the dead-letter queue.
To use a dead-letter queue, simply set the queue argument x-dead-letter-exchange to the desired exchange when declaring the queue.
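A minimal sketch of the arguments involved; the exchange and routing-key names here are made up for illustration. In Spring AMQP this map would be passed as the last constructor argument of Queue:

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterArgs {
    // Builds the arguments map for a queue whose dead letters should be
    // re-routed to the given exchange with the given routing key.
    static Map<String, Object> deadLetterArgs(String dlx, String routingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", dlx);
        args.put("x-dead-letter-routing-key", routingKey);
        return args;
    }

    public static void main(String[] args) {
        // e.g. new Queue("order.queue", true, false, false, deadLetterArgs("dlx.exchange", "dead"))
        System.out.println(deadLetterArgs("dlx.exchange", "dead"));
    }
}
```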
Monitoring memory and disk
When memory usage exceeds the configured threshold, or free disk space falls below the configured threshold,
RabbitMQ temporarily blocks client connections and stops accepting messages from clients, to prevent the server from crashing.
While this lasts, heartbeat detection between client and server also fails.
A connection state of blocking or blocked therefore means the threshold has been reached and the node is under heavy load.
Memory control for RabbitMQ
Reference documentation: www.rabbitmq.com/configure.h…
When a warning occurs, it can be modified and adjusted through configuration.
Via the command line:
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction (or the absolute value) is the memory threshold.
The default is 0.4, meaning that when RabbitMQ's memory usage exceeds 40%, a warning is raised and all producer connections are blocked.
A threshold changed with this command takes effect immediately but is lost when the broker restarts; a threshold set in the configuration file survives restarts, but only takes effect after the broker restarts.
Configuration file rabbitmq.conf
The configuration file lives at /etc/rabbitmq/rabbitmq.conf:
# Default value
# vm_memory_high_watermark.relative = 0.4
# Relative fraction; the recommended range is 0.4 to 0.7, and values above 0.7 are discouraged
vm_memory_high_watermark.relative = 0.6
# Or an absolute value
# vm_memory_high_watermark.absolute = 2GB
Memory paging for RabbitMQ
Before a broker node's memory usage reaches the threshold and blocks producers, it tries to page messages from queues out to disk to free memory.
Both persistent and non-persistent messages are written to disk;
since a persistent message already has a copy on disk, it is simply dropped from memory during this migration.
By default, paging starts when memory usage reaches 50% of the high watermark: with the default watermark of 0.4, paging kicks in at 0.4 * 0.5 = 0.2, i.e. 20% of memory.
For example, with 1000 MB of memory the limit is hit at 400 MB;
but because the paging ratio is 0.5, messages start being moved to disk once memory usage reaches 200 MB, well before the 400 MB limit is reached, which keeps the node running smoothly.
This can be adjusted by setting vm_memory_high_watermark_paging_ratio.
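For example, the two settings would sit together in rabbitmq.conf like this (the values shown match the defaults discussed above; this is a sketch, not a recommended tuning):

```
vm_memory_high_watermark.relative = 0.4
# Start paging messages to disk when memory use reaches 50% of the watermark,
# i.e. at 0.4 * 0.5 = 0.2 (20%) of total memory
vm_memory_high_watermark_paging_ratio = 0.5
```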
Disk alerts for RabbitMQ
RabbitMQ also blocks producers when free disk space falls below a certain threshold,
which prevents the server from crashing when non-persistent messages keep being paged out and exhaust the disk.
By default, the disk alarm threshold is 50 MB:
when free disk space drops below 50 MB, producers are blocked and paging of messages from memory to disk stops.
Run the following commands to modify the configuration:
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit is an absolute value with a fixed unit (KB, MB, GB); fraction is a threshold relative to memory, with a recommended range of 1.0 to 2.0.
The configuration file is as follows:
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb
Clustering
The RabbitMQ cluster
RabbitMQ itself is written in Erlang, which is distributed by nature (Erlang cluster nodes authenticate to each other by sharing a magic cookie),
so RabbitMQ supports clustering natively.
It therefore does not need ZooKeeper to implement an HA scheme or to store cluster metadata, as ActiveMQ and Kafka respectively do.
Clustering is a way to guarantee reliability while scaling horizontally to increase message throughput.
Mainly refer to the official documentation: www.rabbitmq.com/clustering….
Setting up a cluster
Scenario: suppose you have two RabbitMQ nodes, rabbit-1 and rabbit-2, with rabbit-1 as the primary node and rabbit-2 as the secondary node.

Useful commands:
ps aux | grep rabbitmq
systemctl status rabbitmq-server
Start a node: RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
Stop a node: rabbitmqctl -n rabbit-1 stop

Step 1: start the first node, rabbit-1:
> sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
...omitted...
##########  Logs: /var/log/rabbitmq/rabbit-1.log
######  ##        /var/log/rabbitmq/rabbit-1-sasl.log
##########
Starting broker...
completed with 7 plugins.

Step 2: start the second node, rabbit-2 (its management UI listens on port 15673):
> sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
...omitted...
##########  Logs: /var/log/rabbitmq/rabbit-2.log
######  ##        /var/log/rabbitmq/rabbit-2-sasl.log
##########
Starting broker...
completed with 7 plugins.

Verify both nodes are running with ps aux | grep rabbitmq.

Step 3: stop, reset and restart rabbit-1:
> sudo rabbitmqctl -n rabbit-1 stop_app
> sudo rabbitmqctl -n rabbit-1 reset
> sudo rabbitmqctl -n rabbit-1 start_app

Step 4: stop and reset rabbit-2, join it to rabbit-1's cluster, then start it:
> sudo rabbitmqctl -n rabbit-2 stop_app
> sudo rabbitmqctl -n rabbit-2 reset
# Join rabbit-2 to the cluster of rabbit-1@Server-node
> sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@Server-node
> sudo rabbitmqctl -n rabbit-2 start_app

Step 5: verify the cluster status:
> sudo rabbitmqctl cluster_status -n rabbit-1
# The cluster has two nodes: rabbit-1@Server-node and rabbit-2@Server-node
[{nodes,[{disc,['rabbit-1@Server-node','rabbit-2@Server-node']}]},
{running_nodes,['rabbit-2@Server-node','rabbit-1@Server-node']},
{cluster_name,<<"rabbit-1@Server-node.localdomain">>},
{partitions,[]},
{alarms,[{'rabbit-2@Server-node',[]},{'rabbit-1@Server-node',[]}]}]

Note: to access the web management UI on port 15672 (node 1) and port 15673 (node 2), set a username and password on each node:
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
Tips: in a multi-machine deployment you need to copy the Erlang cookie from one node to all the others (nodes use the cookie to decide whether they may communicate with each other).
The cookie is stored in /var/lib/rabbitmq/.erlang.cookie.
For example, with host names rabbit-1 and rabbit-2:
1. Start each node one by one.
2. Configure the hosts file on each node (vim /etc/hosts):
ip1 rabbit-1
ip2 rabbit-2
The other steps are the same as in the single-machine deployment.
Distributed transaction
A distributed transaction is one whose operations span different nodes, while the ACID properties of a transaction must still be guaranteed.
For example, in an ordering scenario, if inventory and orders do not live on the same node, a distributed transaction is involved.
Distributed transaction approach
In distributed systems, there are only a handful of common ways to implement distributed transactions.
Two-phase commit (2PC) needs support from the database vendor or from Java components such as Atomikos.
Two-phase commit (2PC) introduces a coordinator that orchestrates the participants' behaviour and ultimately decides whether the participants really commit the transaction.
Prepare phase
The coordinator asks the participant if the transaction executed successfully, and the participant sends back the result of the transaction execution.
Commit phase
If the transaction executes successfully on each participant, the transaction coordinator sends a notification for the participant to commit the transaction;
Otherwise, the coordinator sends a notification to the participant to roll back the transaction.
Note that in the prepare phase the participant executes the transaction but does not commit it; the commit or rollback happens only after the coordinator's notification arrives in the commit phase.
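The two phases can be sketched as a toy in-memory coordinator; there is no real database or network here, and the interface and helper names are invented for illustration:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoPhaseCommit {
    interface Participant {
        boolean prepare();   // phase 1: execute the transaction but do not commit yet
        void commit();       // phase 2a: make the work permanent
        void rollback();     // phase 2b: undo the prepared work
    }

    // Phase 1: collect every participant's vote (no short-circuit, so each one
    // really runs prepare). Phase 2: commit only if all votes were "yes".
    static boolean run(List<Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            allPrepared &= p.prepare();
        }
        if (allPrepared) {
            participants.forEach(Participant::commit);
        } else {
            participants.forEach(Participant::rollback);
        }
        return allPrepared;
    }

    // Tiny demo participant backed by a counter: commit increments the value.
    static Participant counting(AtomicInteger value, boolean prepareSucceeds) {
        return new Participant() {
            public boolean prepare() { return prepareSucceeds; }
            public void commit() { value.incrementAndGet(); }
            public void rollback() { /* nothing was reserved in this toy */ }
        };
    }
}
```

Any "no" vote in phase 1 makes the coordinator roll everything back, which is exactly the conservatism criticized below.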
Existing problems
- Synchronous blocking: all transaction participants block while waiting for the other participants' responses.
- Single point of failure: the coordinator plays a huge role in 2PC, and its failure has a big impact. In particular, if it fails during phase two, all participants stay in a waiting state and cannot do anything else.
- Data inconsistency: in phase two, if a network fault means only some participants receive the Commit message, only those participants commit the transaction, leaving system data inconsistent.
- Too conservative: any single node's failure causes the whole transaction to fail, and there is no proper fault-tolerance mechanism.
Compensating transactions (TCC): used by NetEase Yanxuan, Alibaba, and Ant Financial
TCC is a compensation-based mechanism. The core idea is that for every operation, a matching confirmation and compensation (undo) operation must be registered.
It is divided into three phases:
- Try: check the business systems and reserve resources.
- Confirm: commit on the business systems. Confirm runs only after Try succeeds, and the Confirm phase is assumed not to fail: if Try succeeded, Confirm succeeds.
- Cancel: when something goes wrong, roll back the business operations and release the reserved resources.
For example, suppose Bob wants to transfer money to Smith.
The idea is to call, in turn, local methods that:
1. Try: call remote interfaces to freeze the money of both Smith and Bob.
2. Confirm: perform the remote transfer and unfreeze; the transfer succeeds.
3. If step 2 succeeds, the transfer is done; if it fails, call the remote freeze interface's matching unfreeze method (Cancel).
Advantages: compared with 2PC, the implementation and flow are relatively simple, though data consistency is also weaker than 2PC's.
Disadvantages: quite obvious; steps 2 and 3 can both fail.
TCC is an application-layer compensation scheme, so programmers have to write a lot of compensation code, and in some scenarios the business flow is hard to define or handle with TCC.
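A toy Try/Confirm/Cancel flow for the transfer example, with in-memory balances only; the class and method names are invented for illustration, not any TCC framework's API:

```java
public class TccTransfer {
    long balance;   // available balance
    long frozen;    // amount reserved by Try

    TccTransfer(long balance) { this.balance = balance; }

    // Try: check the resource and reserve it (freeze the money)
    boolean tryFreeze(long amount) {
        if (amount > balance) return false;
        balance -= amount;
        frozen += amount;
        return true;
    }

    // Confirm: actually spend what was frozen
    void confirm(long amount) { frozen -= amount; }

    // Cancel: the compensation, releasing the reservation back to the balance
    void cancel(long amount) { frozen -= amount; balance += amount; }
}
```

The point is that Try never touches the counterpart's real balance, so a failed step 2 or 3 only needs the Cancel compensation, not a database rollback.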
Local message table (asynchronous assurance): used, for example, by Alipay and WeChat Pay, which actively query payment status and use reconciliation statements.
The local message table sits in the same database as the business data tables, so a local transaction can guarantee that operations on both tables satisfy transactional properties, while a message queue provides eventual consistency.
- After one side of the distributed transaction writes its business data, it writes a message into the local message table; the local transaction guarantees the message is recorded.
- The messages in the local message table are then forwarded to a message queue such as Kafka. On successful forwarding the message is deleted from the table; otherwise forwarding is retried.
- The other side of the distributed transaction reads the message from the queue and performs the operation it describes.
Advantages: a very classic implementation that avoids a distributed transaction and achieves eventual consistency.
Disadvantages: the message table is coupled to the business system, and without a packaged solution there is a lot of plumbing to deal with.
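The relay over the local message table described above can be sketched with an in-memory stand-in for the table and the queue; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class LocalMessageTable {
    private final List<String> table = new ArrayList<>();  // the "local message table"

    // Written in the same local transaction as the business data
    void record(String message) { table.add(message); }

    // Forward each recorded message to the queue; delete it only on success,
    // otherwise leave it in place so the next relay run retries it.
    void relay(Predicate<String> sendToQueue) {
        Iterator<String> it = table.iterator();
        while (it.hasNext()) {
            if (sendToQueue.test(it.next())) {
                it.remove();
            }
        }
    }

    int size() { return table.size(); }
}
```

Because a failed send leaves the row in the table, the consumer must be idempotent: a retried forward can deliver the same message twice.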
MQ transaction messages: asynchronous scenarios, highly general, very extensible.
Some third-party MQs support transactional messages, RocketMQ among them,
and the way they support them resembles two-phase commit;
however, some of the most popular MQ systems on the market, such as Kafka, do not support transactional messaging.
Taking Alibaba's RocketMQ middleware as an example, the idea is roughly as follows:
- Phase 1: send a Prepared message and obtain the message's address.
- Phase 2: execute the local transaction.
- Phase 3: use the address obtained in phase 1 to access the message and modify its state.
In other words, inside the business method you submit two requests to the message queue: one to send the message and one to confirm it. RocketMQ periodically scans its cluster for transaction messages in the Prepared state that were never confirmed and checks back with their senders, so the producer has to implement a check interface; based on the sender's policy, RocketMQ then decides whether to roll the message back or to go on and send the confirmation. This guarantees that message delivery and the local transaction succeed or fail together.
Advantages: Ultimate consistency is achieved without reliance on local database transactions.
Disadvantages: difficult to implement, not supported by mainstream MQs, and part of the code for RocketMQ transaction messages is not open source.
To summarize, we have compared the advantages and disadvantages of several distributed transaction schemes. Distributed transactions are a hard technical problem in their own right:
there is no one perfect solution for all scenarios; it depends on the business scenario.
Besides Alibaba's RocketMQ there are now many distributed transaction coordinators, such as LCN, that are worth trying.
The specific implementation
A complete architecture diagram of distributed transactions
The structure of Meituan Takeout
Distributed transactions between systems
A transaction rollback problem during intersystem invocation
import com.zwt.rabbitmq.dao.OrderDataBaseService;
import com.zwt.rabbitmq.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

@Service
public class OrderService {

    @Autowired
    private OrderDataBaseService orderDataBaseService;

    // Create an order; the whole method runs in one local transaction
    @Transactional(rollbackFor = Exception.class)
    public void createOrder(Order orderInfo) throws Exception {
        // 1: Insert the order into the order system's database (local transaction)
        orderDataBaseService.saveOrder(orderInfo);
        // 2: Send the order information to the waybill system over an HTTP interface
        String result = dispatchHttpApi(orderInfo.getOrderId());
        if (!"success".equals(result)) {
            throw new Exception("Order creation failed because the waybill interface call failed!");
        }
    }

    /**
     * Simulates the HTTP call that passes the order id to the waybill system (Spring Cloud).
     * @return the response body
     */
    private String dispatchHttpApi(String orderId) {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // Connection timeout: 3 seconds
        factory.setConnectTimeout(3000);
        // Read timeout: 2 seconds
        factory.setReadTimeout(2000);
        // Send the HTTP request
        String url = "http://localhost:9000/dispatch/order?orderId=" + orderId;
        RestTemplate restTemplate = new RestTemplate(factory); // may throw on failure
        String result = restTemplate.getForObject(url, String.class);
        return result;
    }
}
Reliable production of distributed transaction messages based on MQ
If the MQ server has an exception or failure at that moment, the message's receipt never arrives.
What do we do then?
Reliable production problems for distributed transaction messages based on MQ – scheduled retransmission
If anything goes wrong, we’ll resend the message.
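The bookkeeping such a scheduled resend job needs can be sketched in plain Java; in practice the pending set would be a database table scanned periodically (for example by a Spring @Scheduled method), and the names below are invented:

```java
import java.util.HashMap;
import java.util.Map;

public class ResendTracker {
    static final int MAX_ATTEMPTS = 3;
    private final Map<String, Integer> attempts = new HashMap<>();  // msgId -> tries so far

    // Called by the scheduled job for every message that still has no broker receipt.
    // Returns "resend" while retries remain, then "give-up" so a human can step in.
    String onMissingReceipt(String messageId) {
        int n = attempts.merge(messageId, 1, Integer::sum);
        return n <= MAX_ATTEMPTS ? "resend" : "give-up";
    }

    // Called when the broker receipt finally arrives: stop tracking the message.
    void onReceipt(String messageId) { attempts.remove(messageId); }
}
```

Capping the attempts matters: without MAX_ATTEMPTS a permanently failing message would be resent forever.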
Reliable consumption of distributed transaction messages based on MQ
Message retransmission of distributed transaction messages based on MQ
The number of retries must be capped, or the consumer must wrap processing in try/catch.
Dead letter queue message transfer + manual processing of distributed transaction messages based on MQ
Messages that still fail in the dead-letter queue are handled manually.
Conclusion
Advantages of distributed transaction solution based on MQ:
1. Highly general 2. Easy to extend 3. Loosely coupled, and the scheme is mature
Disadvantages of distributed transaction solutions based on MQ:
1. Based on message-oriented middleware, it is only suitable for asynchronous scenarios. 2
advice
1. Avoid distributed transactions as much as possible. 2
Spring Boot integration with a RabbitMQ cluster: configuration details
1 Introduce the starter
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 The configuration details are as follows:
rabbitmq:
  # The addresses of the servers the client connects to; separate cluster addresses with commas
  # addresses: ip:port,ip:port
  addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705
  # port:
  password: admin
  username: 123456
  virtual-host: / # the vhost of RabbitMQ to connect to
  requested-heartbeat: # heartbeat timeout in seconds; 0 means none; default 60 s
  publisher-confirms: # enable publisher confirms
  publisher-returns: # enable publisher returns
  connection-timeout: # connection timeout; 0 means wait indefinitely
  cache:
    channel.size: # the number of channels held in the cache
    channel.checkout-timeout: # when a cache size is set, how long (ms) to wait for a channel from the cache; if 0, a new channel is always created
    connection.size: # the number of cached connections; only applies in CONNECTION mode
    connection.mode: # connection factory cache mode: CHANNEL or CONNECTION
  listener:
    simple.auto-startup: # whether to start the container automatically on startup
    simple.acknowledge-mode: # message acknowledge mode: none, manual, or auto; default auto
    simple.concurrency: # minimum number of consumers
    simple.max-concurrency: # maximum number of consumers
    simple.prefetch: # how many messages one request can handle; must be >= the transaction size, if transactions are used
    simple.transaction-size: # the number of messages per transaction; best kept <= the prefetch count
    simple.default-requeue-rejected: # whether rejected messages are requeued; default true (interacts with acknowledge-mode)
    simple.idle-event-interval: # how often (ms) idle container events are published
    simple.retry.enabled: # whether consumer-side retries are enabled
    simple.retry.max-attempts: # maximum number of retries
    simple.retry.initial-interval: # interval between the first and second attempt to deliver a message
    simple.retry.multiplier: # multiplier applied to the previous retry interval
    simple.retry.max-interval: # maximum retry interval
    simple.retry.stateless: # whether retries are stateless or stateful
  template:
    mandatory: # enable mandatory messages; default false
    receive-timeout: # timeout of the receive() operation
    reply-timeout: # timeout of sendAndReceive()
    retry.enabled: # whether send retries are enabled
    retry.max-attempts: # maximum number of retries
    retry.initial-interval: # interval between the first and second attempt to publish a message
    retry.multiplier: # multiplier applied to the previous retry interval
    retry.max-interval: # maximum retry interval
For the sender, you need to do the following:
1 configuration CachingConnectionFactory
2 configuration Exchange/Queue/Binding
3 Configure RabbitAdmin to create an Exchange/Queue/Binding
4 Configure RabbitTemplate to send messages; RabbitTemplate obtains a Connection from CachingConnectionFactory and sends messages to the specified Exchange
For consumers, the following configuration is required:
1 configuration CachingConnectionFactory
2 configuration Exchange/Queue/Binding
3 Configure RabbitAdmin to create an Exchange/Queue/Binding
4 configuration RabbitListenerContainerFactory
5 Configure @RabbitListener / @RabbitHandler to receive messages
3 Main objects for Spring AMQP
Note: If you do not know AMQP, please visit the official website.
4 Usage
Loading via a configuration class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
public static final String RECEIVEDLXEXCHANGE="spring-ex";
public static final String RECEIVEDLXQUEUE="spring-qu1";
public static final String RECEIVEDLXROUTINGKEY="aa";
public static final String DIRECTEXCHANGE="spring-ex";
public static final String MDMQUEUE="mdmQueue";
public static final String TOPICEXCHANGE="spring-top";
@Value("${spring.rabbitmq.addresses}")
private String hosts;
@Value("${spring.rabbitmq.username}")
private String userName;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
/* @Value("${rabbit.channelCacheSize}") private int channelCacheSize; */
// @Value("${rabbit.port}")
// private int port;
/* @Autowired private ConfirmCallBackListener confirmCallBackListener; @Autowired private ReturnCallBackListener returnCallBackListener; */
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(hosts);
cachingConnectionFactory.setUsername(userName);
cachingConnectionFactory.setPassword(password);
// cachingConnectionFactory.setChannelCacheSize(channelCacheSize);
//cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setVirtualHost(virtualHost);
// Set the connection factory cache mode:
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// Cache connections
cachingConnectionFactory.setConnectionCacheSize(3);
// Set connection limits
cachingConnectionFactory.setConnectionLimit(6);
logger.info("Connection factory setup complete, connection address {}", hosts);
logger.info("Connection factory setup complete, connect user {}", userName);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
rabbitAdmin.declareBinding(bindingMdmQueue());
// Declare the topic switch
rabbitAdmin.declareExchange(directExchange());
logger.info("Administrator Settings completed");
return rabbitAdmin;
}
@Bean
public RabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// Minimum number of consumers
factory.setConcurrentConsumers(10);
// Maximum number of consumers
factory.setMaxConcurrentConsumers(10);
// The maximum number of messages to be processed in a request
factory.setPrefetchCount(10);
//
factory.setChannelTransacted(true);
// No queuing by default
factory.setDefaultRequeueRejected(true);
// Manually confirm that the message was received
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
logger.info("Listener setup completed.");
return factory;
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECTEXCHANGE, true, false);
}
@Bean
public Queue mdmQueue(){
Map<String, Object> arguments = new HashMap<>();
// Route this queue's dead letters to the dead-letter exchange
arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE);
arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY);
logger.info("Queue switch binding completed");
return new Queue(RECEIVEDLXQUEUE, true, false, false, arguments);
}
@Bean
Binding bindingMdmQueue() {
return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
// Publish confirmation
// rabbitTemplate.setConfirmCallback(confirmCallBackListener);
// Enable publish return
// rabbitTemplate.setReturnCallback(returnCallBackListener);
logger.info("Connection template setup completed");
return rabbitTemplate;
}
/* @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPICEXCHANGE, true, false); } */
/*
@Bean
public DirectExchange dlxExchange() {
    return new DirectExchange(RECEIVEDLXEXCHANGE, true, false);
}

@Bean
public Queue dlxQueue() {
    return new Queue(RECEIVEDLXQUEUE, true);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);
}
*/
}
There are two ways to load the configuration:
1. Through the configuration file
2. Through configuration classes
Note: The above is through the configuration file and configuration class to load, the common configuration as shown above.
In actual use, the producer and the consumer should be configured separately, and the relevant configuration will also have small changes, and the general configuration remains unchanged. More information can be found on the official website.
The cluster monitoring
In the Internet industry RabbitMQ is almost always deployed as a cluster, so monitoring the cluster becomes an essential part of the enterprise ecosystem.
Let's take a look at the four main types of monitoring.
Management Interface Monitoring
Rabbitmq-plugins enable rabbitmq_management
Then go to http://ip:15672
Tracing Log Monitoring
Here are the trace commands and how to use them (you need to enable the RabbitMQ plugin first, then turn tracing on):

| Command | Description |
| --- | --- |
| rabbitmq-plugins list | View the list of plugins |
| rabbitmq-plugins enable rabbitmq_tracing | Enable the rabbitmq_tracing plugin |
| rabbitmqctl trace_on | Turn on trace |
| rabbitmqctl trace_on -p itcast | Turn on trace (itcast is the vhost whose trace should be logged) |
| rabbitmqctl trace_off | Turn off trace |
| rabbitmq-plugins disable rabbitmq_tracing | Disable the rabbitmq_tracing plugin |
| rabbitmqctl set_user_tags heima administrator | Only administrators can view the log page |
After installing the plugin and turning trace on, you will find a new exchange named amq.rabbitmq.trace, of type topic.
Send a message and it will appear in the trace log:
rabbitTemplate.convertAndSend("spring_queue", "only send messages for queue spring_queue --01");
Customize your own monitoring system
RabbitMQ provides a rich restful API,
We can get the corresponding cluster data through these interfaces, and then we can customize our monitoring system.
More information and descriptions of the API can be found at http://ip:15672/api/
Next we use the RabbitMQ Http API to get cluster monitoring data
Add the HttpClient and Jackson dependencies:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
Create a MonitorRabbitMQ class with the concrete implementation:
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * RabbitMQ monitoring
 */
public class MonitorRabbitMQ {
// RabbitMQ HTTP API - node status of each cluster instance; replace the IP with that of your deployment
private static String RABBIT_NODES_STATUS_REST_URL = "http://192.168.13.111:15672/api/nodes";
// RabbitMQ HTTP API - cluster user information; replace the IP with that of your deployment
private static String RABBIT_USERS_REST_URL = "http://192.168.13.111:15672/api/users";
// RabbitMQ user name
private static String RABBIT_USER_NAME = "guest";
// RabbitMQ password
private static String RABBIT_USER_PWD = "guest";
public static void main(String[] args) {
try {
//step1. Obtain the status information of each node instance in the rabbitmq cluster
Map<String, ClusterStatus> clusterMap =
fetchRabbtMQClusterStatus(RABBIT_NODES_STATUS_REST_URL, RABBIT_USER_NAME, RABBIT_USER_PWD);
//step2. Print the status information of each node instance
for (Map.Entry entry : clusterMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
//step3. Obtain the rabbitmq cluster user information
Map<String, User> userMap =
fetchRabbtMQUsers(RABBIT_USERS_REST_URL, RABBIT_USER_NAME, RABBIT_USER_PWD);
//step4. Print rabbitmq cluster user information
for (Map.Entry entry : userMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static Map<String, ClusterStatus> fetchRabbtMQClusterStatus(String url, String username, String password) throws IOException {
Map<String, ClusterStatus> clusterStatusMap = new HashMap<String, ClusterStatus>();
String nodeData = getData(url, username, password);
// The method already declares throws IOException, so let parse errors propagate
// instead of swallowing them and risking an NPE on a null jsonNode below
JsonNode jsonNode = JsonUtil.toJsonNode(nodeData);
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
JsonNode next = iterator.next();
ClusterStatus status = new ClusterStatus();
status.setDiskFree(next.get("disk_free").asLong());
status.setFdUsed(next.get("fd_used").asLong());
status.setMemoryUsed(next.get("mem_used").asLong());
status.setProcUsed(next.get("proc_used").asLong());
status.setSocketUsed(next.get("sockets_used").asLong());
clusterStatusMap.put(next.get("name").asText(), status);
}
return clusterStatusMap;
}
public static Map<String, User> fetchRabbtMQUsers(String url, String username, String password) throws IOException {
Map<String, User> userMap = new HashMap<String, User>();
String nodeData = getData(url, username, password);
JsonNode jsonNode = JsonUtil.toJsonNode(nodeData);
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
JsonNode next = iterator.next();
User user = new User();
user.setName(next.get("name").asText());
user.setTags(next.get("tags").asText());
userMap.put(next.get("name").asText(), user);
}
return userMap;
}
public static String getData(String url, String username, String password) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(username, password);
HttpGet httpGet = new HttpGet(url);
httpGet.addHeader(BasicScheme.authenticate(creds, "UTF-8", false));
httpGet.setHeader("Content-Type", "application/json");
CloseableHttpResponse response = httpClient.execute(httpGet);
try {
if (response.getStatusLine().getStatusCode() != 200) {
System.out.println("call http api to get rabbitmq data return code: " + response.getStatusLine().getStatusCode() + ", url: " + url);
}
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
} finally {
response.close();
}
return null;
}
public static class JsonUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
//objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
}
public static JsonNode toJsonNode(String jsonString) throws IOException {
return objectMapper.readTree(jsonString);
}
}
public static class User {
private String name;
private String tags;
@Override
public String toString() {
return "User{" +
"name=" + name +
", tags=" + tags +
'}';
}
// Getters and setters
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
}
public static class ClusterStatus {
private long diskFree;
private long diskLimit;
private long fdUsed;
private long fdTotal;
private long socketUsed;
private long socketTotal;
private long memoryUsed;
private long memoryLimit;
private long procUsed;
private long procTotal;
// Getters and setters
public long getDiskFree() {
return diskFree;
}
public void setDiskFree(long diskFree) {
this.diskFree = diskFree;
}
public long getDiskLimit() {
return diskLimit;
}
public void setDiskLimit(long diskLimit) {
this.diskLimit = diskLimit;
}
public long getFdUsed() {
return fdUsed;
}
public void setFdUsed(long fdUsed) {
this.fdUsed = fdUsed;
}
public long getFdTotal() {
return fdTotal;
}
public void setFdTotal(long fdTotal) {
this.fdTotal = fdTotal;
}
public long getSocketUsed() {
return socketUsed;
}
public void setSocketUsed(long socketUsed) {
this.socketUsed = socketUsed;
}
public long getSocketTotal() {
return socketTotal;
}
public void setSocketTotal(long socketTotal) {
this.socketTotal = socketTotal;
}
public long getMemoryUsed() {
return memoryUsed;
}
public void setMemoryUsed(long memoryUsed) {
this.memoryUsed = memoryUsed;
}
public long getMemoryLimit() {
return memoryLimit;
}
public void setMemoryLimit(long memoryLimit) {
this.memoryLimit = memoryLimit;
}
public long getProcUsed() {
return procUsed;
}
public void setProcUsed(long procUsed) {
this.procUsed = procUsed;
}
public long getProcTotal() {
return procTotal;
}
public void setProcTotal(long procTotal) {
this.procTotal = procTotal;
}
@Override
public String toString() {
return "ClusterStatus{" +
"diskFree=" + diskFree +
", diskLimit=" + diskLimit +
", fdUsed=" + fdUsed +
", fdTotal=" + fdTotal +
", socketUsed=" + socketUsed +
", socketTotal=" + socketTotal +
", memoryUsed=" + memoryUsed +
", memoryLimit=" + memoryLimit +
", procUsed=" + procUsed +
", procTotal=" + procTotal +
'}';
}
}
}
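For reference, here is a stripped-down illustration of the fields the monitor reads from /api/nodes. The sample JSON below is a hypothetical, heavily truncated one-node response, and the regex extraction merely stands in for the Jackson parsing above:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NodeStatsFieldSketch {
    // Hypothetical, truncated sample of what /api/nodes might return for one node
    static final String SAMPLE =
            "[{\"name\":\"rabbit@node1\",\"disk_free\":1024000,\"fd_used\":35," +
            "\"mem_used\":52428800,\"proc_used\":200,\"sockets_used\":3}]";

    // Pulls a numeric field out of the JSON text (stand-in for jsonNode.get(field).asLong())
    static long extractLong(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\":(\\d+)").matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    public static void main(String[] args) {
        System.out.println(extractLong(SAMPLE, "disk_free"));    // 1024000
        System.out.println(extractLong(SAMPLE, "sockets_used")); // 3
    }
}
```

In real code, prefer a proper JSON parser such as Jackson, as the monitor above does; the regex is only to make the field names visible at a glance.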
Zabbix monitoring RabbitMQ
Zabbix is an enterprise-grade open-source solution for distributed system and network monitoring, managed through a web interface.
It can also help us build a monitoring system for MQ clusters and provide early-warning and other functions.
However, because installation and configuration are fairly demanding, it is usually set up by operations staff.
If you are interested, you can visit www.zabbix.com/.
Analysis of interview questions
1. Why does RabbitMQ use channels instead of communicating directly over TCP?
1. TCP connection creation and destruction are expensive: establishing one takes a three-way handshake, and tearing one down takes a four-way close.
2. Without channels, each application thread would need its own TCP connection to the RabbitMQ server. At peak times, with tens of thousands of connections per second, this wastes enormous resources, and the operating system limits the number of TCP connections, creating a performance bottleneck.
3. The channel model is one channel per thread: multiple threads use multiple channels multiplexed over a single TCP connection. One TCP connection can carry a virtually unlimited number of channels, so even thousands of requests per second will not become a performance bottleneck.
2. Should the queue be created by the consumer or the producer?
1. Creating queues through the RabbitMQ management panel is recommended; it is the safest course of action.
2. Generally speaking, queues are declared on the consumer side, since that is where they are consumed.
The consequence is that if the producer starts before the consumer, messages published before the queue exists may be lost.
3. It is also possible to declare the queue on the producer side as well, so that messages are not lost.
4. If both the producer and the consumer declare the same queue, whichever starts first actually creates it; a later declaration with identical properties is simply a no-op, while a declaration with mismatched properties causes a channel error.
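The behavior in point 4 can be modeled with a small, hypothetical sketch (the declare method and its boolean return are illustrative only; a real broker raises a PRECONDITION_FAILED channel error on a property mismatch):

```java
import java.util.HashMap;
import java.util.Map;

public class QueueDeclareSketch {
    // Maps queue name -> its declared properties (encoded as a string for brevity)
    private final Map<String, String> broker = new HashMap<>();

    // Returns true if the declaration succeeds; false models the broker's
    // PRECONDITION_FAILED channel error on a property mismatch.
    public boolean declare(String name, String props) {
        String existing = broker.putIfAbsent(name, props);
        return existing == null || existing.equals(props);
    }

    public static void main(String[] args) {
        QueueDeclareSketch b = new QueueDeclareSketch();
        System.out.println(b.declare("orders", "durable=true"));  // first declaration creates the queue
        System.out.println(b.declare("orders", "durable=true"));  // identical re-declaration is a no-op
        System.out.println(b.declare("orders", "durable=false")); // mismatch: real broker raises an error
    }
}
```

This is why declaring the queue in both the producer and the consumer is safe in practice, as long as both sides use exactly the same queue properties.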