Initial use of Rabbitmq
With the development of the concept of micro-services, large applications are gradually divided into small applications to improve the development efficiency. Specialized people do specialized things and gradually become popular.
Most of the communication methods on microservices are RPC, and there are also upgraded GRPC.
Another implementation is to use MQ for decoupling.
Today is the beginning of MQ, a quick start, prepare an environment implementation case, the article covers the following:
- Install the rabbitmq
- Problems that MQ can solve
- We practice
The installation
We use docker to install RabbitMQ, docker is convenient for us to quickly implement rabbitMQ installation, do not need to install MQ headache.
Docker two ways
Docker way
// Start rabbitmq docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672-v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9Copy the code
Description:
- -d Background running container;
- –name Specifies the container name.
- -p Specifies the port on which the service runs (5672: application access port; 15672: Console Web port number);
- -v Mapping directory or file.
- (one important thing about RabbitMQ is that it stores data according to what is called a “node name”, which is the hostname by default);
- -e Specifies the environment variable. RABBITMQ_DEFAULT_VHOST: specifies the default VM name. RABBITMQ_DEFAULT_USER: default user name. RABBITMQ_DEFAULT_PASS: password for default user name)
Docker – compose way
version: "3"
services:
rabbit:
image: docker.infervision.com/library/rabbitmq:3-management
ports:
- "4369:4369"
- "5671:5671"
- "5672:5672"
- "15671:15671"
- "15672:15672"
restart: always
environment:
- RABBITMQ_DEFAULT_USER=test
- RABBITMQ_DEFAULT_PASS=test
volumes:
- /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq
container_name: rabbitmq
Execute: docker-compose in this file directory up -d
Copy the code
Download rabbitMQ built-in admin, IP :15672 Username and password we wrote on startup.
What does MQ solve?
In layman’s terms, MQ is primarily used to solve the following three problems.
Asynchronous messaging
In business, it is common to encounter the service of sending email, SMS or other notification content at the same time. At the beginning of the service, the data is sent back to the client after the data is sent in synchronous or asynchronous mode. There is a delay
As the business grows, this approach wastes a lot of system performance. The message queue is used to decouple these services, and only the message content is sent to the message queue, reducing the waiting time of users, and the experience is much better than the original.
Decoupling between applications
The same service may require the cooperation of other services to complete a business operation. Or take the common shopping case to illustrate.
After jingdong places an order and pays, the message will be notified to the merchants, and the users will be notified by email that they have purchased a certain product.
If both operations are performed synchronously, the user waits longer.
After using MQ, the order system persists the message to MQ and returns the successful order to the user.
- Merchants receive the ordering information of users and process it. If there is inventory management, inventory processing is needed.
- Notifies the user of the successful order by email.
Mq ensures reliable message delivery, preventing message loss and ensuring high message reliability. If the inventory fails, it will not lead to the failure of users to place orders, and can be re-delivered.
Traffic peak clipping
Traffic peak clipping, generally at the same time in many requests, the background can not process. Then we need to use peak cutting to deal with it.
To put it simply, it receives instantaneous flow peak through a queue and smoothly pushes the message out at the consumer end. If the consumer does not consume in time, the message content can be persisted in the queue and the message will not be lost.
- If the consumer end does not timely consume, it can also dynamically expand the number of consumers and improve the consumption speed.
- Set relevant thresholds. Discard redundant messages and inform users of service messages such as seconds kill failure.
Practical cases
This article is carried out in accordance with the Java language, using Spring Boot to build, package management tool Gradle.
Import the RabbitMQ JAR package
compile("Org. Springframework. The boot: spring - the boot - starter - closer: 1.5.10. RELEASE")
Copy the code
Configuring mq
Yaml file configuration
spring:
rabbitmq:
host: 192.168110.. 5
port: 5672
username: tuixiang
password: tuixiang
Copy the code
Prepare the template class for immediate use later
package com.infervision.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/ * * *@author: fruiqi
* @date: 19-2-18 2:42 PM *@version:1.0 Rabbit Configuration **/
@Configuration
public class RabbitConfig {
/** * log **/
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.username}")
String userName;
@Value("${spring.rabbitmq.password}")
String userPassword;
@Value("${spring.rabbitmq.host}")
String host;
@Value("${spring.rabbitmq.port}")
Integer port;
/** * inject **@param
* @return com.rabbitmq.client.Connection
* @author fruiqi
* @date19-1-22 5:41 PM **/
@Bean
public ConnectionFactory getConnection(a) throws Exception {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(userName);
factory.setPassword(userPassword);
factory.setHost(host);
factory.setPort(port);
return factory;
}
/** * create the specified listener container **@paramQueueName Specifies the name of the listening queue *@paramListenerChannel sets whether to expose the listening channel to registered *@paramPrefetchCount tells the agent how many messages to request at a time *@paramConcurrentConsumers specifies how many ConcurrentConsumers are created *@paramAcknowledgeMode Message acknowledgment mode *@paramListener Listener *@return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
**/
public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel,
int PrefetchCount, int ConcurrentConsumers,
AcknowledgeMode acknowledgeMode,
ChannelAwareMessageListener listener) throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());
container.setQueueNames(queueName);
container.setExposeListenerChannel(listenerChannel);
container.setPrefetchCount(PrefetchCount);
container.setConcurrentConsumers(ConcurrentConsumers);
container.setAcknowledgeMode(acknowledgeMode);
container.setMessageListener(listener);
returncontainer; }}package com.infervision.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/ * * *@author: fruiqi
* @date: 19-2-18 2:51 PM *@version: 1.0 * * /
@Component
public class MsgSender {
private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/ * * *@paramExchange Switch name *@paramRoutingKey Route name *@paramMessage Message content *@return void
* @description//TODO sends messages to the message queue **/
public void sendMsg(String exchange, String routingKey, Object message) {
try {
rabbitTemplate.convertAndSend(exchange,routingKey,message);
}catch (Exception e){
logger.error("[ERROR] send statistic message error ",e); }}}Copy the code
Instance link MQ
Sometimes you need to create a queue from the rabbitMQ client, but sometimes you don’t. Create a queue from the RabbitMQ page and other consumers will reference it directly.
The client creates MQ
// Initialize the queue. If the queue already exists, nothing is done
@Bean
public Queue dicomQueue(a) {
return new Queue(getMacPreStr(DICOM_QUEUE_NAME));
}
// Initialize the switch
@Bean
public Exchange topicExchange(a) {
return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build();
}
// Bind queues to switches according to routing rules
@Bean
Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);
}
Copy the code
use
The use of queues: one is to send, belonging to the producer; One is monitoring, which belongs to consumers.
Producer implementation
In the MQ configuration template class, a special send class is implemented to send the file content and directly invoke the send interface.
@Autowired
RabbitService rabbitService;
/** * practice sending data to MQ * 1. Send data to MQ * 2. The listener is configured to consume messages * 3. For client configuration see RabbitClientConfig *@paramName Indicates the name number *@paramVo physical content *@return: com.infervision.model.NameVo
*/
@ApiOperation(value = "Add name information", notes = "Physical Information")
@PostMapping(value = "/{name}")
@ApiImplicitParam(paramType = "query", name = "name", value = "User name", required = true, dataType = "string")
public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {
rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));
return vo;
}
@Service
public class RabbitServiceImpl implements RabbitService {
@Autowired
MsgSender msgSender;
/** * Try to send message to MQ *@param message
* @return: void
*/
@Override
public void sendMessage(String exchange, String routingKey,String message) { msgSender.sendMsg(exchange, routingKey, message); }}Copy the code
Consumer realization
Consumers realize there are two ways, a monitor, by means of annotation is a kind of implement ChannelAwareMessageListener classes to implement the consumption.
Annotation implementation listening
// Inject on the method. Configuration factories help increase the number of messages consumed by a single consumer at a time, and how many consumers are set to improve application performance
@RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory")
public void processDicomMessage(Message message, Channel channel) {
logger.info(message);
}
// Factories can be configured in the configuration template class.
@Bean("multipleConsumerContainerFactory")
public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(50);
factory.setConcurrentConsumers(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
Copy the code
Implement interface mode
/** * Create listener. *@author fruiqi
* @date19-2-11 4:18 PM *@paramLabelStatisticsListener listener * calls our common method **/
@Bean
public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception {SimpleMessageListenerContainer container = rabbitConfig. SetSimpleMessageListenerContainer (" queue_name ",true, rabbitProperties.getMaximumDelivery(),
rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener);
return container;
}
@Component
public class LabelStatisticsListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class);
/** * process the data transferred *@paramMessage Indicates the content of the message to be sent@paramChannel implements channel *@return: void
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String mes = new String(message.getBody());
logger.info("[INFO] message is {}",mes);
// The manual reply message was consumed
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }}Copy the code
conclusion
This completes the rabbitMQ process from setup to use. Of course, there is more to explore, such as mq queuing patterns, multiple MQ configurations on a system, and so on. Stay tuned for our next installment in the MQ series.
Have you used MQ on your system? What kind of MQ do you use? We can discuss it in the comments section.
Code stored in: Github
, END,
Though the road is long, the journey is sure to come
This article was originally posted on the wechat public account of the same name “Pangqi’s upgrading road”, reply to “1024”, you know, give a thumbs up.
YoungRUIQ