sequence
This article focuses on RocketMQCanalConnector’s getFlatList
getFlatList
Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector. Java
public class RocketMQCanalConnector implements CanalMQConnector {
private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
private static final String CLOUD_ACCESS_CHANNEL = "cloud";
private String nameServer;
private String topic;
private String groupName;
private volatile boolean connected = false;
private DefaultMQPushConsumer rocketMQConsumer;
private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
private int batchSize = -1;
private long batchProcessTimeout = 60 * 1000;
private boolean flatMessage;
private volatile ConsumerBatchMessage lastGetBatchMessage = null;
private String accessKey;
private String secretKey;
private String customizedTraceTopic;
private boolean enableMessageTrace = false; private String accessChannel; private String namespace; / /... public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException { List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);if(messages ! = null && ! messages.isEmpty()) { ack(); }return messages;
}
public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
try {
if(this.lastGetBatchMessage ! = null) { throw new CanalClientException("mq get/ack not support concurrent & async ack");
}
ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
if(batchMessage ! = null) { this.lastGetBatchMessage = batchMessage;return batchMessage.getData();
}
} catch (InterruptedException ex) {
logger.warn("Get message timeout", ex);
throw new CanalClientException("Failed to fetch the data after: " + timeout);
}
return Lists.newArrayList();
}
public void ack() throws CanalClientException {
try {
if(this.lastGetBatchMessage ! = null) { this.lastGetBatchMessage.ack(); } } catch (Throwable e) {if(this.lastGetBatchMessage ! = null) { this.lastGetBatchMessage.fail(); } } finally { this.lastGetBatchMessage = null; }} / /... }Copy the code
- RocketMQCanalConnector’s getFlatList method gets the list of FlatMessages via getFlatListWithoutAck and then performs an ACK if messages are not empty; The getFlatListWithoutAck method pulls batchMessage from messageBlockingQueue. If not null, it updates lastGetBatchMessage and returns batchMessage.getData(); Lastgetbatchmessage.ack () or lastGetBatchmessage.fail () if an exception occurs
subscribe
Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector. Java
public class RocketMQCanalConnector implements CanalMQConnector {
//......
public synchronized void subscribe(String filter) throws CanalClientException {
if (connected) {
return;
}
try {
if (rocketMQConsumer == null) {
this.connect();
}
rocketMQConsumer.subscribe(this.topic, "*");
rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
boolean isSuccess = process(messageExts);
if (isSuccess) {
return ConsumeOrderlyStatus.SUCCESS;
} else {
returnConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; }}}); rocketMQConsumer.start(); } catch (MQClientException ex) { connected =false;
logger.error("Start RocketMQ consumer error", ex);
}
connected = true; } / /... }Copy the code
- The SUBSCRIBE method registers MessageListenerOrderly to rocketMQConsumer, whose consumeMessage method executes the Process method
process
Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector. Java
public class RocketMQCanalConnector implements CanalMQConnector {
//......
private boolean process(List<MessageExt> messageExts) {
if (logger.isDebugEnabled()) {
logger.debug("Get Message: {}", messageExts);
}
List messageList = Lists.newArrayList();
for (MessageExt messageExt : messageExts) {
byte[] data = messageExt.getBody();
if(data ! = null) { try {if(! flatMessage) { Message message = CanalMessageDeserializer.deserializer(data); messageList.add(message); }else {
FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
messageList.add(flatMessage);
}
} catch (Exception ex) {
logger.error("Add message error", ex); throw new CanalClientException(ex); }}else {
logger.warn("Received message data is null");
}
}
ConsumerBatchMessage batchMessage;
if(! flatMessage) { batchMessage = new ConsumerBatchMessage<Message>(messageList); }else {
batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
}
try {
messageBlockingQueue.put(batchMessage);
} catch (InterruptedException e) {
logger.error("Put message to queue error", e);
throw new RuntimeException(e);
}
boolean isCompleted;
try {
isCompleted = batchMessage.waitFinish(batchProcessTimeout);
} catch (InterruptedException e) {
logger.error("Interrupted when waiting messages to be finished.", e);
throw new RuntimeException(e);
}
boolean isSuccess = batchMessage.isSuccess();
returnisCompleted && isSuccess; } / /... }Copy the code
- The Process method converts a MessageExt to a Message or FlatMessage, and then assembles the ConsumerBatchMessage into a messageBlockingQueue
summary
RocketMQCanalConnector’s getFlatList method gets the list of FlatMessages via getFlatListWithoutAck and then performs an ACK if messages are not empty; The getFlatListWithoutAck method pulls batchMessage from messageBlockingQueue. If not null, it updates lastGetBatchMessage and returns batchMessage.getData(); Lastgetbatchmessage.ack () or lastGetBatchmessage.fail () if an exception occurs
doc
- RocketMQCanalConnector