preface
This article focuses on using RocketMQ to send messages.
Mq client core class -DefaultMQProducer
Core method
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
Copy the code
Interface for sending messages
Default implementation class
Class diagram
Encapsulate the core classes of the MQ client
Encapsulate it yourself, write a message sending utility class, or put it in a small JAR.
The source code to achieve
package xxx.rocketmq; import xxx.util.LogUtil; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * public class Producer {private String namesrvAddr; //ip private String producerGroup; //group private DefaultMQProducer producer; Public String getNamesrvAddr() {return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public String getProducerGroup() { return producerGroup; } public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } // Initialize public void init() {logutil.infolog.info (" Rocketmq-producer initialization parameters start "); LogUtil.INFOLOG.info("producerGroup: " + producerGroup); producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); Producer.setinstancename (long.toString (system.currentTimemillis ()))); / / the Message Body size exceeds a threshold, the compression, the default is 1024 * 4 producer. SetCompressMsgBodyOverHowmuch (Integer. MAX_VALUE); Try {// Start the producer.start(); } catch (Exception e) {logutil.infolog. error(" Rocketmq-producer initialization parameters are abnormal: ", e); } } public void destroy() { producer.shutdown(); } /** * @param message * @return */ public SendResult send(message message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {// Call the official default implementation to send messages SendResult result = producer.send(message); return result; }}Copy the code
Define the bean
<! <bean id="mqProducer_xxx" class="xxx.rocketmq.Producer" init-method="init" destroy-method="destroy"> <property name="namesrvAddr" value="${namesrvAddr}"/> <property name="producerGroup" value="${rocketmq.xxx.group}"/> </bean>Copy the code
The application layer
Application layer utility classes
The application project is simply encapsulated, and the topic function is added.
Define the bean
<bean id="sendMqProcess_xxx" class="xxx.trade.process.SendMqProcess">
<property name="producer" ref="mqProducer_xxx"/>
<property name="topic" value="${rocketmq.xxx.topic}"/>
<property name="subExpression" value="${rocketmq.xxx.subExpression}"/>
</bean>
Copy the code
The source code to achieve
package xxx.trade.process; import xxx.core.exception.BizException; import xxx.rocketmq.Producer; import xxx.trade.constant.xxxReturnCode; import xxx.trade.util.LogUtil; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.SendStatus; import com.alibaba.rocketmq.common.message.Message; public class SendMqProcess { private Producer producer; private String topic; private String subExpression; /** * Send mq message ** @param jsonMsgContent * message content, The value must be in JSON format * @return * @throws BizException * @see */ public SendStatus sendMqMessage(String jsonMsgContent) throws BizException { try { Message msg = new Message(topic, subExpression, (jsonMsgContent).getBytes("UTF-8")); SendResult sendResult = producer.send(msg); Logutil.infolog.info (" message content :" + jsonMsgContent + "\n Returns result :" + sendresult.toString ()); return sendResult.getSendStatus(); } catch (Exception e) { throw new BizException(xxxReturnCode.DEAL_EXCEPTION.getCode(), Xxxreturncode.deal_exception. GetDesc ("Producer send message error: "+ jsonMsgContent) + e); } } public void setProducer(Producer producer) { this.producer = producer; } public void setTopic(String topic) { this.topic = topic; } public void setSubExpression(String subExpression) { this.subExpression = subExpression; }}Copy the code
The application layer provides static methods for sending messages
Provides static methods of the class for easy use.
public class xxxMerchantNotifyManager { private static final SendMqProcess SENDMQPROCESS_xxx = (SendMqProcess) SpringContext .getService("sendMqProcess_xxx"); // Asynchronous thread sends message to MQ public static void sendMqMessage(final BackMethodParm BackMethodParm, final Orderbill Orderbill, final boolean ignore) { Runnable childThread = new Runnable() { @Override public void run() { boolean isSentOk = false; String xxx = orderbill.getxxx(); Json = convertJSONString(backMethodParm, ignore); Try {// Send message queue logutil.infolog.info ("sendMqMessage message content :" + json); SendStatus state = SENDMQPROCESS_QRCODE .sendMqMessage(json); Logutil.infolog.info ("sendMqMessage returns content :" + state.tostring ()); if (state == SendStatus.SEND_OK || state == SendStatus.FLUSH_DISK_TIMEOUT || state == SendStatus.FLUSH_SLAVE_TIMEOUT || state == SendStatus.SLAVE_NOT_AVAILABLE) { isSentOk = true; }} the catch (BizException e) {log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "abnormal asynchronous notifications", e); } the catch (Exception e) {log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "abnormal asynchronous notifications", e); {}} the catch (Exception ex) log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "asynchronous notification sent the extraordinary turn json object", the ex); } // MQ send failed to enter fault tolerance if (! isSentOk) { try { NotifyFault entity = content2NotifyFault(orderbill, backMethodParm); The info (LogConst THROWABLEEXCEPTIONS + + "asynchronous notifications:" XXX + entity); ServiceDeclare.notifyFaultService .insertNotifyFault(entity); } the catch (Exception e) {log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "asynchronous notification message turned fault-tolerant processing abnormity", e); }}}}; try { threadPool.execute(childThread); {} the catch (Throwable t) log. The error (LogConst. THROWABLEEXCEPTIONS + exception handling mq message: "", t); }}Copy the code
Call a static method to send a message
xxxMerchantNotifyManager.sendMqMessage();
Copy the code
conclusion
The idea is to send messages based on the official default implementation of the MQ client, and then encapsulate them yourself, or even multiple layers, each layer of encapsulation, just a little bit more for special cases. For test use, you can simply call the official default implementation to send a message. But in a production environment, there’s usually one or more layers.