This is the 29th day of my participation in the August Wenwen Challenge.More challenges in August

Integrate SpringBoot RocketMQ

Note the version of the Starter integration package of SpringBoot. Because SpringBoot’s integrated RocketMQ starter dependency is provided by the Spring community, iterations are fast and variations between versions are large. Different versions may lead to errors in use.

Maven dependencies, directly put my Maven project configuration here.

Ordinary message

Maven project creation

I simply created an empty Maven project and then imported what Spring Boot required.


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.3.9. RELEASE</version>
       <relativePath/> <! -- lookup parent from repository -->
   </parent>

   <groupId>org.example</groupId>
   <artifactId>RocketMQAPI</artifactId>
   <version>1.0 the SNAPSHOT</version>

   <properties>
       <maven.compiler.source>8</maven.compiler.source>
       <maven.compiler.target>8</maven.compiler.target>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-tomcat</artifactId>
       </dependency>

       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-client</artifactId>
           <version>4.7.1</version>
       </dependency>

       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-spring-boot-starter</artifactId>
           <version>2.1.1</version>
           <exclusions>
               <exclusion>
                   <groupId>org.springframework.boot</groupId>
                   <artifactId>spring-boot-starter</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.springframework</groupId>
                   <artifactId>spring-core</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.springframework</groupId>
                   <artifactId>spring-webmvc</artifactId>
               </exclusion>
           </exclusions>
       </dependency>

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
           <version>2.1.6. RELEASE</version>
       </dependency>
       <dependency>
           <groupId>io.springfox</groupId>
           <artifactId>springfox-swagger-ui</artifactId>
           <version>2.9.2</version>
       </dependency>
       <dependency>
           <groupId>io.springfox</groupId>
           <artifactId>springfox-swagger2</artifactId>
           <version>2.9.2</version>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-autoconfigure</artifactId>
           <version>2.3.9. RELEASE</version>
       </dependency>

       <dependency>
           <groupId>cn.hutool</groupId>
           <artifactId>hutool-all</artifactId>
           <version>5.7.9</version>
       </dependency>
   </dependencies>

</project>
Copy the code

Create a SpringProducer

The instance

@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // Message sending method
    public void sendMessage(String topic, String msg){
        this.rocketMQTemplate.convertAndSend(topic, msg); }}Copy the code

Create consumer SpringConsumer

Examples are often simple

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Recived message: "+ message); }}Copy the code

ConsumerGroup is the RocketMQ group, the Spring YML configuration group, and the Topic used by the Topic producer

Yml configuration

rocketmq:
  producer:
    group: springBootGroup
  name-server: 192.16840.128.: 9876; 192.168.40.129:9876; 192.168.40.130:9876
server:
  port: 1100
Copy the code

The controller to create

The instance

@RestController @RequestMapping("/MQTest") public class MQTestController { @Resource private SpringProducer springProducer; @GetMapping("/sendMessage") public String sendMessage(@RequestParam("message") String message){ springProducer.sendMessage("TestTopic", message); Return "Message sent completed "; }}Copy the code

Creating a startup class

The instance

@SpringBootApplication(scanBasePackages = {"com.anzhi.rocketmq.*"})
public class RocketMQScApplication {
    public static void main(String[] args) { SpringApplication.run(RocketMQScApplication.class, args); }}Copy the code

After start-up access controller layer address: my local address: http://localhost:1100/MQTest/sendMessage? message=123121

Now the consumer can consume the message:

Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Copy the code

Transactional message

producers

// Transaction messages are sent
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException{
    String[] tags = new String[]{"TagA"."TagB"."TagC"."TagD"."TagE"};
    for(int i=0; i<10; i++){
        Message<String> message = MessageBuilder.withPayload(msg).build();
        String destination = topic + ":" + tags[i % tags.length];
        SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
        System.out.printf("%s%n", sendResult);

        Thread.sleep(10); }}Copy the code

Compared with sending normal messages, transaction messages need to implement an additional transaction message listener

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        Object id = message.getHeaders().get("id");
        String destination = o.toString();
        localTrans.put(id, destination);
        org.apache.rocketmq.common.message.Message msg = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, message);
        String tags = msg.getTags();
        if(StringUtils.contains(tags, "TagA")) {return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags, "TagB")) {return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            returnRocketMQLocalTransactionState.UNKNOWN; }}@Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // The SpringBoot message object does not have the transactionId property. It's not the same as native apis.
        //String destination = localTrans.get(msg.getTransactionId());
        return RocketMQLocalTransactionState.COMMIT;
    }
Copy the code

RocketMQTemplateBeanName specifies the name of the bean that Spring Boot integrates with RocketMQ’s rocketMQTemplate, annotating transaction message processing.

Note that one transaction listener corresponds to one transaction flow. If there are more than one, rocketMQTemplate cannot be specified. However, Spring Boot provides an extension that allows developers to inherit rocketMQTemplate and customize the properties to meet our needs. As follows:

(ToDo: Find an example to test, put it on hold)

@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {}Copy the code

News test results that the same access to their local link: http://localhost:1100/MQTest/sendTransMessage? message=1242543

endResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9D0D0100, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=2], queueOffset=103]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9D800103, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=3], queueOffset=104]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9DAE0106, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=0], queueOffset=105]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9DE80109, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=1], queueOffset=106]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9E4C010C, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=2], queueOffset=107]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9E82010F, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=3], queueOffset=108]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9EAD0112, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=0], queueOffset=109]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9ED50115, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=1], queueOffset=110]
Recived message: 1242543
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9F040118, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=2], queueOffset=111]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9F37011E, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=3], queueOffset=112]
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Copy the code

There are other message types, as well, in rocketMQ’s junit case, to look at. I’ll write it when I have time to do it myself.

Spring Cloud integrates MQ

Spring Boot has significantly reduced the cost of RocketMQ, but it still needs to be configured in some business scenarios. SpringCloudStream solves this problem.

It is a unified message-driven framework provided by the Spring community with the goal of using a unified programming model to interface with all MQ messaging intermediation products.

The same is three plate axe

Maven rely on


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringCloudMQ</artifactId>
    <version>1.0 the SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.3. RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.3. RELEASE</version>
        </dependency>
    </dependencies>

</project>
Copy the code

Start the class

@EnableBinding({Source.class, Sink.class}) @SpringBootApplication public class ScRocketMQApplication { public static void main(String[] args) { SpringApplication.run(ScRocketMQApplication.class, args); }}Copy the code

Note the @enableBinding ({source.class, sink.class}) annotation, which is the Binder configuration introduced by SpringCloudStream.

Yml configuration

server:
  port: 11000
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: TestTopic
          group: scGroup
        output:
          destination: TestTopic
      rocketmq:
        binder:
          name-server: 192.168232.128.: 9876; 192.168.232.129:9876; 192.168.232.130:9876
Copy the code

Create consumers

@Component
public class ScConsumer {
    @StreamListener(Sink.INPUT)
    public void onMessage(String message){
        System.out.println("received message:"+message+" from binding:"+ Sink.INPUT); }}Copy the code

Creating a producer

@Component
public class ScProducer {
    @Resource
    private Source source;

    public void sendMessage(String msg){
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(msg,
                messageHeaders);
        this.source.output().send(message); }}Copy the code

The controller layer

Test results:

2021-08-28 23:51:49.570 INFO 25468 -- [IO-11100-EXEC-1] O.A.C.C.C. [Tomcat].[/] : Initializing Spring DispatcherServlet 'DispatcherServlet' 2021-08-28 23:51:49.571 INFO 25468 -- [IO-11100-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet' dispatcherServlet' 2021-08-28 23:51:49.585 INFO 25468 -- [IO-11100-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 14 ms received message:1242543 from binding:inputCopy the code

summary

The above is a simple example of Spring integrating RocketMQ, but not a complex application. In the follow-up study, ToDo: complete it.

SpringCloudStream is a generic middleware programming framework, as evidenced by the use above. None of the business code needs to be changed, just the POM dependencies and configuration files need to be modified to switch the middleware. However, different MQ has its own business model, and the gap is still wide. So consider the transformation of the business model, as well as the customization properties of different MQ. Rocketmq personal attributes, for example, in the spring. The cloud. Stream. Rocketmq beginning.

The second problem is the RocketMQ version in SpringCloudStream, as mentioned earlier, and the complete documentation of RocketMQ is missing, because RocketMQ is maintained by the manufacturer itself, so…

Therefore, SpringCloudStream is currently not a very good integration solution for RocketMQ. It’s a far cry from kafka and Rabbit. So use it with caution