Author | RocketMQ official account    Source | Alibaba Cloud Native official account
RocketMQ-Spring graduated in January 2019, after six months of incubation, as a subproject of Apache RocketMQ and published its first release, 2.0.1. The project wraps the RocketMQ client with Spring Boot so that users can send and consume messages with simple annotations and standard Spring Messaging API code. At the time, the RocketMQ community invited the Spring community to review the RocketMQ-Spring code, which gave rise to the story of RocketMQ and Spring Boot.
Two years later, RocketMQ-Spring has officially released 2.2.0. During this time RocketMQ-Spring iterated through several versions, the Spring Cloud Stream RocketMQ Binder and Spring Cloud Bus RocketMQ Binder emerged, and the Spring evangelist Baeldung introduced RocketMQ-Spring to developers abroad. More and more developers at home and abroad are using RocketMQ-Spring to send and receive messages. In just two years the RocketMQ-Spring repository has also surpassed Spring-Kafka and Spring-AMQP (both maintained by the Spring community) in star count, becoming one of the most popular ecosystem projects of Apache RocketMQ.
RocketMQ-Spring's popularity comes from the good fit between RocketMQ, which supports rich business scenarios, and Spring, the microservices ecosystem, as well as from its strict adherence to the Spring Messaging API specification and its support for rich message types.
Follow the Spring Messaging API specification
Spring Messaging provides a set of abstract APIs that define the pattern for the message sender and the message receiver. Different messaging middleware providers can supply their own Spring implementations under this pattern. On the sending side, the provider implements a Java bean of the form XXXTemplate which, combined with Spring Boot auto-configuration options, offers a number of different sending methods. On the consuming side, the provider implements an XXXMessageListener interface (typically used through an annotation that declares a message-driven POJO) that provides callback methods to listen for and consume messages, again relying on Spring Boot auto-configuration options and some custom properties.
1. The sender
RocketMQ-Spring provides its API based on the Spring Messaging API specification combined with RocketMQ's own features. On the sending side, RocketMQ-Spring implements RocketMQTemplate to send messages. As shown in the figure below, RocketMQTemplate extends the AbstractMessageSendingTemplate abstract class to support the standard message conversion and sending methods of the Spring Messaging API. These methods all eventually delegate to the doSend method, and doSend in turn calls syncSend, which is implemented by DefaultMQProducer.
In addition to the methods in the Spring Messaging API specification, RocketMQTemplate also implements some methods of the RocketMQ native client to support richer message types. Notably, users do not have to build a RocketMQ Message themselves (for example, by serializing an object into a byte array and putting it into a Message object): RocketMQTemplate accepts objects, strings, or byte arrays directly as parameters, and object serialization is handled by RocketMQ-Spring. As long as the consumer side agrees on the corresponding schema, messages can be sent and received normally.
RocketMQTemplate send API:
SendResult syncSend(String destination, Object payload)
SendResult syncSend(String destination, Message<?> message)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
void asyncSend(String destination, Object payload, SendCallback sendCallback)
......
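The delegation chain described above (public send methods, then doSend, then syncSend on the producer) can be pictured with a small plain-Java model. This is a minimal sketch, not RocketMQ-Spring's code: SketchProducer, SketchSendingTemplate, and SketchRocketTemplate are hypothetical stand-ins, and payload conversion is reduced to toString() instead of real serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the native producer; it only records what it is asked to send.
class SketchProducer {
    final List<String> sent = new ArrayList<>();

    String send(String topic, byte[] body) {
        sent.add(topic + ":" + new String(body, StandardCharsets.UTF_8));
        return "SEND_OK";
    }
}

// Hypothetical stand-in for the abstract sending template:
// the public send method converts the payload and funnels into doSend.
abstract class SketchSendingTemplate {
    public void convertAndSend(String destination, Object payload) {
        doSend(destination, convert(payload));
    }

    // Simplified conversion: byte arrays pass through, everything else uses toString().
    protected byte[] convert(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        return String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
    }

    protected abstract void doSend(String destination, byte[] message);
}

// Hypothetical stand-in for the concrete template: doSend delegates to syncSend,
// which hands the converted bytes to the producer.
class SketchRocketTemplate extends SketchSendingTemplate {
    private final SketchProducer producer;

    SketchRocketTemplate(SketchProducer producer) {
        this.producer = producer;
    }

    @Override
    protected void doSend(String destination, byte[] message) {
        syncSend(destination, message);
    }

    public String syncSend(String destination, Object payload) {
        return producer.send(destination, convert(payload));
    }
}

class SendingTemplateDemo {
    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer();
        SketchRocketTemplate template = new SketchRocketTemplate(producer);
        template.convertAndSend("demo-topic", "hello"); // generic entry point
        template.syncSend("demo-topic", 42);             // direct entry point
        System.out.println(producer.sent);
    }
}
```

Both entry points end up in the same producer call, which is why callers never need to build the wire-level message themselves.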
2. The consumer side
On the consumer side, you implement a class annotated with @RocketMQMessageListener. The class implements the RocketMQListener interface and its onMessage method, and the topic, consumerGroup, and other attributes are configured in the annotation. Each listener is placed one-to-one into a DefaultRocketMQListenerContainer object. Depending on the consumption mode (concurrent or orderly), the container wraps the RocketMQListener in the corresponding concurrent or orderly listener implementation of the RocketMQ client. The container creates a RocketMQ DefaultMQPushConsumer, starts it, and listens for messages on the configured topic; for each message it converts the body to the agreed schema object and calls back the listener's onMessage method.
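import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

// Message-driven POJO: topic and tag are resolved from application properties
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}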
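The container's role described above (one container per listener, convert the raw body, then call onMessage) can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: SketchListener and SketchListenerContainer are hypothetical names, the converter is supplied by the caller, and reporting a failed callback as "retry later" is a simplification rather than a description of the real container.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical listener interface, mirroring the single onMessage callback.
interface SketchListener<T> {
    void onMessage(T message);
}

// Hypothetical container: owns exactly one listener, converts the raw body
// to the listener's payload type, then invokes the callback.
class SketchListenerContainer<T> {
    private final SketchListener<T> listener;
    private final Function<byte[], T> converter;

    SketchListenerContainer(SketchListener<T> listener, Function<byte[], T> converter) {
        this.listener = listener;
        this.converter = converter;
    }

    // Returns true when the message was consumed, false when conversion or the
    // listener threw (in this sketch that means "should be retried later").
    boolean dispatch(byte[] rawBody) {
        try {
            listener.onMessage(converter.apply(rawBody));
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}

class ListenerContainerDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        SketchListenerContainer<String> container = new SketchListenerContainer<>(
                received::add,
                body -> new String(body, StandardCharsets.UTF_8));

        boolean consumed = container.dispatch("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(consumed + " " + received);
    }
}
```

Keeping conversion inside the container is what lets the listener declare a typed parameter such as String instead of handling raw bytes.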
In addition to the Push interface, RocketMQ-Spring implements the RocketMQ Lite Pull Consumer in the latest 2.2.0 release. By configuring the consumer in the configuration file, you can actively pull messages using the receive method of RocketMQTemplate.
Configuration file resources/application.properties:
rocketmq.name-server=localhost:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test

Pull consumer code:
while (!isStop) {
    List<String> messages = rocketMQTemplate.receive(String.class);
    System.out.println(messages);
}
Rich message types
RocketMQ-Spring's message type support is fully aligned with the RocketMQ native client, including synchronous/asynchronous/one-way, ordered, delayed, batch, transaction, and Request-Reply messages. Here we focus on the more special transaction messages and Request-Reply messages.
1. Transaction messages
RocketMQ transaction messages differ from the transaction messages in Spring Messaging; RocketMQ-Spring still uses the RocketMQ native transaction message scheme. As shown below, to send a transaction message you implement a class annotated with @RocketMQTransactionListener and implement the executeLocalTransaction and checkLocalTransaction methods, which execute the local transaction and check its result, respectively.
// Build a Spring Message for sending in a transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter is the destination ("test-topic");
// the last parameter is an optional argument passed on to executeLocalTransaction()
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);

// Define the transaction listener with the @RocketMQTransactionListener annotation
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // ... run the local transaction, then return rollback, commit or unknown
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // ... check the transaction status and return rollback, commit or unknown
        return RocketMQLocalTransactionState.COMMIT;
    }
}
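To see why the listener needs both methods, it may help to model the flow in plain Java: the message is first stored as a pending (half) message, the local transaction decides its fate, and an undecided message is resolved later by a check-back. The sketch below is an illustration under assumptions, not broker code: SketchTxBroker, SketchTxListener, and SketchTxState are hypothetical names, and the check-back is triggered manually instead of by a broker timer.

```java
import java.util.ArrayList;
import java.util.List;

enum SketchTxState { COMMIT, ROLLBACK, UNKNOWN }

// Hypothetical listener with the same two responsibilities as the article's listener.
interface SketchTxListener {
    SketchTxState executeLocalTransaction(String message);
    SketchTxState checkLocalTransaction(String message);
}

// Hypothetical in-memory broker model.
class SketchTxBroker {
    final List<String> pending = new ArrayList<>();   // half messages, invisible to consumers
    final List<String> delivered = new ArrayList<>(); // committed messages, visible to consumers

    void sendInTransaction(String message, SketchTxListener listener) {
        pending.add(message);                                        // 1. store the half message
        resolve(message, listener.executeLocalTransaction(message)); // 2. run the local transaction
    }

    // 3. check back on every message whose outcome is still unknown
    void checkPending(SketchTxListener listener) {
        for (String message : new ArrayList<>(pending)) {
            resolve(message, listener.checkLocalTransaction(message));
        }
    }

    private void resolve(String message, SketchTxState state) {
        if (state == SketchTxState.UNKNOWN) {
            return; // stay pending until a later check decides
        }
        pending.remove(message);
        if (state == SketchTxState.COMMIT) {
            delivered.add(message);
        }
    }
}

class TransactionFlowDemo {
    public static void main(String[] args) {
        // Same outcome pattern as the listener above: UNKNOWN first, COMMIT on check.
        SketchTxListener listener = new SketchTxListener() {
            @Override
            public SketchTxState executeLocalTransaction(String message) {
                return SketchTxState.UNKNOWN;
            }

            @Override
            public SketchTxState checkLocalTransaction(String message) {
                return SketchTxState.COMMIT;
            }
        };
        SketchTxBroker broker = new SketchTxBroker();
        broker.sendInTransaction("order-1", listener);
        System.out.println("before check: " + broker.delivered);
        broker.checkPending(listener);
        System.out.println("after check: " + broker.delivered);
    }
}
```

Returning UNKNOWN from the first method is therefore safe: the message simply stays invisible until the check method gives a definite answer.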
In version 2.1.0, RocketMQ-Spring refactored the implementation of transaction messages, as shown below. In the old version, each group corresponded to one TransactionProducer; in the new version, each RocketMQTemplate corresponds to one TransactionProducer, which solves the problem of using multiple transaction messages concurrently. When users need multiple kinds of transaction messages in a single process, they can use ExtRocketMQTemplate and set rocketMQTemplateBeanName in the corresponding @RocketMQTransactionListener annotation to the bean name of that ExtRocketMQTemplate. (In general, one RocketMQTemplate per process is recommended. ExtRocketMQTemplate is meant for scenarios where multiple producers or Lite Pull Consumers are used in the same process; it can be given a nameserver, group, and other settings that differ from the standard RocketMQTemplate.)
2. Request-Reply messages
In version 2.1.0, RocketMQ-Spring began supporting Request-Reply messages. With a Request-Reply message, the upstream service sends a message and then waits until the consumer has processed it and the result has been returned to the sender. In RocketMQ-Spring, the sender uses the sendAndReceive method of RocketMQTemplate, as shown below. There are two main ways to call it, synchronous and asynchronous; in the asynchronous case the result is delivered through a RocketMQLocalRequestCallback implementation.
// Synchronous request: send and wait for a return value of type String
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);

// Asynchronous request: send and receive a return value of type User through a callback
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName", (byte) 9), new RocketMQLocalRequestCallback<User>() {
    @Override
    public void onSuccess(User message) {
        ...
    }

    @Override
    public void onException(Throwable e) {
        ......
    }
});
On the consumer side, you still implement a class annotated with @RocketMQMessageListener, but the interface to implement is RocketMQReplyListener<T, R> (for normal messages it is RocketMQListener), where T is the type of the received value and R is the type of the returned value. The interface requires an onMessage method with a return value, and the returned content is sent back to the corresponding producer.
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
    @Override
    public String onMessage(String message) {
        ......
        return "reply string";
    }
}
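A request-reply exchange over a message queue is commonly built by tagging each request with a correlation id and completing a pending future when the matching reply arrives. The sketch below illustrates that general idea in plain Java; it is an assumption-laden model, not a description of RocketMQ's internals. SketchRequestReplyClient is a hypothetical name, and the consumer is simulated by a function run on another thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical client: pairs each request with its reply through a correlation id.
class SketchRequestReplyClient {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();
    // Stands in for the consumer's onMessage method that returns a value.
    private final Function<String, String> consumer;

    SketchRequestReplyClient(Function<String, String> consumer) {
        this.consumer = consumer;
    }

    // Asynchronous style: the caller attaches callbacks to the returned future.
    CompletableFuture<String> request(String payload) {
        String correlationId = "req-" + ids.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // Simulate delivery to the consumer and the reply travelling back.
        CompletableFuture.runAsync(() -> {
            try {
                onReply(correlationId, consumer.apply(payload));
            } catch (RuntimeException e) {
                CompletableFuture<String> failed = pending.remove(correlationId);
                if (failed != null) {
                    failed.completeExceptionally(e);
                }
            }
        });
        return future;
    }

    // Called when a reply arrives: look up the waiting request and complete it.
    void onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    // Synchronous style: block until the reply arrives or the timeout expires.
    String sendAndReceive(String payload, long timeoutMillis) {
        try {
            return request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("request failed or timed out", e);
        }
    }
}

class RequestReplyDemo {
    public static void main(String[] args) {
        SketchRequestReplyClient client = new SketchRequestReplyClient(req -> "reply to " + req);
        System.out.println(client.sendAndReceive("request string", 2000));
        client.request("another request").thenAccept(System.out::println).join();
    }
}
```

The synchronous and asynchronous styles differ only in how the caller waits on the same pending future, which matches the two calling styles shown for sendAndReceive.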
RocketMQ-Spring follows Spring's convention-over-configuration philosophy. By introducing a single Spring Boot Starter dependency (groupId: org.apache.rocketmq, artifactId: rocketmq-spring-boot-starter), it integrates all RocketMQ client functionality into Spring Boot, so that messages can be sent and received with simple annotations. More detailed usage and an FAQ are available in the RocketMQ-Spring GitHub Wiki.
Since its first official release, RocketMQ-Spring has completed 16 bug fixes and 37 improvements, including the transaction message refactoring. Everyone is welcome to join the RocketMQ community as the story of RocketMQ and Spring Boot continues. To talk with other developers, search for DingTalk group number 21982288 and join the group.