Message consumption pattern
RocketMQ has two message consumption modes, cluster mode and broadcast mode.
The consumption mode is determined by consumers. The default mode is cluster mode. The consumption mode can be changed by setting:
// Set to broadcast consumption mode
consumer.setMessageModel(MessageModel.BROADCASTING);
// Set to cluster consumption mode
consumer.setMessageModel(MessageModel.CLUSTERING);
Copy the code
Ensure that the consumption patterns of the same consumer group are the same; otherwise, problems such as message loss may occur.
Cluster pattern
In cluster mode, where consumers are clustered, RocketMQ assumes that the same message can only be consumed by any consumer in the same consumer group in the cluster.
Features:
- Each message needs to be processed by each consumer group only once, and the broker will send the message to only one consumer in the same consumer group.
- When a message is rerouted, it cannot be guaranteed to be routed to the same machine.
- The consumption state is maintained by the broker.
Broadcasting mode
When using broadcast mode, the broker pushes each message to all consumers within the cluster, ensuring that the message is consumed at least once by each machine.
Features:
- The consumption schedule is maintained by consumer.
- Ensure that messages are consumed at least once by each machine.
- The news of consumer failure will not recur.
Message sending mode
The synchronous
Producer sends a message and waits until the broker returns the message, ensuring that the message is delivered.
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group001");
/ / set namesrvAddr
producer.setNamesrvAddr("localhost:9876");
// Start the producer
producer.start();
// Send a message
Message message = new Message("topic001"."sync message".getBytes());
SendResult sendResult = producer.send(message);
System.out.println("sendResult: " + sendResult);
/ / close
producer.shutdown();
System.out.println("Shut down the producer");
}
Copy the code
Asynchronous send
The producer sends messages without waiting and directly returns them. The results of the message delivery are returned asynchronously to registered listeners.
If you want to send messages quickly without losing them, you can use asynchronous messaging.
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group001");
/ / specified namesrv
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("topic001"."async message".getBytes());
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("ok");
System.out.println("sendResult: " + sendResult);
}
public void onException(Throwable throwable) {
System.out.println("error"); System.out.println(throwable.getMessage()); }});// Sleep 3 seconds
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
System.out.println("Shut down");
}
Copy the code
One way to send
When the producer sends messages, he does not wait, but returns directly and cannot receive the message delivery result.
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group001");
/ / specified namesrv
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("topic001"."oneway message".getBytes());
producer.sendOneway(message);
producer.shutdown();
System.out.println("Shut down");
}
Copy the code
Batch sending messages
RocketMQ allows multiple messages to be sent together to reduce network overhead and increase efficiency.
- Sending messages in batches requires that all messages must be the same topic and have the same message configuration
- Batch sending does not support delayed messages
- Sending messages in batches does not support retry
- The official recommendation is that a batch message should not exceed 1MB in size
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group001");
/ / specified namesrv
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 5; i++) {
Message message = new Message("topic001", ("collection message " + i).getBytes());
msgs.add(message);
}
SendResult sendResult = producer.send(msgs);
System.out.println("sendResult: " + sendResult);
producer.shutdown();
System.out.println("Shut down the producer");
}
Copy the code
The message filter
The TAG to filter
RocketMQ supports message filtering using tags. Within the same topic, subscribes to desired tags to filter unwanted messages.
-
When the producer sends a message, it sets the tag to message:
Message message = new Message("topic001"."TAG-A"."tag message".getBytes()); Copy the code
-
Subscribe to tag in Consumer:
// Subscribe topic, specify tag consumer.subscribe("topic001"."TAG-A||TAG-B"); // "*" indicates all messages Copy the code
SQL filter
In addition to tag filtering, RocketMQ also supports SQL92 filtering, which you need to enable in the configuration file broker.
enablePropertyFilter=true
Copy the code
The specified configuration file is then loaded when the Broker is started
../bin/mqbroker -n 192.168.1.6:9876 -c broker.conf
Copy the code
-
Set the userProperties parameter in Producer
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("group001"); / / set namesrvAddr producer.setNamesrvAddr("localhost:9876"); // Start the producer producer.start(); // Send a message for (int i = 0; i < 10; i++) { Message message = new Message("topic001"."TAG-A"."KEY-01", ("sql message " + i).getBytes()); message.putUserProperty("age", String.valueOf(i)); SendResult sendResult = producer.send(message); System.out.println("sendResult: " + sendResult); } / / close producer.shutdown(); System.out.println("Shut down the producer"); } Copy the code
-
Use SQL to filter messages in consumer
public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_sql"); / / specified namesrv consumer.setNamesrvAddr("localhost:9876"); / / subscribe to the topic // Create MessageSelector to filter messages based on SQL consumer.subscribe("topic001", MessageSelector.bySql("age >= 3 and age <= 7")); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { System.out.println("message: " + new String(messageExt.getBody())); System.out.println("messageExt: " + messageExt); } returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); System.out.println("consumer by sql started ..."); } Copy the code
grammar
RocketMQ defines only some basic syntax to support this functionality. You can also easily extend it.
- Numbers compare like
>
.> =
.<
.< =
.BETWEEN
.=
; - Character comparison, like
=
.<>
.IN
; IS NULL
orIS NOT NULL
;- Logical operations
AND
.OR
.NOT
;
Constant type
- Numbers, like 123, 3.1415;
- Strings, like ‘ABC’, must be in single quotes;
NULL
, special constant;- Boolean constant,
TRUE
或FALSE
;
Delay message
RocketMQ supports the use of messageDelayLevel to set delayed delivery messages.
The default configuration is:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Copy the code
This configuration item configures the delay time for each level starting from level 1.
It is possible to modify its configuration in broker.conf, but this is not recommended, as latency levels are also used in many places in the source code.
The supported time units are:
Unit of time | instructions |
---|---|
s | seconds |
m | points |
h | when |
d | day |
use
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group001");
/ / set namesrvAddr
producer.setNamesrvAddr("localhost:9876");
// Start the producer
producer.start();
// Send a delayed message
Message message = new Message("topic001"."TAG-A"."KEY-01"."delay message".getBytes());
// Default delay Settings
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(3); // Delay 10 seconds
SendResult sendResult = producer.send(message);
System.out.println("sendResult: " + sendResult);
/ / close
producer.shutdown();
System.out.println("Shut down the producer");
}
Copy the code
Order consumption
In RocketMQ, a topic should have multiple MessageQueue. Each MessageQueue is a queue, which supports the FIRO model, so it is only necessary to deliver messages to the same MessageQueue and consume them by a thread to ensure sequential consumption.
-
Concrete implementation – message delivery
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("group_orderly"); / / set namesrvAddr producer.setNamesrvAddr("localhost:9876"); // Start the producer producer.start(); for (int i = 0; i < 20; i++) { // Send a message Message message = new Message("topic_orderly"."TAG-A"."KEY-01", ("orderly message " + i).getBytes()); SendResult sendResult = producer.send( / / message message, // Message queue selector new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> list, Message message, Object o) { // All messages are sent to the first MessageQueue return list.get(0); }},// Customize parameters null); System.out.println("sendResult: " + sendResult); } / / close producer.shutdown(); System.out.println("Shut down the producer"); } Copy the code
-
Concrete implementation – message consumption
public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_orderly"); / / specified namesrv consumer.setNamesrvAddr("localhost:9876"); / / subscribe to the topic consumer.subscribe("topic_orderly"."*"); // Register the sequential consumption listener consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt messageExt : list) { System.out.println("message: " + new String(messageExt.getBody()) + " thread: " + Thread.currentThread().getName()); } returnConsumeOrderlyStatus.SUCCESS; }}); consumer.start(); System.out.println("consumer by orderly started ..."); } Copy the code
Retry mechanism
Producer Sends retry
Default configuration:
public DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
this.log = ClientLogger.getLog();
this.createTopicKey = "TBW102";
this.defaultTopicQueueNums = 4;
// timeout for sending message
this.sendMsgTimeout = 3000;
this.compressMsgBodyOverHowmuch = 4096;
// Retry times of sending synchronization failures
this.retryTimesWhenSendFailed = 2;
// Retry times of asynchronous sending failures
this.retryTimesWhenSendAsyncFailed = 2;
// Whether to send requests to other brokers
this.retryAnotherBrokerWhenNotStoreOK = false;
this.maxMessageSize = 4194304;
this.traceDispatcher = null;
this.namespace = namespace;
this.producerGroup = producerGroup;
this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
Copy the code
Use:
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group_retry");
/ / set namesrvAddr
producer.setNamesrvAddr("localhost:9876");
// Send timeout. Default: 3000 ms
// this.sendMsgTimeout = 3000;
producer.setSendMsgTimeout(1000);
// Retry times of sending synchronization failures. The default value is 2
// this.retryTimesWhenSendFailed = 2;
producer.setRetryTimesWhenSendFailed(1);
// Retry times of asynchronous sending failures. The default value is 2
// this.retryTimesWhenSendAsyncFailed = 2;
producer.setRetryTimesWhenSendAsyncFailed(1);
// Whether to send to other brokers on failure. Default is false
// this.retryAnotherBrokerWhenNotStoreOK = false;
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
// Start the producer
producer.start();
// Send a message
Message message = new Message("topic_retry"."TAG-RETRY"."retry message".getBytes());
SendResult sendResult = producer.send(message);
System.out.println("timestamp: " + new Date() + " sendResult: " + sendResult);
/ / close
producer.shutdown();
System.out.println("Shut down the producer");
}
Copy the code
Consumer retry
Set the consumption timeout period
// Consumption timeout period, unit: minute, 15 minutes by default
// this.consumeTimeout = 15L;
consumer.setConsumeTimeout(10);
Copy the code
To retry consumption, return RECONSUME_LATER in the listener.
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("timestamp: " + new Date() + " messageId: " + messageExt.getMsgId() + " body: " + new String(messageExt.getBody()));
}
// CONSUME_SUCCESS
// RECONSUME_LATER; Try again later
returnConsumeConcurrentlyStatus.RECONSUME_LATER; }});Copy the code
Full consumer case:
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_retry");
/ / specified namesrv
consumer.setNamesrvAddr("localhost:9876");
// Consumption timeout period, unit: minute, 15 minutes by default
// this.consumeTimeout = 15L;
consumer.setConsumeTimeout(10);
/ / subscribe to the topic
consumer.subscribe("topic_retry"."TAG-RETRY");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("timestamp: " + new Date() + " messageId: " + messageExt.getMsgId() + " body: " + new String(messageExt.getBody()));
}
// CONSUME_SUCCESS
// RECONSUME_LATER; Try again later
returnConsumeConcurrentlyStatus.RECONSUME_LATER; }}); consumer.start(); System.out.println("consumer by retry started ...");
}
Copy the code
Broker message retransmission
Only in MessageModel. CLUSTERING cluster mode, the broker for message back, MessageModel. The broker will not back – national BROADCASTING mode.
The recast interval uses the messageDelayLevel described above.
Transaction message
RocketMQ 4.3+ provides distributed transaction functionality, which enables the ultimate consistency of distributed transactions through RocketMQ transaction messages.
Transaction message logic:
Half Message: The producer sends a Half Message. Once the broker receives the Half Message, it stores the Message in the RMQ_SYS_TRANS_HALF_TOPIC Message consuming queue. After the Half Message is successfully sent, the local transaction begins.
** Check transaction status: ** The Broker starts a scheduled task to consume messages in the RMQ_SYS_TRANS_HALF_TOPIC queue, and each execution of the task confirms the transaction execution status (committed, rollback, unknown) to the message sender, and if unknown, waits for the next callback.
Timeout: The message is rolled back by default if the number of times the message is checked exceeds.
TransactionListener
-
executeLocalTransaction
After the half-message is successfully sent, this method is triggered to execute a local transaction.
-
checkLocalTransaction
Check the status of local transactions
Transaction message usage
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
TransactionMQProducer producer = new TransactionMQProducer("group_transaction");
/ / set namesrvAddr
producer.setNamesrvAddr("localhost:9876");
final CountDownLatch latch = new CountDownLatch(1);
// Set the transaction listener
producer.setTransactionListener(new TransactionListener() {
// Perform a local transaction
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("executeLocalTransaction...");
System.out.println("msg: " + new String(message.getBody()));
System.out.println("transactionId: " + message.getTransactionId());
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
// COMMIT_MESSAGE
// ROLLBACK_MESSAGE, rollback message
// UNKNOW; I'm not sure. Come back later
return LocalTransactionState.COMMIT_MESSAGE;
}
// Check whether the local transaction is complete
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("msg: " + new String(messageExt.getBody()));
System.out.println("transactionId: " + messageExt.getTransactionId());
if (latch.getCount() > 0) {
System.out.println("checkLocalTransaction: UNKNOW");
return LocalTransactionState.UNKNOW;
}
System.out.println("checkLocalTransaction: COMMIT_MESSAGE");
returnLocalTransactionState.COMMIT_MESSAGE; }});// Start the producer
producer.start();
Message message = new Message("topic_transaction"."TAG-A"."KEY-01"."transaction message".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.println("sendResult: " + sendResult);
/ / close
producer.shutdown();
System.out.println("Shut down the producer");
}
Copy the code
conclusion
Links to the case source code used for all of the above:
Gitee.com/lzx946/codi…