This article has participated in the activity of “New person creation Ceremony”, and started the road of digging gold creation together.
In the RocketMQ configuration, there is a configuration item maxMessageSize that changes the size limits for sending and receiving messages, and here is a way to handle this without changing this configuration item.
In the RoketMQ document, an example of a large number of message cutting encoding method, but this method can only handle the total number of messages is greater than 4M but the single message is less than 4M, it is not suitable for the single message itself is greater than 4M, this paper discusses a cutting method in this case.
Realize the basic
For a single message cutting, after the cutting is completed, the producer sends the message to the consumer end, and the consumer end needs to perform message splicing and recovery. We use MessageBuilder to build the message, and then use rocketMQTemplate to send the message. Take asynchronous sending asyncSend as an example, inspect what kind of processing is done to the message before sending, to determine the preparation before cutting the message, the source code is as follows:
public static Message convertToRocketMessage(MessageConverter messageConverter, String charset, String destination, org.springframework.messaging.Message
message) {
Object payloadObj = message.getPayload();
byte[] payloads;
try {
if (null == payloadObj) {
throw new RuntimeException("the message cannot be empty");
}
if (payloadObj instanceof String) {
payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
} else if (payloadObj instanceof byte[]) {
payloads = (byte[]) ((byte[])message.getPayload());
} else {
String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format("empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]", messageConverter.getClass(), payloadObj.getClass(), payloadObj)); } payloads = jsonObj.getBytes(Charset.forName(charset)); }}catch (Exception var7) {
throw new RuntimeException("convert to RocketMQ message failed.", var7);
}
return getAndWrapMessage(destination, message.getHeaders(), payloads);
}
Copy the code
The above source code is a necessary step to process message payloads before sending them. As you can see, message payloads are eventually converted to byte [], so they should be converted to byte [] before cutting.
Cutting and sending
After clear cutting way, need to change the message into byte [], here with com. Alibaba. Fastjson. The JSON getBytes () method, cutting class in the reference documentation, build a suitable Iterator implementation:
public class SplitMessage implements Iterator<byte[] >{
// Cut the size
public final int SPLIT_SIZE = 1024 * 1024 * 4 - 80;
public final byte [] message;
private int cursor = 0;
public SplitMessage(byte [] message){
this.message = message;
}
@Override
public boolean hasNext(a){
return cursor < size();
}
@Override
public byte [] next(){
byte [] r;
int len;
if(cursor < size() - 1){
len = SPLIT_SIZE;
} else {
len = message.length - cursor * SPLIT_SIZE;
}
r = new byte[len];
for(int i = 0 ; i < len; i++){
r[i] = message[i + cursor * SPLIT_SIZE];
}
cursor++;
return r;
}
public int size(a){
int s = message.length / SPLIT_SIZE;
int y = message.length % SPLIT_SIZE;
if(y ! =0){
s++;
}
returns; }}Copy the code
The sending method can be
public String sendRefundExmMessage(Map<String, String> map, int id){
byte[] m = JSON.toJSONString(map).getBytes();
SplitMessage sm = new SplitMessage(m);
int len = sm.size();
int i = 0;
while (sm.hasNext()){
byte [] now = sm.next();
// Set the total length of the message header to the position of the message, for costumer end stitching
Message msg = MessageBuilder.withPayload(now).setHeader(MessageConst.PROPERTY_KEYS, id + "-" + i + "-" + len).build();
// Send it asynchronously
rocketMQTemplate.asyncSend("testTopic:tag", msg, new SendCallback(){
@Override
public void onSuccess(SendResult sendResult){
// Successfully processed
}
@Override
public void onException(Throwable throwable){
// Error handling}}); i++; }return "done";
}
Copy the code
Receiving and splicing
Since the message is sent asynchronously, each message arrives at the consumer end in a different order. Therefore, it needs to be cached and then determine whether all messages are received. Consumer can be written as follows:
@Component
@RocketMQMessageListener( consumerGroup = "test", topic = "testTopic", selectorExpression = "tag", messageModel = MessageModel.CLUSTERING, selectorType = SelectorType.TAG )
public class ContentConsumer implements RocketMQListener<MessageExt> {
// Inject the Redis service
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(MessageExt msg) {
String key = msg.getKeys();
String[] keyValues = key.split("-");
int id = new Integer(keyValues[0]).intValue();
boolean repeat = false;
try {
// Check whether messages in the cache are consumed
repeat = redisTemplate.opsForValue().get(id) == null ? false : (boolean) redisTemplate.opsForValue().get(id);
} catch (Exception e) {
// Error handling
}
if (repeat) {
// If already consumed, avoid repeated consumption
return;
} else {
// Get the current message fragment location
int cursor = new Integer(keyValues[1]).intValue();
// Get the total number of message fragments
int size = new Integer(keyValues[2]).intValue();
if (cursor < size - 1) {
// If the location is cached before the last item
redisTemplate.opsForValue().set(id + "-" + cursor, msg.getBody());
// Set the save time
redisTemplate.boundValueOps(id + "-" + cursor).expire(5L, TimeUnit.MINUTES);
} else {
// If it is the last fragment, proceed to the next step
byte[] all = new byte[0];
for (int i = 0; i < size - 1; i++) {
// Query Redis every 500 milliseconds to see if valuable messages are received
int max = 100;
while (redisTemplate.opsForValue().get(id + "-" + i) == null && max > 0) {
try {
Thread.sleep(500L);
max--;
} catch (InterruptedException e) {
/ / an error}}// If the number of messages is less than the total number after 50 seconds, an error is returned.
if (max == 0) return;
// Get the I message
byte[] temp = (byte[]) redisTemplate.opsForValue().get(id + "-" + i);
all = ByteBuffer.allocate(all.length + temp.length).put(all).put(temp).array();
redisTemplate.delete(id + "-" + i);
}
all = ByteBuffer.allocate(all.length + msg.getBody().length).put(all).put(msg.getBody()).array();
JSONObject json = JSON.parseObject(new String(all));
Map<String, String> map = (Map<String, String>) json;
// Message processing
// Set the message consumption success flag
redisTemplate.opsForValue().set(id, true);
redisTemplate.boundValueOps(id).expire(5L, TimeUnit.MINUTES); }}return; }}Copy the code
The main requirement of the above consumer side is to have a proper way to verify and concatenate messages. The method provided here is for reference only. This method can be extended to multiple server deployments, as long as one Redis service is shared.
This paper only discusses one possible solution. For large volume messages, not only the method discussed in this paper can be used, and the solution provided in this paper is not optimal.