Clear requirements
What I want to solve now is the out-of-order processing of mq messages during consumption.
A quick introduction to the image above. A message parses its own packet number through a message parser. If the difference between the serial number of the packet and that of the pre-packet in the parser is equal to 1 (the message is normal), the pre-packet is updated. The functionID of the message is obtained by its specific adaptive processor, and the specific handler is obtained for processing. If the serial number is less than 0, repeated message consumption occurs. The other is out of order. Out-of-order messages are delay-queued and sorted by message sequence number. And use a timer timed to fetch into the message parser for verification.
Code implementation
The message body
package com.fire.plan.mq;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class MessageContent implements Delayed {
Integer sourceId;// Different dirty message ids
Integer businessId;/ / business ID
String msg;// Message to be sent
String functionId;// Specific method to be handled
long createTimeMillis;// When the message was created
int count; // The number of queue entries
public int getCount(a) {
return count;
}
public void setCount(int count) {
this.count = count;
}
public long getCreateTimeMillis(a) {
return createTimeMillis;
}
public void setCreateTimeMillis(long createTimeMillis) {
this.createTimeMillis = createTimeMillis;
}
public static Builder builder(a){
return new Builder();
}
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
MessageContent messageContent = (MessageContent) o;
if (this.businessId > messageContent.businessId) {
return 1;
}else if (this.businessId < messageContent.businessId){
return -1;
}else{
return 0; }}static class Builder{
private Integer sourceId;// Different dirty message ids
private Integer businessId;/ / business ID
private String msg;// Message to be sent
private String functionId;
public Builder(a) {}public Builder sourceId(Integer sourceId) {
this.sourceId = sourceId;
return this;
}
public Builder businessId(Integer businessId) {
this.businessId = businessId;
return this;
}
public Builder msg(String msg) {
this.msg = msg;
return this;
}
public Builder functionId(String functionId) {
this.functionId = functionId;
return this;
}
public MessageContent build(a){
MessageContent messageContent = new MessageContent();
messageContent.sourceId = this.sourceId;
messageContent.businessId = this.businessId;
messageContent.msg = this.msg;
messageContent.functionId = this.functionId;
returnmessageContent; }}}Copy the code
Message handler
package com.fire.plan.mq;
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public interface MessageHandler {
void processMsg(MessageContent messageContent);
}
Copy the code
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class MessageHandlerOne implements MessageHandler{
@Override
public void processMsg(MessageContent messageContent) {
System.out.println(messageContent.businessId + " is being process"); }}Copy the code
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class MessageHandlerTwo implements MessageHandler{
@Override
public void processMsg(MessageContent messageContent) {
System.out.println(messageContent.businessId + " is being process"); }}Copy the code
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class MessageHandlerThree implements MessageHandler{
@Override
public void processMsg(MessageContent messageContent) {
System.out.println(messageContent.businessId + " is being process"); }}Copy the code
Message adapter
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public interface MessageAdaptive {
MessageHandler adapter(MessageContent messageContent);
}
Copy the code
package com.fire.plan.mq;
import java.util.HashMap;
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class MessageAdaptiveMq implements MessageAdaptive{
private HashMap<String, MessageHandler> handlerMapping = new HashMap<>();
public void registry(String functionId, MessageHandler messageHandler){
handlerMapping.put(functionId, messageHandler);
}
@Override
public MessageHandler adapter(MessageContent messageContent) {
returnhandlerMapping.get(messageContent.functionId); }}Copy the code
Message parser
package com.fire.plan.mq;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class MessageSolver {
private static volatile MessageSolver messageSolver = null;
Integer preBusinessId;
DelayQueue<MessageContent> delayed;
MessageAdaptive messageAdaptive;
long timeout = 6;
private MessageSolver(a){
this.delayed = new DelayQueue<>();
preBusinessId = 0;
TimerTask timerTask = new TimerTask() {
@Override
public void run(a) {
if(! delayed.isEmpty()) { MessageContent messageContent = delayed.poll();long currentTimeMillis = System.currentTimeMillis() / 1000;
long createTimeMillis = messageContent.createTimeMillis;
if (currentTimeMillis - createTimeMillis > timeout) {
// Store the message to db
System.out.println(messageContent.businessId + " will be store to db");
} else {
// Check the pre-packet againmessageSolver.messageSolver(messageContent); }}}}; Timer timer =new Timer();
//500ms Check the pre-packet once
timer.schedule(timerTask, 0 ,500);
}
public static MessageSolver getMessageSolver(a){
if (messageSolver == null) {
synchronized (MessageSolver.class) {
if (messageSolver == null) {
messageSolver = newMessageSolver(); }}}return messageSolver;
}
public void messageSolver(MessageContent messageContent){
int flag = messageContent.businessId - messageSolver.preBusinessId;
if (flag == 1) {
MessageHandler adapter = messageAdaptive.adapter(messageContent);
adapter.processMsg(messageContent);
// Update the pre-packet number
this.preBusinessId = messageContent.businessId;
}else if (flag <= 0){
System.out.println("this message has been consumed");
}else {
if (messageContent.getCount() == 0) messageContent.setCreateTimeMillis(System.currentTimeMillis() / 1000);
messageContent.setCount(messageContent.getCount() + 1); delayed.add(messageContent); }}public long getTimeout(a) {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public MessageAdaptive getMessageAdaptive(a) {
return messageAdaptive;
}
public void setMessageAdaptive(MessageAdaptive messageAdaptive) {
this.messageAdaptive = messageAdaptive; }}Copy the code
test
package com.fire.plan.mq;
/ * * *@author zwd
* @date 2021/8/28
* @email [email protected]
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
MessageContent message1 = MessageContent.builder()
.sourceId(1).businessId(1).msg("first message").functionId("function01").build();
MessageContent message2 = MessageContent.builder()
.sourceId(1).businessId(2).msg("second message").functionId("function02").build();
MessageContent message3 = MessageContent.builder()
.sourceId(1).businessId(3).msg("third message").functionId("function03").build();
MessageAdaptiveMq messageAdaptiveMq = new MessageAdaptiveMq();
messageAdaptiveMq.registry("function01".new MessageHandlerOne());
messageAdaptiveMq.registry("function02".new MessageHandlerTwo());
messageAdaptiveMq.registry("function03".newMessageHandlerThree()); MessageSolver messageSolver = MessageSolver.getMessageSolver(); messageSolver.setMessageAdaptive(messageAdaptiveMq); messageSolver.messageSolver(message3); messageSolver.messageSolver(message1); messageSolver.messageSolver(message2); }}Copy the code
Output: 1 is being process 2 is being process 3 is being process