In the process of project development, we often encounter similar business scenarios: users apply for cash withdrawal, the background carries out accounting processing, sends cash withdrawal SMS, and calls the bank payment channel.

In this process, it is time-consuming to call the three-party channel (SMS or bank channel), and the accounting processing may also be handled by a special accounting system. So, to improve concurrency and speed, the next three operations can be handled asynchronously. This is where message queues are used.

Message queue middleware is an important component in distributed system. It mainly solves application coupling, asynchronous message, traffic cutting and other problems, and realizes high performance, high availability, scalability and final consistency architecture. It is an indispensable middleware in large distributed system.

The most common message queues in the market are ActiveMQ, RabbitMQ, ZeroMQ, Kafka, MetaMQ and RocketMQ.

ActiveMQ is specially integrated in the Starter of Spring Boot. Therefore, in this article, we will talk about the integration of ActiveMQ.

The JMS specification

JMS, the Java Message Service (Java Message Service) application Interface, is an API for message-oriented middleware (MOM) in the Java platform. It is used to send messages and communicate asynchronously between two applications or in distributed systems. The Java messaging service is a platform-independent API, and JMS is supported by most MOM providers.

JMS messaging mechanism has two models, one is in the form of queue (Point to Point -) sent messages can only be consumed by a single consumer; One is the subscription (Topic) pattern, which can be subscribed by multiple subscribers, all of whom receive the same message.

ActiveMQ is one of the implementations of JMS.

ActiveMQ is introduced

ActiveMQ is an open source implementation of Message middleware based on JMS (Java Message Servie) specification. The design goal of ActiveMQ is to provide a standard, message-oriented application integration Message communication middleware that can span multiple languages and systems.

It provides high availability, high performance, scalability, stability, and security for messaging in enterprise applications.

ActiveMQ implements the JMS specification and provides a number of additional features on top of it. ActiveMQ supports both queue and subscribe modes of message sending.

The data transmission process of AcitveMQ is shown as follows:

ActiveMQ has two types of messaging:

(1) Point-to-point transmission, that is, one producer corresponds to one consumer. The producer pushes data to Broke, and the data is stored in a queue in Broke. When the consumer accepts the data in the queue, the data is stored in the queue.

(2) Transmission based on publish/subscribe mode, that is, according to the subscription topic to receive the corresponding data, a producer can push data to multiple consumers, and the implementation of MQTT protocol is similar.

The difference between the two types of messaging is that the point-to-point transmission consumer can receive the data pushed by the producer before the connection, while the publish/subscribe transmission consumer can only receive the data pushed by the producer after the connection.

Spring Boot integrates ActiveMQ

Spring Boot specifically provides Spring-boot-starter-ActivemQ for ActiveMQ to support automatic integration configuration of ActiveMQ in Spring Boot. On this basis we can easily integrate and use.

Create projects and introduce dependencies

Create a standard Spring Boot project and introduce the following dependencies into the project:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
Copy the code

At this point, if no Web or other related processing is required, just import the dependency. If pool is used, the following dependencies need to be added to the POM:

<dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-pool</artifactId>
</dependency>
Copy the code

The configuration file

Add the following configuration to application.properties:

# Memory based ActiveMQ spring. ActiveMQ. In-memory =true If you use the connection pool also need to be added in the pom activemq - pool depends on the spring. Activemq. Pool. Enabled = false # independent installation of activemq # spring. Activemq. Broker - url = TCP: / / 127.0.0.1:61616 # spring. Activemq. User = admin # spring. The activemq. Password = adminCopy the code

There are two configurations in the above configuration, Spring Boot supports ActiveMQ based on memory and ActiveMQ based on independent installation. Normal requests are made in memory based form for easy testing purposes, while stand-alone installation-based form is really used in production environments. In order to explain the function and facilitate testing, memory – based form is used here.

Queue pattern instance

First, let’s implement a queue-based implementation. The two classes ActiveMQQueue and JmsMessagingTemplate are needed. The former is implemented by ActiveMQ’s interface to Javax.jms.queue. The latter provides Spring with a tool class for sending messages, combined with a Queue.

The JmsMessagingTemplate is instantiated by default and can be used directly. ActiveMQQueue requires us to instantiate and pass in the name of the message queue.

@Configuration public class MyMqConfig { @Bean public Queue queue() { return new ActiveMQQueue("sms.queue"); }}Copy the code

Spring Boot is a fairly routine instantiation operation that I won’t go over again. After instantiating the ActiveMQQueue, our queue is created. Now create the corresponding producer and consumer.

The producer code is as follows:

@Component public class Producer { @Resource private JmsMessagingTemplate jmsMessagingTemplate; @Resource private Queue queue; Public void sendMsg(String MSG) {system.out.println (" send message :" + MSG); this.jmsMessagingTemplate.convertAndSend(this.queue, msg); }}Copy the code

Here we use JmsMessagingTemplate and Queue, both of which have been initialized, as mentioned above. The configuration for consumers is as follows:

@Component public class Consumer { @JmsListener(destination = "sms.queue") public void receiveMsg(String text) { System.out.println(" Received message: "+text); }}Copy the code

Spring provides an annotated listener endpoint: use @jmsListener. Subscribe to the managed bean using the annotated method of @jMSListener. In Java8, @jmslistener is a repeatable annotation that can associate multiple JMS destinations to the same method. In Java 6 and 7, annotations can be made using @jmsListeners.

Destination specifies the monitored message queue name as sms.queue. This method is triggered when a message is sent in the sms.queue, with text being the message content.

After the queue initialization and producer and consumer code is written, unit tests are performed to verify that messages can be sent and processed correctly.

@RunWith(SpringRunner.class) @SpringBootTest public class ActiveMqTests { @Autowired private Producer producer; @test public void sendSimpleQueueMessage() {this.producer. SendMsg (" 200.00 yuan "); }}Copy the code

When the unit test is executed, the following information is printed in the log:

Send message content: Withdraw 200.00 Yuan Received message: Withdraw 200.00 yuanCopy the code

Note Messages can be sent and received normally. If it is based on memory mode, the exception log “javax.jms.jmsexception: peer (vm://localhost#1) stopped.” is printed when the unit test is executed. This is an Info level error and a bug in ActiveMQ.

Subscription pattern instance

Broadcast messages can be received by multiple consumers. Here we are on the basis of the original broadcast message to add.

First of all, when Spring Boot integrates ActiveMQ, only one of the queues or broadcasts is supported by default, which is specified by the configuration item Spring.jms.pub-sub-domain. True is broadcast mode, false is queue mode, and queue mode is supported by default.

To use the broadcast mode, add the following configuration to the configuration file:

spring.jms.pub-sub-domain=true
Copy the code

Note that the queue mode is not working properly at this point.

Then add to MyMqConfig:

@Bean
public Topic topic() {
	return new ActiveMQTopic("sms.topic");
}
Copy the code

Here ActiveMQTopic is created and the name of the topic is specified as SMS.topic.

The following code is added to Producer:

@Resource private Topic topic; Public void sendTopic(String MSG) {system.out.println (" sendTopic content :"+ MSG); this.jmsMessagingTemplate.convertAndSend(this.topic, msg); }Copy the code

To demonstrate multiple broadcast receivers, two new consumers are added to Comsumer:

@JmsListener(destination = "sms.topic") public void receiveTopic1(String text) { Println ("receiveTopic1 Received Topic message: "+ text); } @JmsListener(destination = "sms.topic") public void receiveTopic2(String text) { Println ("receiveTopic2 Received Topic message: "+ text); }Copy the code

Add the following tests to the unit test class:

@test public void sendSimpleTopicMessage() {this.producer.sendTopic(" 200.00 yuan "); }Copy the code

When the unit test is executed, the following log information is displayed:

Topic message content: Withdraw 200.00 Yuan receiveTopic2 Received Topic: Withdraw 200.00 Yuan receiveTopic1 received Topic: Withdraw 200.00 yuanCopy the code

The message is successfully sent.

Both forms are supported

In the example above, either support queue model or support broadcasting mode, if both need to support in production environment, you will need to custom JmsListenerContainerFactory instance. Of course, you can customize this class if the default Configuration of Spring Boot does not meet your requirements, and this is just one of the scenarios.

Basic configuration and use steps: Create custom through DefaultJmsListenerContainerFactory JmsListenerContainerFactory instance, in @ JmsListener annotations by containerFactory attribute references.

Add the following configuration to the MyMqConfig configuration class:

@Bean("queueListenerFactory") public JmsListenerContainerFactory<? > queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } @Bean("topicListenerFactory") public JmsListenerContainerFactory<? > topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // Set it to publish and subscribe, and by default use the production consumer factory.setpubsubDomain (true); return factory; }Copy the code

The queue-based and subscription-based factory classes are instantiated, respectively. Then add the containerFactory property to the corresponding consumer methods. Example code is as follows:

@JmsListener(destination = "sms.queue", ContainerFactory = "queueListenerFactory") public void receiveMsg(String text) {system.out.println (" Received message: "+ text); } @JmsListener(destination = "sms.topic", containerFactory = "topicListenerFactory") public void receiveTopic1(String text) { Println ("receiveTopic1 Received Topic message: "+ text); }Copy the code

Execute both forms of message separately and find that both are normal and mutually beneficial. At the same time, the item spring.jms.pub-sub-domain in the configuration file is invalid.

Other matters

1. Port number of activeMq is 61616;

Pub -sub-domain=true; spring.jms.pub-sub-domain=true;

Queue if there are no consumers, the information will be stored in the queue;

4. Serialize the object when sending a message; When consumers receive object information, they need to use ObjectMessage for transformation.

The JmsListener annotation contains the containerFactory property, which can be configured to receive queque and topic at the same time.

Queue is point-to-point mode. Tipic is a publish and subscribe model;

7. The message queue names (sms.queue and sms.Topic) in the example can be set as configuration properties as needed;

Source code address: github.com/secbr/sprin…

Refer to the article: www.cnblogs.com/xiguadadage… Blog.csdn.net/bihansheng2…

The original article was first published in the public number: program new vision, welcome to pay attention to.