1. Reference jar packages
The build.gradle file adds jar package references
Compile group: 'org.apache. rocketMQ ', name: 'Rocketmq-spring-boot-starter ', version: '2.1.1'Copy the code
2. Configuration files
Application.properties configuration file
Spring. The application. The name = app - demp server port = 8081 # # # # # rocketmq rocketmq#. Name - server = 192.168.1.107:9876 rocketmq.producer.timeout=10000Copy the code
3. Generator
Mqsender.java – Message generation interface
import org.apache.rocketmq.client.producer.SendResult; Public interface MQSender{/** * send message ** @param message message * @param topic * @return SendResult */ SendResult sendMessage(Object message, String topic); /** * sendMessage ** @param message message * @param topic topic * @param tags topic tag * @return SendResult */ SendResult sendMessage(Object) message, String topic, String tags); }Copy the code
Rocketmqsender.java – RockemtMQ implementation
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class RocketMQSender implements MQSender { @Value("${rocketmq.name-server}") private String nameServer; @Value("${rocketmq.producer.timeout}") private int timeout; @Value("${spring.application.name}") private String group; private DefaultMQProducer producer; @PostConstruct public void init() { producer = new DefaultMQProducer(group); try { producer.setNamesrvAddr(nameServer); producer.setSendMsgTimeout(timeout); producer.start(); Log.info ("RocketMQ Producer started successfully! nameServer={}, group={}", nameServer, group); } catch (MQClientException e) {log.error("RocketMQ Producer failed to start! nameServer={}, group={} ", nameServer, group, e); } } @Override public SendResult sendMessage(Object message, String topic) { try { Message msg = new Message(topic, JSON.toJSONBytes(message)); SendResult sendResult = producer.send(msg); Log.info (" MQ sent successfully: sendResult={},message={}", sendResult, message.tostring ()); return sendResult; } catch (Exception e) {log.error(" failed to send a message, topic:{}, message:{}", topic, message, e); } return null; } @Override public SendResult sendMessage(Object message, String topic, String tags) { try { Message msg = new Message(topic, tags, JSON.toJSONBytes(message)); SendResult sendResult = producer.send(msg); Log.info (" MQ sent successfully: sendResult={},message={}", sendResult, message.tostring ()); return sendResult; } catch (Exception e) {log.error(" failed to send a message, topic:{}, tags:{}, message:{}", topic, tags, message, e); } return null; }}Copy the code
Orderproducer.java – instance of the sender
import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class OrderProducer { @Resource private MQSender mqSender; Public void createOrder() {mqsender.sendMessage (" I register order, please process as soon as possible ", "TEMP"); }}Copy the code
4. Consumers
OrderConsumer.java
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP") public class OrderConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { String message = new String(messageExt.getBody()); Log.info (" Received message, topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId(), message); }}Copy the code
Result of sender execution
Consumer performance results