preface
When using SpringBoot’s Starter integration package, pay special attention to the version. Because SpringBoot’s starter dependency for RocketMQ integration is provided by the Spring community, it is currently being rapidly iterated and the gap between versions is so great that even the underlying objects are constantly changing. For example, if you use rocketmq-spring-boot-starter:2.0.4, upgrading to the latest rocketmq-spring-boot-starter:2.1.1 will not work
The application structure
TestController: test the entrance, there is basic information and transaction message testing TopicListener: is to monitor “topic” the subject of ordinary message listener TopicTransactionListener: Is to monitor “topic” the subject transaction message listener, and TopicTransactionRocketMQTemplate binding (one-to-one correspondence relationship) Customer: Is a test message body entity object TopicTransactionRocketMQTemplate: RocketMQTemplate is another RocketMQTemplate extension for a particular business process, bound to TopicTransactionListener (one-to-one correspondence)
pom.xml
Org. Apache. Rocketmq: rocketmq – spring – the boot – starter: 2.1.1, referenced springboot version is at 2.0.5. RELEASE
<? The XML version = "1.0" encoding = "utf-8"? > < project XMLNS = "http://maven.apache.org/POM/4.0.0" XMLNS: xsi = "http://www.w3.org/2001/XMLSchema-instance" Xsi: schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion > 4.0.0 < / modelVersion > < groupId > com. Mrathena.. Middle ware < / groupId > The < artifactId > rocket. Mq. Springboot < / artifactId > < version > 1.0.0 < / version > < dependencyManagement > < dependencies > <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.4.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencies> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> Slf4j </groupId> <artifactId> slf4J-api </artifactId> </artifactId> < version > 1.7.30 < / version > < / dependency > < the dependency > < groupId > ch. Qos. Logback < / groupId > < artifactId > logback - classic < / artifactId > < version > 1.2.3 < / version > < / dependency > < the dependency > < the groupId > org. Apache. Rocketmq < / groupId > < artifactId > rocketmq - spring - the boot - starter < / artifactId > < version > 2.1.1 < / version > <! -- Disable older versions of SpringBoot, RELEASE --> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> </exclusion> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> < the groupId > IO. Springfox < / groupId > < artifactId > springfox swagger - UI < / artifactId > < version > 2.9.2 < / version > < / dependency > < the dependency > < groupId > IO. Springfox < / groupId > < artifactId > springfox - swagger2 < / artifactId > < version > 2.9.2 < / version > </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> < artifactId > maven -- the compiler plugin < / artifactId > < version > 3.8.1 < / version > < configuration > < source > 1.8 < / source > <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>Copy the code
application.yml
Server: servlet: context-path: port: 80 RocketMQ: name-server: 116.62.162.48:9876 Producer: group: producerCopy the code
Customer
package com.mrathena.rocket.mq.entity;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
private String username;
private String nickname;
}
Copy the code
Producers TestController
package com.mrathena.rocket.mq.controller; import com.mrathena.rocket.mq.configuration.TopicTransactionRocketMQTemplate; import com.mrathena.rocket.mq.entity.Customer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @Slf4j @RestController @RequestMapping("test") public class TestController { private static final String TOPIC = "topic"; @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private TopicTransactionRocketMQTemplate topicTransactionRocketMQTemplate; @getMapping ("base") public Object base() {destination: topic/topic:tag, topic or tag // payload: Load is the message body // message: Org. Springframework. Messaging. The Message is a Spring class encapsulation, and RocketMQ Message is not a class, There are no tags/keys and other contents rocketMQTemplate. Send (TOPIC, MessageBuilder withPayload (" hello "). SetHeader (" who are you ", "guess"). The build ()); // tags null rocketMQTemplate.convertAndSend(TOPIC, "tag null"); / / tags empty, proving that the tag has a value or null, there is no empty tag rocketMQTemplate. ConvertAndSend (TOPIC + ":", "tag empty?" ); / / only the tag no key rocketMQTemplate. ConvertAndSend (TOPIC + ": a", "tag a"); rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b"); Message(String topic, String tags, String keys, // rocketmq-spring-boot-starter adds userProperty and other attributes to headers. Specific can consult org. Apache. Rocketmq. Spring. Support. RocketMQUtil. AddUserProperties / / get a custom attribute, Map<String, Object> properties = new HashMap<>(); properties.put("property", 1); Properties. Put (" other-property", "hello "); rocketMQTemplate.convertAndSend(TOPIC, "property 1", properties); rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 1", properties); rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b property 1", properties); properties.put("property", 5); rocketMQTemplate.convertAndSend(TOPIC, "property 5", properties); rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 5", properties); rocketMQTemplate.convertAndSend(TOPIC + ":c", "tag c property 5", properties); / rear/message processor, can be in before sending the message body and headers to do a wave of operating rocketMQTemplate. ConvertAndSend (TOPIC, "news post processor", new MessagePostProcessor() { /** * org.springframework.messaging.Message */ @Override public Message<? > postProcessMessage(Message<? > message) { Object payload = message.getPayload(); MessageHeaders messageHeaders = message.getHeaders(); return message; }}); / / convertAndSend bottom is syncSend / / syncSend the info (" {} ", rocketMQTemplate syncSend (TOPIC, "sync send")); // asyncSend rocketMQTemplate.asyncSend(TOPIC, "async send", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("onSuccess"); } @Override public void onException(Throwable e) { log.info("onException"); }}); // sendOneWay rocketMQTemplate.sendOneWay(TOPIC, "send one way"); // I'm not sure what it is. Error reporting when running!! / / Object receive = rocketMQTemplate. SendAndReceive (TOPIC, "hello", String. Class); // log.info("{}", receive); return "success"; } @GetMapping("transaction") public Object transaction() { Message<Customer> message = MessageBuilder.withPayload(new Customer("mrathena", "who are you ").build(); / / here is used by @ ExtRocketMQTemplateConfiguration (group = "anotherProducer") to expand out another RocketMQTemplate the info (" {} ", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC, message, null)); log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC + ":tag-a", message, null)); return "success"; }}Copy the code
Configuration TopicTransactionRocketMQTemplate
package com.mrathena.rocket.mq.configuration; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; / * * * a transaction process and a RocketMQTemplate needs corresponding * can @ ExtRocketMQTemplateConfiguration (note that the annotation is @ Component annotation) to expand more RocketMQTemplate * Different transaction process RocketMQTemplate producerGroup cannot be the same * because MQBroker will call back a certain checkLocalTransactionState method under the same producerGroup, If different processes use the same producerGroup, Method may call the wrong * / @ ExtRocketMQTemplateConfiguration (group = "anotherProducer") public class TopicTransactionRocketMQTemplate extends RocketMQTemplate {}Copy the code
Consumers TopicListener
package com.mrathena.rocket.mq.listener; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * The simplest consumer example * Topic: topic * consumerGroup: selectorType: Filter methods, TAG: TAG filtering, only supports tags, SQL92:SQL filtering, supports tags and attributes * selectorExpression: Filter expressions, according to the selectorType, TAG, writing labels such as "a | | b", SQL92, write SQL expression * consumeMode: CONCURRENTLY: concurrent consumption, ORDERLY: sequential consumption * messageModel: */ @slf4j @component @rocketmqMessagelistener (topic = "topic", // selectorType = selectorType. TAG, // Must specify selectorExpression, // selectorType = selectorType.sql92, // Without any tag // selectorExpression = "*", // Without any tag, // As long as the tag is a message, the pression = "a", / / to the tag for the news of a or b / / selectorExpression = "| a | b", / / SelectorType SQL92, you can skip the tag, SelectorExpression = "value = 1", // selectorExpression = "TAGS are not null", // if the tag is empty, it can't be empty. // selectorExpression = "TAGS = ", // selectorType. SQL92, which is used to filter TAGS, // selectorExpression = "(TAGS is not null and TAGS = 'a') and (property is not null and property Between 4 and 6) ", / / concurrent consumption consumeMode = consumeMode. CONCURRENTLY, / / order/consumption/consumeMode = consumeMode ORDERLY, / / cluster consumption messageModel = messageModel CLUSTERING, / / consumption/radio/messageModel = messageModel -, consumerGroup = "consumer" ) public class TopicListener implements RocketMQListener<String> { public void onMessage(String s) { log.info("{}", s); }}Copy the code
Consumers TopicTransactionListener
package com.mrathena.rocket.mq.listener; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQTransactionListener(rocketMQTemplateBeanName = "topicTransactionRocketMQTemplate") public class TopicTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { // message: Org. Springframework. Messaging. The Message is a Spring class encapsulation, and RocketMQ Message is not a class, there are no tags/keys such as content / / in general, Instead of handling tags/keys, you can do different things depending on the fields in the message body. The second parameter can also be used to pass some data to log.info("executeLocalTransaction Message :{}, object:{}", message, o); log.info("payload: {}", new String((byte[]) message.getPayload())); MessageHeaders headers = message.getHeaders(); log.info("tags: {}", headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS)); log.info("rocketmq_TOPIC: {}", headers.get("rocketmq_TOPIC")); log.info("rocketmq_QUEUE_ID: {}", headers.get("rocketmq_QUEUE_ID")); log.info("rocketmq_MESSAGE_ID: {}", headers.get("rocketmq_MESSAGE_ID")); log.info("rocketmq_TRANSACTION_ID: {}", headers.get("rocketmq_TRANSACTION_ID")); log.info("TRANSACTION_CHECK_TIMES: {}", headers.get("TRANSACTION_CHECK_TIMES")); log.info("id: {}", headers.get("id")); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("checkLocalTransaction message:{}", message); / / after calling the checkLocalTransaction, another routine message listener to receive news return RocketMQLocalTransactionState.COM MIT; }}Copy the code
The last
Welcome to pay attention to the public number: the future has light, receive a line of large factory Java interview questions summary + the knowledge points learning thinking guide + a 300 page PDF document Java core knowledge points summary!