Integrating RocketMQ with Spring Boot
Pay attention to the version of the Spring Boot starter package. The rocketmq-spring-boot-starter dependency is maintained by the Apache RocketMQ community (groupId org.apache.rocketmq) rather than as part of Spring Boot itself. It iterates quickly and the differences between versions are large, so a mismatched version can lead to errors in use.
For the Maven dependencies, I simply include my project's complete POM below.
Ordinary messages
Maven project creation
I created an empty Maven project and then added the dependencies Spring Boot requires.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>RocketMQAPI</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.9</version>
</dependency>
</dependencies>
</project>
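Create the producer SpringProducer
Example:
import javax.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;

@Component
public class SpringProducer {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // Send an ordinary message to the given topic
    public void sendMessage(String topic, String msg) {
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }
}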
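Create the consumer SpringConsumer
The example is equally simple:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Recived message: " + message);
    }
}
consumerGroup is the RocketMQ consumer group (the group in the Spring YML configuration is the producer group), and topic must be the same topic the producer sends to.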
YML configuration
rocketmq:
  producer:
    group: springBootGroup
  name-server: 192.168.40.128:9876;192.168.40.129:9876;192.168.40.130:9876
server:
  port: 1100
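The name-server value is a semicolon-separated list of host:port pairs, and a stray space or misplaced dot in it is easy to miss. As a rough sketch only, the check below splits such a string and rejects malformed entries. NameServerListSketch and parse are hypothetical names invented for this illustration; they are not part of RocketMQ or the starter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of RocketMQ.
class NameServerListSketch {
    // Splits "host:port;host:port" into [host, port] pairs and
    // throws IllegalArgumentException on a malformed entry.
    static List<String[]> parse(String nameServer) {
        List<String[]> result = new ArrayList<>();
        for (String entry : nameServer.split(";")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // parseInt rejects spaces and non-digits with a NumberFormatException,
            // which is a subclass of IllegalArgumentException
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + port);
            }
            result.add(new String[]{host, String.valueOf(port)});
        }
        return result;
    }

    public static void main(String[] args) {
        String configured = "192.168.40.128:9876;192.168.40.129:9876;192.168.40.130:9876";
        for (String[] server : parse(configured)) {
            System.out.println(server[0] + " -> " + server[1]);
        }
    }
}
```

This only checks the shape of the string. It does not verify that the name servers are actually reachable.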
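Create the controller
Example:
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/MQTest")
public class MQTestController {
    @Resource
    private SpringProducer springProducer;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam("message") String message) {
        springProducer.sendMessage("TestTopic", message);
        return "Message sent completed";
    }
}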
Creating a startup class
Example:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanBasePackages = {"com.anzhi.rocketmq.*"})
public class RocketMQScApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketMQScApplication.class, args);
    }
}
After startup, call the controller endpoint. My local address is http://localhost:1100/MQTest/sendMessage?message=123121
Now the consumer can consume the message:
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
Recived message: 123121
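Transactional messages
Producer
// Send transactional messages (a method of the producer class).
// Needs org.apache.rocketmq.client.producer.SendResult, org.springframework.messaging.Message
// and org.springframework.messaging.support.MessageBuilder.
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
    String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 10; i++) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // The destination has the form topic:tag
        String destination = topic + ":" + tags[i % tags.length];
        // The third argument is handed to executeLocalTransaction as its Object parameter
        SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
        System.out.printf("%s%n", sendResult);
        Thread.sleep(10);
    }
}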
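To make the tag rotation in the loop concrete, here is a minimal sketch in plain Java, with no RocketMQ dependency, that builds the same topic:tag destinations for ten sends. TagRoutingSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of RocketMQ or rocketmq-spring.
class TagRoutingSketch {
    // The same five tags the producer cycles through
    static final String[] TAGS = {"TagA", "TagB", "TagC", "TagD", "TagE"};

    // Builds the "topic:tag" destination for the i-th message
    static String destination(String topic, int i) {
        return topic + ":" + TAGS[i % TAGS.length];
    }

    // Collects the destinations for the first `count` messages
    static List<String> destinations(String topic, int count) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(destination(topic, i));
        }
        return result;
    }

    public static void main(String[] args) {
        // With ten sends, each of the five tags is used exactly twice
        destinations("TestTopic", 10).forEach(System.out::println);
    }
}
```

Because the index wraps with i % tags.length, messages 0 and 5 go to TestTopic:TagA, messages 1 and 6 to TestTopic:TagB, and so on.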
Compared with sending ordinary messages, transactional messages require an additional transaction listener:
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils; // assumed: the commons-lang3 StringUtils
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        Object id = message.getHeaders().get("id");
        String destination = o.toString();
        localTrans.put(id, destination);
        org.apache.rocketmq.common.message.Message msg = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, message);
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            return RocketMQLocalTransactionState.COMMIT;
        } else if (StringUtils.contains(tags, "TagB")) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // The Spring message object has no transactionId property, unlike the native API.
        //String destination = localTrans.get(msg.getTransactionId());
        return RocketMQLocalTransactionState.COMMIT;
    }
}
rocketMQTemplateBeanName names the RocketMQTemplate bean this listener is bound to. The annotation marks the class as the transaction handler for messages sent through that template.
Note that one transaction listener corresponds to one transaction flow. If there is more than one flow, they cannot all be bound to the same rocketMQTemplate. However, the starter provides an extension point: developers can extend RocketMQTemplate and customize its properties to meet their needs. As follows:
(ToDo: find an example to test; on hold for now)
@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
To test transactional messages, access the local URL in the same way: http://localhost:1100/MQTest/sendTransMessage?message=1242543
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9D0D0100, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=2], queueOffset=103]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9D800103, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=3], queueOffset=104]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9DAE0106, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=0], queueOffset=105]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9DE80109, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=1], queueOffset=106]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9E4C010C, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=2], queueOffset=107]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9E82010F, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=3], queueOffset=108]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9EAD0112, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=0], queueOffset=109]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9ED50115, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=1], queueOffset=110]
Recived message: 1242543
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9F040118, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=2], queueOffset=111]
SendResult [sendStatus=SEND_OK, msgId=24098A0031A928409DD7720C18F134F221B818B4AAC28D3E9F37011E, offsetMsgId=null, messageQueue=MessageQueue [topic=TestTopic, brokerName=RaftNode00, queueId=3], queueOffset=112]
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
Recived message: 1242543
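Ten messages were sent but only eight "Recived message" lines appear above. This follows from the listener's branching: TagA commits at once, TagB rolls back, and every other tag returns UNKNOWN, which the later check-back turns into a commit. The sketch below replays that logic in plain Java under simplifying assumptions. The State enum only mirrors the three RocketMQLocalTransactionState values, TransactionFlowSketch is a hypothetical name, and the timing of the broker's check-back is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation for illustration only; it does not talk to a broker.
class TransactionFlowSketch {
    // Mirrors the three states of RocketMQLocalTransactionState
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    static final String[] TAGS = {"TagA", "TagB", "TagC", "TagD", "TagE"};

    // Same branching as executeLocalTransaction in the listener
    static State executeLocal(String tag) {
        if (tag.contains("TagA")) {
            return State.COMMIT;
        } else if (tag.contains("TagB")) {
            return State.ROLLBACK;
        }
        return State.UNKNOWN;
    }

    // Same as checkLocalTransaction in the listener: always commit
    static State check(String tag) {
        return State.COMMIT;
    }

    // Returns the tags of the messages that end up visible to the consumer
    static List<String> delivered(int count) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String tag = TAGS[i % TAGS.length];
            State state = executeLocal(tag);
            if (state == State.UNKNOWN) {
                // An undecided transaction is resolved by the check-back
                state = check(tag);
            }
            if (state == State.COMMIT) {
                result.add(tag);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tags = delivered(10);
        System.out.println(tags.size() + " delivered: " + tags);
    }
}
```

In the real system the TagC, TagD and TagE messages become visible only after the broker's check-back, so they arrive later than the TagA messages. The sketch does not model that delay.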
There are other message types as well; RocketMQ's JUnit test cases are worth a look. I'll write about them when I have time to try them myself.
Integrating MQ with Spring Cloud
Spring Boot has significantly reduced the cost of using RocketMQ, but some business scenarios still require RocketMQ-specific configuration. Spring Cloud Stream addresses this problem.
It is a unified message-driven framework provided by the Spring community, with the goal of using one programming model to interface with all message-oriented middleware products.
The same three steps apply here.
Maven dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SpringCloudMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.2.3.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
</dependencies>
</project>
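Startup class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class, args);
    }
}
Note the @EnableBinding({Source.class, Sink.class}) annotation, which is the binding configuration introduced by Spring Cloud Stream: Source is the output channel and Sink is the input channel.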
YML configuration
server:
  port: 11000
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: TestTopic
          group: scGroup
        output:
          destination: TestTopic
      rocketmq:
        binder:
          name-server: 192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
Create the consumer
@Component
public class ScConsumer {
    @StreamListener(Sink.INPUT)
    public void onMessage(String message) {
        System.out.println("received message:" + message + " from binding:" + Sink.INPUT);
    }
}
Create the producer
@Component
public class ScProducer {
    @Resource
    private Source source;

    public void sendMessage(String msg) {
        Map<String, Object> headers = new HashMap<>();
        // Set the RocketMQ tag through a message header
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
        this.source.output().send(message);
    }
}
The controller layer
Test results:
2021-08-28 23:51:49.570 INFO 25468 -- [io-11100-exec-1] o.a.c.c.C.[Tomcat].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-08-28 23:51:49.571 INFO 25468 -- [io-11100-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-08-28 23:51:49.585 INFO 25468 -- [io-11100-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 14 ms
received message:1242543 from binding:input
Summary
The above is a simple example of integrating RocketMQ with Spring; it does not cover complex applications. ToDo: complete it in follow-up study.
Spring Cloud Stream is a generic programming framework for messaging middleware, as the usage above shows. None of the business code needs to change: switching middleware only requires modifying the POM dependencies and the configuration files. However, each MQ product has its own business model, and the gaps between them are still wide. So you have to consider how the business model translates, as well as the custom properties of each MQ. RocketMQ's own properties, for example, all start with spring.cloud.stream.rocketmq.
The second problem is the RocketMQ version used with Spring Cloud Stream, as mentioned earlier. Complete documentation for the RocketMQ integration is also missing, because that part is maintained by the vendor itself, so…
Therefore, Spring Cloud Stream is currently not a very good integration solution for RocketMQ. It is a far cry from the Kafka and RabbitMQ support. So use it with caution.