This article gives a brief introduction to the design and implementation of rocketmq-spring-boot, covering the development details of integrating the RocketMQ client into the spring-boot-starter framework. It then walks through a simple step-by-step example of how to configure, send, and consume RocketMQ messages using the spring-boot-starter toolkit.
About the author: Liao Tian is a technical expert at Alibaba who works on the Apache RocketMQ kernel. He has years of experience in distributed systems research and development and a deep understanding of microservices, messaging, storage, and related fields. He currently focuses on RocketMQ kernel optimization and on building the messaging ecosystem.
Through this article, you will learn:
- An introduction to Spring's messaging frameworks
- The rocketmq-spring-boot implementation
- A usage example
Preface
In the late 1990s, with the advent of Java EE (Enterprise Edition), and in particular Enterprise Java Beans, complex descriptor configuration and inflexible code implementations steepened the learning curve and raised development costs for developers. Spring, built on simple XML configuration and Plain Old Java Objects (POJOs) and on techniques such as Dependency Injection, Inversion of Control, and Aspect-Oriented Programming (AOP), offered a more agile answer to the shortcomings of traditional Java EE.
As Spring continued to evolve, annotation-based configuration gradually replaced XML file configuration. On April 1, 2014, Spring Boot 1.0.0 was released. Built on the idea of "convention over configuration", it makes it quick to develop, test, run, and deploy Spring applications, and it combines easily with starters such as spring-boot-starter-web so that an application can run directly from the command line without being deployed to a separate container. This simple, direct, and rapid way of building applications, with conventional configuration and simplified deployment, has been welcomed by more and more developers.
Apache RocketMQ is a distributed messaging and streaming middleware. It consists of two parts: the Broker server and the client.
The client comes in two kinds. One is the message Producer, which is responsible for sending messages to the Broker server.
The other is the message Consumer, where multiple consumers can form a consumer group to subscribe to and pull the messages stored on the Broker server.
To take advantage of Spring Boot's rapid development model and to make the RocketMQ messaging client more flexible to use, the Apache RocketMQ community has introduced a spring-boot-starter implementation. With the release of distributed transactional messaging in RocketMQ 4.3.0, the related Spring Boot code was recently updated to support, through annotations, both transaction status check-back and the sending of transactional messages.
This article briefly introduces the current design and implementation and explains in detail how the RocketMQ client is integrated into the spring-boot-starter framework. It then walks through a simple step-by-step example of how to configure, send, and consume RocketMQ messages using the spring-boot-starter toolkit.
Messaging frameworks in Spring
Before going into the starter itself, let's look at the two main messaging frameworks in Spring: Spring Messaging and Spring Cloud Stream. Both integrate with Spring Boot and provide some reference implementations. Like other Spring frameworks, they aim to enable lightweight message-driven microservices, reducing the complexity of using messaging middleware so that developers can focus on core business logic.
2.1 Spring Messaging
Spring Messaging is a module added in Spring Framework 4 that extends Spring's support for integrating with messaging systems. The support ranges from simplified use of the JMS API through JmsTemplate to a complete infrastructure for receiving messages asynchronously; Spring AMQP provides a similar feature set for the AMQP protocol. Once integrated with Spring Boot, it gains auto-configuration, so the appropriate messaging system is wired in at test time and at run time.
On the client side alone, Spring Messaging provides a set of abstract APIs, or conventions, that define the programming model for message senders and receivers. Each messaging middleware vendor can supply its own Spring implementation within this model. On the sending side, the implementation is a Java Bean of the form XXXTemplate which, combined with Spring Boot's auto-configuration options, offers several ways to send messages. On the consuming side, it is an XXXMessageListener interface (typically implemented by declaring a message-driven POJO with an annotation) whose callback methods listen for and consume messages; it also relies on Spring Boot's auto-configuration options plus some custom properties.
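The template and listener convention described above can be sketched in plain Java. This is only an illustration under stated assumptions: `SketchTemplate` and `SketchMessageListener` are hypothetical names, not Spring Messaging or RocketMQ types, and an in-memory map stands in for the messaging middleware.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consuming-side contract in the "XXXMessageListener" style.
interface SketchMessageListener {
    void onMessage(String payload);
}

// Hypothetical sending-side bean in the "XXXTemplate" style.
// An in-memory map stands in for the real messaging middleware.
class SketchTemplate {
    private final Map<String, List<SketchMessageListener>> listeners = new HashMap<>();

    void subscribe(String destination, SketchMessageListener listener) {
        listeners.computeIfAbsent(destination, key -> new ArrayList<>()).add(listener);
    }

    // Hands the payload to every listener of the destination and
    // returns how many listeners were called.
    int send(String destination, String payload) {
        List<SketchMessageListener> targets =
                listeners.getOrDefault(destination, Collections.emptyList());
        for (SketchMessageListener target : targets) {
            target.onMessage(payload);
        }
        return targets.size();
    }
}

class SketchConventionDemo {
    public static void main(String[] args) {
        SketchTemplate template = new SketchTemplate();
        template.subscribe("demo-topic", payload -> System.out.println("received: " + payload));
        template.send("demo-topic", "Hello, World!");
    }
}
```

The point of the split is that application code only ever touches the template on one side and the callback on the other; which middleware sits in between is an implementation detail of the vendor's starter.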
If you want to learn more about Spring Messaging and how it is used with different messaging products, this document is recommended. Referring to existing Spring Messaging implementations, RocketMQ's spring-boot-starter follows the same design patterns and, drawing on RocketMQ's own features, provides the corresponding APIs (for example ordered, asynchronous, and transactional half-message sending).
2.2 Spring Cloud Stream
Spring Cloud Stream combines the annotations and features of Spring Integration, and its application model is as follows:
The Spring Cloud Stream framework provides an independent application core that communicates with the outside world through @Input and @Output channels. The message Source sends messages through the output channel, and the consuming target (Sink) listens on the input channel to receive them. These channels are connected to external brokers through dedicated binders. Developers only need to program against the fixed interfaces and annotations provided by the application core, without caring which messaging middleware the binder is bound to at run time. At run time, Spring Cloud Stream automatically detects and uses the binders found on the classpath.
This lets developers use different types of middleware with the same code: they only need to include a different binder at build time. In more complex scenarios, you can package several binders into one application and choose among them, or even use different binders for different channels at run time.
The Binder abstraction lets Spring Cloud Stream applications connect flexibly to middleware. In addition, Spring Cloud Stream uses the flexible configuration capabilities of Spring Boot. Configuration can be supplied through externalized properties in any form Spring Boot supports (including application launch arguments, environment variables, and application.yml or application.properties files), so the deployer can dynamically choose the destinations that channels connect to (for example, a Kafka topic or a RabbitMQ exchange) at run time.
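The idea of choosing a binder per channel from external configuration can be sketched as follows. This is a simplified model, not the Spring Cloud Stream Binder SPI: the `SketchBinder` interface and the property keys `default.binder`, `channel.<name>.binder`, and `channel.<name>.destination` are assumptions made up for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical binder contract; not the Spring Cloud Stream Binder SPI.
interface SketchBinder {
    String bind(String channel, String destination);
}

class SketchBinderResolver {
    private final Map<String, SketchBinder> available = new HashMap<>();

    void register(String name, SketchBinder binder) {
        available.put(name, binder);
    }

    // A per-channel binder setting wins over the default binder setting.
    String bind(Map<String, String> config, String channel) {
        String binderName = config.getOrDefault(
                "channel." + channel + ".binder", config.get("default.binder"));
        SketchBinder binder = available.get(binderName);
        if (binder == null) {
            throw new IllegalStateException("No binder named " + binderName);
        }
        return binder.bind(channel, config.get("channel." + channel + ".destination"));
    }
}

class SketchBinderDemo {
    public static void main(String[] args) {
        SketchBinderResolver resolver = new SketchBinderResolver();
        resolver.register("kafka", (ch, dest) -> ch + " -> Kafka topic " + dest);
        resolver.register("rabbit", (ch, dest) -> ch + " -> RabbitMQ exchange " + dest);

        // Stand-in for externalized configuration (arguments, environment, properties file).
        Map<String, String> config = new HashMap<>();
        config.put("default.binder", "kafka");
        config.put("channel.output.destination", "orders");
        config.put("channel.input.binder", "rabbit");
        config.put("channel.input.destination", "payments");

        System.out.println(resolver.bind(config, "output"));
        System.out.println(resolver.bind(config, "input"));
    }
}
```

Because the application code only calls `bind` with a channel name, swapping middleware means changing the configuration map and the registered binders, not the calling code.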
The Binder SPI gives messaging middleware products an extensible API for writing their own binders and integrating them into the Spring Cloud Stream environment. RocketMQ does not currently provide such a binder. We plan to add this feature as a next step, and we hope that community members with experience in this area will contribute pull requests or suggestions.
Implementation of the spring-boot-starter
As mentioned at the beginning, a starter built in the Spring Boot style is very convenient for users. Once the starter dependency is declared in pom.xml, the corresponding compile, run, and deployment capabilities are pulled in automatically. For this reason, common open source components provide a spring-boot-starter package for Spring users, which makes them very easy to integrate and use. Here we describe the implementation of the RocketMQ (client) starter in detail.
3.1. Implementation steps of spring-boot-starter
A spring-boot-starter implementation contains the following parts:
- Definitions in pom.xml
- Define the coordinates of the starter artifact that is eventually generated
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
- Define the dependency packages
They fall into two groups: (a) Spring's own dependencies; (b) RocketMQ dependencies
<dependencies>
    <!-- spring-boot-starter internal dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- rocketmq dependencies -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq-version}</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <!-- spring-boot-starter parent dependency definition -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
- Configuration properties class
Define the application properties class RocketMQProperties, which holds a set of default property values. Users of the starter can change these values, not by editing the class itself but by setting the corresponding entries in the Spring Boot application configuration file: src/main/resources/application.properties.
- Define the auto-configuration class
The auto-configuration class is declared in src/main/resources/META-INF/spring.factories so that Spring Boot can use the class named in that file to initialize the related beans, components, or services automatically. The file reads as follows:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
The RocketMQAutoConfiguration class defines the Bean objects that are exposed directly to users, including:
- RocketMQProperties: the class that loads and processes the application properties file;
- RocketMQTemplate: the template class that users call to send messages;
- ListenerContainerConfiguration: the container Bean responsible for discovering and registering the classes that implement the consumer-side interface. Such a class must carry the @RocketMQMessageListener annotation and implement the generic RocketMQListener interface.
- Finally, the producer and consumer clients are each given a RocketMQ-specific wrapper; the current implementation is compatible with the Spring Messaging interfaces.
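Two of the steps above, default property values that a user overrides in application.properties and an auto-configuration class named in a factories file, can be modeled with the JDK alone. This is a rough sketch under stated assumptions, not how Spring Boot is implemented: `SketchStarterProperties`, `SketchAutoConfiguration`, and `SketchStarterLoader` are hypothetical names, and the fallback values are illustrative rather than the starter's real defaults.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for RocketMQProperties: defaults plus user overrides.
class SketchStarterProperties {
    final String nameServer;
    final String producerGroup;

    SketchStarterProperties(Properties source) {
        // Fallback values are illustrative, not the starter's real defaults.
        this.nameServer = source.getProperty("spring.rocketmq.name-server", "localhost:9876");
        this.producerGroup = source.getProperty("spring.rocketmq.producer.group", "default-group");
    }
}

// Hypothetical stand-in for RocketMQAutoConfiguration.
class SketchAutoConfiguration {
    SketchStarterProperties properties(Properties source) {
        return new SketchStarterProperties(source);
    }
}

class SketchStarterLoader {
    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Parses properties-style text such as application.properties or spring.factories.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads the class name registered under KEY and instantiates it reflectively.
    static Object loadAutoConfiguration(String factoriesText) {
        String className = parse(factoriesText).getProperty(KEY, "").trim();
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load " + className, e);
        }
    }
}

class SketchStarterDemo {
    public static void main(String[] args) {
        // Same key/continuation layout as the spring.factories entry shown earlier.
        String factories = SketchStarterLoader.KEY + "=\\\n"
                + SketchAutoConfiguration.class.getName() + "\n";
        SketchAutoConfiguration config =
                (SketchAutoConfiguration) SketchStarterLoader.loadAutoConfiguration(factories);

        // The user overrides only the producer group; the name server keeps its default.
        Properties app = SketchStarterLoader.parse("spring.rocketmq.producer.group=my-group1\n");
        SketchStarterProperties props = config.properties(app);
        System.out.println(props.nameServer + " / " + props.producerGroup);
    }
}
```

The real mechanism does considerably more (conditional beans, ordering, property binding), but the division of labor is the same: the factories entry tells the framework which class to load, and the properties class decides which values the user may override.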
3.2. Message sender implementation
- Ordinary sender
The sender code is encapsulated in the RocketMQTemplate POJO. The following is the call diagram of the sender code:
To stay compatible with Spring Messaging's sending templates, RocketMQTemplate extends the AbstractMessageSendingTemplate abstract class, which supplies the message conversion and sending methods; these methods all eventually delegate to doSend(). doSend() and the RocketMQ-specific methods, such as asynchronous, one-way, and ordered sending, are added directly to RocketMQTemplate and call the RocketMQ Producer API directly to send messages.
- Transaction message sender
To handle transactional messages, the message sender is partially extended, as shown in the following figure:
RocketMQTemplate adds a method for sending transactional messages, sendMessageInTransaction(), which sends through RocketMQ's TransactionProducer. The associated TransactionListener implementation class is registered with that producer, so that the methods implemented in the TransactionListener can be called after the message is sent.
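The two sender paths above can be sketched together in plain Java. Everything here is a hypothetical stand-in: `SketchSendingTemplate` plays the role of the abstract sending template, `SketchRocketTemplate` the concrete template, and `SketchTxListener` a transaction listener. In particular, the check-back is collapsed into a direct method call, whereas in RocketMQ it is initiated by the broker at a later time.

```java
import java.util.ArrayList;
import java.util.List;

enum SketchTxState { COMMIT, ROLLBACK, UNKNOWN }

// Hypothetical callback contract, loosely modeled on a transaction listener.
interface SketchTxListener {
    SketchTxState executeLocalTransaction(String payload);
    SketchTxState checkLocalTransaction(String payload);
}

// Stand-in for an abstract sending template: convenience methods funnel into doSend().
abstract class SketchSendingTemplate {
    void convertAndSend(String destination, Object payload) {
        doSend(destination, String.valueOf(payload));
    }

    protected abstract void doSend(String destination, String payload);
}

class SketchRocketTemplate extends SketchSendingTemplate {
    // Records what would be visible to consumers; replaces a real producer.
    final List<String> delivered = new ArrayList<>();

    @Override
    protected void doSend(String destination, String payload) {
        delivered.add(destination + ":" + payload);
    }

    // Middleware-specific method added directly on the concrete template.
    SketchTxState sendMessageInTransaction(String destination, String payload,
                                           SketchTxListener listener) {
        // The half message stays invisible until the local transaction outcome is known.
        SketchTxState state = listener.executeLocalTransaction(payload);
        if (state == SketchTxState.UNKNOWN) {
            // Simplification: a direct call instead of a broker-initiated check-back.
            state = listener.checkLocalTransaction(payload);
        }
        if (state == SketchTxState.COMMIT) {
            delivered.add(destination + ":" + payload);
        }
        return state;
    }
}

class SketchSenderDemo {
    public static void main(String[] args) {
        SketchRocketTemplate template = new SketchRocketTemplate();
        template.convertAndSend("string-topic", "plain message");
        SketchTxState state = template.sendMessageInTransaction("string-topic", "tx message",
                new SketchTxListener() {
                    @Override
                    public SketchTxState executeLocalTransaction(String payload) {
                        return SketchTxState.UNKNOWN;
                    }

                    @Override
                    public SketchTxState checkLocalTransaction(String payload) {
                        return SketchTxState.COMMIT;
                    }
                });
        System.out.println(state + " " + template.delivered);
    }
}
```

Keeping the generic methods in the abstract class and the middleware-specific ones on the concrete template is what lets the template stay compatible with a common sending interface while still exposing features that only one middleware offers.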
3.3. Message consumer implementation
When a Spring Boot application starts on the consumer side, it scans for all classes annotated with @RocketMQMessageListener (these classes must implement the RocketMQListener interface and its onMessage() method). Each listener is placed, one to one, into a DefaultRocketMQListenerContainer object. Depending on the consumption mode (concurrent or ordered), the container wraps the RocketMQListener in the corresponding concurrent or ordered listener implementation used internally by RocketMQ. The container then creates a RocketMQ Consumer object, starts it, and listens on the configured topic; when messages arrive, it calls back the listener's onMessage() method.
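The discovery step described above, accepting only beans that both carry the annotation and implement the listener interface, can be illustrated with plain reflection. This is a minimal sketch with hypothetical names (`SketchListenerMeta`, `SketchListener`, `SketchListenerContainer`); it leaves out the consumption mode and the real RocketMQ consumer entirely.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical annotation standing in for a message-listener annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchListenerMeta {
    String topic();
    String consumerGroup();
}

// Hypothetical listener interface with a single callback.
interface SketchListener {
    void onMessage(String message);
}

class SketchListenerContainer {
    private final Map<String, SketchListener> byTopic = new HashMap<>();

    // Registers the bean only if it carries the annotation and implements the interface.
    boolean register(Object bean) {
        SketchListenerMeta meta = bean.getClass().getAnnotation(SketchListenerMeta.class);
        if (meta == null || !(bean instanceof SketchListener)) {
            return false;
        }
        byTopic.put(meta.topic(), (SketchListener) bean);
        return true;
    }

    // Calls back the listener registered for the topic, if there is one.
    boolean dispatch(String topic, String message) {
        SketchListener listener = byTopic.get(topic);
        if (listener == null) {
            return false;
        }
        listener.onMessage(message);
        return true;
    }
}

@SketchListenerMeta(topic = "string-topic", consumerGroup = "demo-group")
class SketchStringConsumer implements SketchListener {
    String last;

    @Override
    public void onMessage(String message) {
        last = message;
    }
}

class SketchConsumerDemo {
    public static void main(String[] args) {
        SketchListenerContainer container = new SketchListenerContainer();
        SketchStringConsumer consumer = new SketchStringConsumer();
        container.register(consumer);
        container.dispatch("string-topic", "Hello, World!");
        System.out.println("last message: " + consumer.last);
    }
}
```

Reading the topic from the annotation rather than from the listener's code is what allows the framework to create and start the consumer on the user's behalf.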
Usage example
The previous chapter described how RocketMQ is implemented as a spring-boot-starter. Here is a simple example of sending and consuming messages that shows how to use rocketmq-spring-boot-starter.
4.1 Preparations for the RocketMQ Server
- Start NameServer and Broker
To verify RocketMQ's Spring Boot client, first make sure that the RocketMQ service has been downloaded and started correctly. See the Quick Start on the RocketMQ main site for the steps, and confirm that both the NameServer and the Broker are running.
- Create the topic needed by the example
Run the following command from the directory where the startup commands were executed:
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
4.2 Compile rocketmq-spring-boot-starter
Currently, the spring-boot-starter artifact has not yet been published to Maven's central repository. Before using it, users need to clone the source code with git and run mvn clean install to install it into their local repository.
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-spring-boot-starter
mvn clean install
4.3. Write client code
To use the starter, add the following dependency to the Maven configuration file pom.xml of both the sending and the consuming client:
<properties>
    <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>
</properties>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>spring-boot-starter-rocketmq</artifactId>
    <version>${spring-boot-starter-rocketmq-version}</version>
</dependency>
The value of the spring-boot-starter-rocketmq-version property is 1.0.0-SNAPSHOT, the same version that was installed into the local repository in the previous step.
- Message sender code
The sender's configuration file, application.properties
# Define the name-server address
spring.rocketmq.name-server=localhost:9876
# Define the producer group name
spring.rocketmq.producer.group=my-group1
# Define the topic to send to
spring.rocketmq.topic=string-topic
Java code on the sending side
import javax.annotation.Resource;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    // Declare and inject the RocketMQTemplate
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // Read the topic property defined in application.properties
    @Value("${spring.rocketmq.topic}")
    private String springTopic;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Synchronously send a string message to the specified topic
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
        System.out.printf("string-topic syncSend1 sendResult=%s %n", sendResult);
    }
}
- Message consumer code
The consumer's configuration file, application.properties
# Define the name-server address
spring.rocketmq.name-server=localhost:9876
# Define the consumer group name
spring.rocketmq.consumer.group=my-customer-group1
# Define the topic to consume from
spring.rocketmq.topic=string-topic
Java code on the consumer side
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

// Declare the class that consumes messages; the annotation carries the consumption settings
@Service
@RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = "${spring.rocketmq.consumer.group}")
class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // %n ends the line; the original %f had no matching argument and would fail at run time
        System.out.printf("------- StringConsumer received: %s %n", message);
    }
}
This is a brief introduction to writing the most basic message sending and receiving code with Spring Boot. To learn about the other call styles, such as asynchronous sending, object message bodies, specifying a tag, and sending transactional messages, please refer to the documentation and code on GitHub. We will continue to cover these advanced features.
Coming up: a second article on using transactional messages in the Spring Boot framework will be published soon.
Reference documentation
1. Spring Boot features: Messaging
2. Enterprise Integration Patterns: Composition introduction
3. Spring Cloud Stream Reference Guide
4. dzone.com/articles/cr…