sequence

This article focuses on RocketMQCanalConnector’s getFlatList

getFlatList

Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector. Java

public class RocketMQCanalConnector implements CanalMQConnector {

    private static final Logger                 logger               = LoggerFactory.getLogger(RocketMQCanalConnector.class);
    private static final String                 CLOUD_ACCESS_CHANNEL = "cloud";

    private String                              nameServer;
    private String                              topic;
    private String                              groupName;
    private volatile boolean                    connected           = false;
    private DefaultMQPushConsumer               rocketMQConsumer;
    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
    private int                                 batchSize           = -1;
    private long                                batchProcessTimeout = 60 * 1000;
    private boolean                             flatMessage;
    private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
    private String                              accessKey;
    private String                              secretKey;
    private String                              customizedTraceTopic;
    private boolean                             enableMessageTrace = false; private String accessChannel; private String namespace; / /... public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException { List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);if(messages ! = null && ! messages.isEmpty()) { ack(); }return messages;
    }

    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        try {
            if(this.lastGetBatchMessage ! = null) { throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }

            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
            if(batchMessage ! = null) { this.lastGetBatchMessage = batchMessage;return batchMessage.getData();
            }
        } catch (InterruptedException ex) {
            logger.warn("Get message timeout", ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        return Lists.newArrayList();
    }

    public void ack() throws CanalClientException {
        try {
            if(this.lastGetBatchMessage ! = null) { this.lastGetBatchMessage.ack(); } } catch (Throwable e) {if(this.lastGetBatchMessage ! = null) { this.lastGetBatchMessage.fail(); } } finally { this.lastGetBatchMessage = null; }} / /... }Copy the code
  • RocketMQCanalConnector’s getFlatList method gets the list of FlatMessages via getFlatListWithoutAck and then performs an ACK if messages are not empty; The getFlatListWithoutAck method pulls batchMessage from messageBlockingQueue. If not null, it updates lastGetBatchMessage and returns batchMessage.getData(); Lastgetbatchmessage.ack () or lastGetBatchmessage.fail () if an exception occurs

subscribe

Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector. Java

public class RocketMQCanalConnector implements CanalMQConnector {
	
	//......

    public synchronized void subscribe(String filter) throws CanalClientException {
        if (connected) {
            return;
        }
        try {
            if (rocketMQConsumer == null) {
                this.connect();
            }
            rocketMQConsumer.subscribe(this.topic, "*");
            rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    boolean isSuccess = process(messageExts);
                    if (isSuccess) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        returnConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; }}}); rocketMQConsumer.start(); } catch (MQClientException ex) { connected =false;
            logger.error("Start RocketMQ consumer error", ex);
        }
        connected = true; } / /... }Copy the code
  • The SUBSCRIBE method registers MessageListenerOrderly to rocketMQConsumer, whose consumeMessage method executes the Process method

process

Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector. Java

public class RocketMQCanalConnector implements CanalMQConnector {
	
	//......

    private boolean process(List<MessageExt> messageExts) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", messageExts);
        }
        List messageList = Lists.newArrayList();
        for (MessageExt messageExt : messageExts) {
            byte[] data = messageExt.getBody();
            if(data ! = null) { try {if(! flatMessage) { Message message = CanalMessageDeserializer.deserializer(data); messageList.add(message); }else {
                        FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                        messageList.add(flatMessage);
                    }
                } catch (Exception ex) {
                    logger.error("Add message error", ex); throw new CanalClientException(ex); }}else {
                logger.warn("Received message data is null");
            }
        }
        ConsumerBatchMessage batchMessage;
        if(! flatMessage) { batchMessage = new ConsumerBatchMessage<Message>(messageList); }else {
            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
        }
        try {
            messageBlockingQueue.put(batchMessage);
        } catch (InterruptedException e) {
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        boolean isCompleted;
        try {
            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
        } catch (InterruptedException e) {
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
        boolean isSuccess = batchMessage.isSuccess();
        returnisCompleted && isSuccess; } / /... }Copy the code
  • The Process method converts a MessageExt to a Message or FlatMessage, and then assembles the ConsumerBatchMessage into a messageBlockingQueue

summary

RocketMQCanalConnector’s getFlatList method gets the list of FlatMessages via getFlatListWithoutAck and then performs an ACK if messages are not empty; The getFlatListWithoutAck method pulls batchMessage from messageBlockingQueue. If not null, it updates lastGetBatchMessage and returns batchMessage.getData(); Lastgetbatchmessage.ack () or lastGetBatchmessage.fail () if an exception occurs

doc

  • RocketMQCanalConnector