Introduction

RocketMQTemplate is the Spring Boot template class for RocketMQ. It provides a number of overloaded methods for sending the various kinds of messages.

Project dependencies

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.neo</groupId>
    <artifactId>user-center</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>user-center</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper-spring-boot-starter</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
        <!-- RocketMQ Spring Boot starter: provides RocketMQTemplate and the listener annotations -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- integration of spring cloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- integration of spring cloud alibaba -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.2.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.6</version>
                <configuration>
                    <configurationFile>${basedir}/src/main/resources/generator/generatorConfig.xml</configurationFile>
                    <overwrite>true</overwrite>
                    <verbose>true</verbose>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <version>8.0.20</version>
                    </dependency>
                    <dependency>
                        <groupId>tk.mybatis</groupId>
                        <artifactId>mapper</artifactId>
                        <version>4.1.5</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>
</project>

Message producer

package com.carnation.rocketmq;

import com.alibaba.fastjson.JSON;
import com.carnation.domain.dto.SysUserDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;

/**
 * Message producer that wraps {@link RocketMQTemplate} and exposes one method per
 * sending style demonstrated in this article (ordinary, sync, async, one-way, batch,
 * transactional, tagged, delayed and ordered).
 *
 * @author zy
 * @date 2020/6/10
 * @since 1.0.0
 */
@Slf4j
@Service
public class RocketMQTemplateProducer {

    @Autowired
    private RocketMQTemplate template;

    /**
     * Sends an ordinary message.
     *
     * @param topic   topic
     * @param message message body
     */
    public void sendMessage(String topic, Object message) {
        this.template.convertAndSend(topic, message);
        log.info("Normal message sent complete: message = {}", message);
    }

    /**
     * Sends a message synchronously and logs the send result.
     *
     * @param topic   topic
     * @param message message body
     */
    public void syncSendMessage(String topic, Object message) {
        SendResult sendResult = this.template.syncSend(topic, message);
        log.info("Message = {}, sendResult = {}", message, sendResult);
    }

    /**
     * Sends a message asynchronously; the outcome is reported through the callback.
     *
     * @param topic   topic
     * @param message message body
     */
    public void asyncSendMessage(String topic, Object message) {
        this.template.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Asynchronous message sent successfully, message = {}, SendStatus = {}", message, sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable e) {
                // A failed send is an error, not an informational event; pass the throwable
                // as the last argument so the stack trace is not lost.
                log.error("Asynchronous sending exception, exception = {}", e.getMessage(), e);
            }
        });
    }

    /**
     * Sends a one-way message.
     *
     * @param topic   topic
     * @param message message body
     */
    public void sendOneWayMessage(String topic, Object message) {
        this.template.sendOneWay(topic, message);
        log.info("One-way message completion: message = {}", message);
    }

    /**
     * Sends a batch of messages synchronously.
     *
     * @param topic       topic
     * @param messageList message collection
     * @param timeout     timeout
     */
    public void syncSendMessages(String topic, List<Message<?>> messageList, long timeout) {
        this.template.syncSend(topic, messageList, timeout);
        log.info("Synchronous batch message sending complete: message = {}", JSON.toJSONString(messageList));
    }

    /**
     * Sends a transaction message (half message). A random UUID is attached as the
     * {@link RocketMQHeaders#TRANSACTION_ID} header, and the DTO itself is passed as the
     * extra argument of the send call.
     *
     * @param topic   topic
     * @param message message object
     */
    public void sendMessageInTransaction(String topic, SysUserDto message) {
        String transactionId = UUID.randomUUID().toString();
        TransactionSendResult result = this.template.sendMessageInTransaction(
                topic,
                MessageBuilder.withPayload(message)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .build(),
                message);
        log.info("Sending transaction message (half message) completed: result = {}", result);
    }

    /**
     * Sends a tagged message (for message filtering).
     *
     * @param topic   topic; RocketMQTemplate combines topic and tag into one string and
     *                splits it apart underneath, so a tag is specified by appending
     *                {@code :tag} to the topic, e.g. {@code test-topic:tagA}
     * @param message message body
     */
    public void syncSendMessageWithTag(String topic, Object message) {
        this.template.syncSend(topic, message);
        log.info("Sending tag message complete: message = {}", message);
    }

    /**
     * Sends a delayed message synchronously.
     *
     * @param topic      topic
     * @param message    message body
     * @param timeout    timeout
     * @param delayLevel delay level. RocketMQ currently does not support arbitrary delays,
     *                   only a fixed set of levels from 1s to 2h, corresponding to levels
     *                   1 to 18 (messages that failed consumption are also queued as
     *                   delayed messages):
     *                   "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
     */
    public void syncSendDelay(String topic, Object message, long timeout, int delayLevel) {
        this.template.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, delayLevel);
        log.info("Synchronous sent delayed message message = {}", message);
    }

    /**
     * Sends a delayed message asynchronously.
     *
     * @param topic      topic
     * @param message    message object
     * @param timeout    timeout
     * @param delayLevel delay level
     */
    public void asyncSendDelay(String topic, Object message, long timeout, int delayLevel) {
        this.template.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Asynchronous sending delayed message succeeded, message = {}", message);
            }

            @Override
            public void onException(Throwable throwable) {
                // Trailing throwable argument makes SLF4J print the stack trace as well.
                log.error("Exception = {}", throwable.getMessage(), throwable);
            }
        }, timeout, delayLevel);
        log.info("Delayed message sent asynchronously message = {}", message);
    }

    /**
     * Sends 30 one-way ordered messages ("message - 0" .. "message - 29"), all with the
     * same hash key {@code "topic"}.
     *
     * @param topic topic
     */
    public void sendOneWayOrderly(String topic) {
        for (int i = 0; i < 30; i++) {
            this.template.sendOneWayOrderly(topic, MessageBuilder.withPayload("message - " + i).build(), "topic");
            log.info("One-way sequential message sending complete: message = {}", "message - " + i);
        }
    }

    /**
     * Sends 30 ordered messages synchronously ("message - 0" .. "message - 29"), all with
     * the same hash key {@code "syncOrderlyKey"}.
     *
     * @param topic topic
     */
    public void syncSendOrderly(String topic) {
        for (int i = 0; i < 30; i++) {
            SendResult sendResult = this.template.syncSendOrderly(topic, MessageBuilder.withPayload("message - " + i).build(), "syncOrderlyKey");
            log.info("Synchronous sequential message sending complete: message = {}, sendResult = {}", "message - " + i, sendResult);
        }
    }
}
  • Sending batch messages: earlier versions had a bug where, even with a correctly constructed batch message body, the batch-send method could not be called. Upgrade the rocketmq-spring-boot-starter dependency to 2.0.4 or above for the call to work.
  • The method for sending transaction messages changed starting with rocketmq-spring-boot-starter 2.0.4: it takes fewer parameters, reportedly to fix a thread-safety problem in multithreaded scenarios. In the new version, no matter how many kinds of transaction messages a project sends, there can be only one local transaction listener class; it observes the outcome of the local transaction and then decides whether the message is delivered and can be consumed. In the old version, each transaction message had its own local transaction listener class.

Local transaction listener for service transaction messages

package com.carnation.rocketmq; import com.carnation.dao.rocketmq.RocketmqTransactionLogMapper; import com.carnation.domain.dto.SysUserDto; import com.carnation.domain.entity.RocketmqTransactionLog; import com.carnation.domain.entity.SysUser; import com.carnation.service.SysUserService; import com.carnation.util.BeanUtils; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; / * * * @ author zy * @ version 1.0.0 * @ date 2020/6/17 * / @ Slf4j @ RocketMQTransactionListener public class TestMessageTransactionListener implements RocketMQLocalTransactionListener { private final SysUserService sysUserService; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; public TestMessageTransactionListener(SysUserService sysUserService, RocketmqTransactionLogMapper rocketmqTransactionLogMapper) { this.sysUserService = sysUserService; this.rocketmqTransactionLogMapper = rocketmqTransactionLogMapper; } @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { MessageHeaders headers = message.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); SysUserDto dto = (SysUserDto) o; SysUser user = new SysUser(); BeanUtils.copyProperties(dto, user); Try {/ / perform local affairs this. SysUserService. InsertWithRocketLog (user, transactionId); // Return the local transaction execution status to commit, send transaction message log.info("Local transaction ok, message can be sent..");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build());
        if(transactionLog ! = null) {return RocketMQLocalTransactionState.COMMIT;
        }
        returnRocketMQLocalTransactionState.ROLLBACK; }}Copy the code

Test endpoints

package com.carnation.controller.rocketmq;

import com.alibaba.fastjson.JSON;
import com.carnation.domain.dto.SysUserDto;
import com.carnation.rocketmq.RocketMQTemplateProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * Test endpoints that trigger each kind of send on {@link RocketMQTemplateProducer}.
 *
 * @author zy
 * @date 2020/6/9
 */
@Slf4j
@RestController
@RequestMapping("/producer/template")
public class RocketTemplateController {

    @Autowired
    private RocketMQTemplateProducer producer;

    /**
     * Sends an ordinary message.
     *
     * @return feedback
     */
    @GetMapping("/ordinary")
    public String sendMessage() {
        SysUserDto sysUserDto = SysUserDto.builder()
                .accountName("chen")
                .userName("hua")
                .password("123456")
                .build();
        this.producer.sendMessage("ordinary-message", sysUserDto);
        return "Success: message sent: message =" + JSON.toJSONString(sysUserDto);
    }

    /**
     * Synchronously sends a message.
     *
     * @return feedback
     */
    @GetMapping("/sync")
    public String syncSendMessage() {
        SysUserDto sysUserDto = SysUserDto.builder()
                .accountName("sync")
                .userName("message")
                .password("12ds456")
                .build();
        this.producer.syncSendMessage("sync-message", sysUserDto);
        return "Success: message sent synchronously: message =" + JSON.toJSONString(sysUserDto);
    }

    /**
     * Asynchronously sends a message.
     *
     * @return feedback
     */
    @GetMapping("/async")
    public String asyncSendMessage() {
        SysUserDto sysUserDto = SysUserDto.builder()
                .accountName("async")
                .userName("message")
                .password("8752ert")
                .build();
        this.producer.asyncSendMessage("async-message", sysUserDto);
        return "Success: message sent asynchronously: message =" + JSON.toJSONString(sysUserDto);
    }

    /**
     * Sends a batch of ten messages.
     * NOTE(review): the method name and the response text say "async", but the call goes
     * to {@code producer.syncSendMessages} on the "/syncMessages" path; both are kept
     * unchanged here to preserve the endpoint's existing behaviour.
     *
     * @return feedback
     */
    @GetMapping("/syncMessages")
    public String asyncSendMessages() {
        List<Message<?>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messageList.add(MessageBuilder.withPayload(
                    SysUserDto.builder()
                            .id(String.valueOf(i))
                            .accountName("accountName-" + i)
                            .userName("userName-" + i)
                            .password("password-" + i)
                            .email("email-" + i)
                            .build()
            ).build());
        }
        this.producer.syncSendMessages("sync-messages", messageList, 5000);
        return "Success: Messages have been sent asynchronously in batches";
    }

    /**
     * Sends a transaction message.
     *
     * @return feedback
     */
    @GetMapping("/transactionMessage")
    public String sendMessageInTransactionMessage() {
        SysUserDto sysUserDto = SysUserDto.builder()
                .id(String.valueOf(Math.random()))
                .accountName("transaction")
                .userName("message")
                .password("transaction")
                .email("[email protected]")
                .build();
        this.producer.sendMessageInTransaction("transaction-message", sysUserDto);
        return "Success: transaction message sent: message =" + JSON.toJSONString(sysUserDto);
    }

    /**
     * Sends a one-way message.
     *
     * @return feedback
     */
    @GetMapping("/oneWay")
    public String oneWaySendMessage() {
        SysUserDto sysUserDto = SysUserDto.builder()
                .accountName("oneWay")
                .userName("message")
                .password("asdew123")
                .build();
        this.producer.sendOneWayMessage("oneWay-message", sysUserDto);
        return "Success: message sent in one direction: message =" + JSON.toJSONString(sysUserDto);
    }

    /**
     * Sends one-way ordered messages.
     *
     * @return feedback
     */
    @GetMapping("/oneWayOrderly")
    public String sendOneWayOrderlyMessage() {
        this.producer.sendOneWayOrderly("oneWay-order-message");
        return "Success: Ordered message sent in one direction.. ";
    }

    /**
     * Synchronously sends ordered messages.
     *
     * @return feedback
     */
    @GetMapping("/syncOrderly")
    public String syncSendOrderlyMessage() {
        this.producer.syncSendOrderly("sync-order-message");
        return "Success: Ordered messages have been sent synchronously.. ";
    }

    /**
     * Synchronously sends a delayed message (timeout 10000, delay level 4).
     *
     * @return feedback
     */
    @GetMapping("/syncDelay")
    public String syncSendDelayMessage() {
        SysUserDto userDto = SysUserDto.builder()
                .userName("sync")
                .accountName("delay")
                .email("[email protected]")
                .mobile("17898097654")
                .build();
        this.producer.syncSendDelay("sync-delay-message", userDto, 10000, 4);
        return "Success: Delay message has been sent synchronously.. ";
    }

    /**
     * Sends a message with a tag, for message filtering. The destination
     * {@code "tag-message:testTag"} carries the tag after the colon.
     *
     * @return feedback
     */
    @GetMapping("/withTag")
    public String syncSendWithTagMessage() {
        SysUserDto userDto = SysUserDto.builder()
                .userName("tag")
                .accountName("message")
                .email("[email protected]")
                .mobile("17898097654")
                .build();
        this.producer.syncSendMessageWithTag("tag-message:testTag", userDto);
        return "Success: Message with tag has been sent synchronously.. ";
    }
}

Message consumer

  • Consume messages sent synchronously
/** * listen to consume synchronous sent messages ** @author zy * @date 2020/6/12 * @since 1.0.0 */ @slf4j@component @RocketMQMessageListener(consumerGroup ="sync-consumer", topic = "sync-message")
public class SyncMessageListener implements RocketMQListener<SysUserDto> {
    @Override
    public void onMessage(SysUserDto sysUserDto) {
        log.info("Synchronous message consumed: message = {}", JSON.toJSONString(sysUserDto)); }}Copy the code
  • Consume synchronous batch sent messages
/** * listen to consume synchronized batch sent messages. * * TIPS: ConsumeMode = consumeMode.ORDERLY * Ensures the order in which messages are consumed. The same order that was added to the collection when the message collection was produced * at the cost of creating only one consumer thread to consume the message, Inefficient * * @author zy * @date 2020/6/16 * @since 1.0.0 */ @slf4j @Component @RocketmqMessagelistener (consumerGroup ="syncs-consumer", topic = "sync-messages") public class SyncMessagesListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto  sysUserDto) { log.info("Synchronous batch messages consumed: message = {}", JSON.toJSONString(sysUserDto)); }}Copy the code
  • Consume messages sent asynchronously
/** * listen to consume asynchronously sent messages ** @author zy * @date 2020/6/10 * @since 1.0.0 */ @slf4j@service @RocketMQMessageListener(consumerGroup ="async-consumer", topic = "async-message") public class AsyncMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto  sysUserDto) { log.info("Asynchronous message consumed: message = {}", JSON.toJSONString(sysUserDto)); }}Copy the code
  • Consuming a one-way message
/** * listen for messages sent in one direction by the consumer ** @author zy * @date 2020/6/10 * @since 1.0.0 */ @slf4j@component @RocketMQMessageListener(consumerGroup ="oneWay-consumer", topic = "oneWay-message")
public class OneWayMessageListener implements RocketMQListener<SysUserDto> {
    @Override
    public void onMessage(SysUserDto sysUserDto) {
        log.info("One-way message consumed: message = {}", JSON.toJSONString(sysUserDto)); }}Copy the code
  • Consume one-way ordered messages
/** ** @RocketmqMessagelistener must specify attribute * consumeMode = consumemode. ORDERLY consuming messages * * @author zy * @date 2020/6/10 * @since 1.0.0 */ @service.slf4j.rocketmqMessagelistener (consumerGroup ="oneWay-order-consumer",
        topic = "oneWay-order-message". consumeMode = ConsumeMode.ORDERLY ) public class OneWayOrderlyMessageListener implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("Unidirectional ordered message consumed: message = {}", s); }}Copy the code
  • Consume delayed messages sent synchronously
/** * listen to consume synchronous sent delayed messages ** @author zy * @date 2020/6/12 * @since 1.0.0 */ @slf4j@component @RocketMQMessageListener(consumerGroup ="sync-delay-consumer", topic = "sync-delay-message")
public class SyncDelayMessageListener implements RocketMQListener<SysUserDto> {
    @Override
    public void onMessage(SysUserDto sysUserDto) {
        log.info("Synchronous delay message consumed: message = {}", JSON.toJSONString(sysUserDto)); }}Copy the code
  • Consume synchronous ordered messages
/** * Listen to consume synchronous ordered messages * TIPS: @RocketmqMessagelistener must be configured with attribute * consumeMode = consumemode.orderly to consume messages * * @author zy * @date 2020/6/12 * @since 1.0.0 */ @service.slf4j.rocketmqMessagelistener (consumerGroup ="sync-orderly-consumer",
        topic = "sync-order-message". consumeMode = ConsumeMode.ORDERLY ) public class SyncOrderlyMessageListener implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("Synchronous ordered messages consumed: message = {}", s); }}Copy the code
  • Consumes messages under a topic with a specified tag
@rocketmqMessagelistener Default selectorExpression ="*", which means * consumes all messages under the given topic, modifies the attribute selectorExpression ="tagName"* You can filter messages by having the listener consume messages for a topic with a tag. * * @author zy * @date 2020/6/15 * @since 1.0.0 */ @slf4j @Component @RocketmqMessagelistener (consumerGroup ="tag-consumer", topic = "tag-message",
        selectorExpression = "testTag". selectorType = SelectorType.TAG ) public class TagMessageListener implements RocketMQListener<SysUserDto> { @Override public void onMessage(SysUserDto sysUserDto) { log.info("Consume tag message: tag = {}, message = {}"."testTag", JSON.toJSONString(sysUserDto)); }}Copy the code
  • Consume transaction messages
/** * listen for consumer transaction messages ** @author zhaoWenCai * @date 2020/6/17 * @since 1.0.0 */ @slf4j@component @RocketMQMessageListener(consumerGroup ="transaction-consumer", topic = "transaction-message")
public class TransactionMessageListener implements RocketMQListener<SysUserDto> {
    @Override
    public void onMessage(SysUserDto sysUserDto) {
        log.info("Transaction message consumed: message = {}", JSON.toJSONString(sysUserDto)); }}Copy the code

For more information about RocketMQ, see github.com/apache/rock…