This approach is still in the trial phase; its feasibility has yet to be verified!

As I mentioned in the previous article, Alibaba Cloud's data subscription service DTS writes binlogs into a single partition of the topic, because binlogs must be consumed sequentially; a Kafka subscriber can therefore only have one thread pulling messages from that single partition. The official DEMO builds the DTS binlog consumption framework on a producer-consumer model: the consumer holds a blocking queue with a default size of 512, the producer puts messages into the consumer's queue, and the consumer thread polls the queue and invokes listeners to consume the messages.
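
For reference, the official DEMO's model is roughly equivalent to the following sketch. All class and method names here (SingleConsumerSketch, UserRecord, RecordListener, pollFromKafkaPartition) are my own illustrative stand-ins rather than the DTS SDK's API; only the overall shape — one pull thread, one bounded queue of 512, one consumer thread invoking listeners — reflects the description above.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the single-producer / single-consumer model, not the DTS SDK code
public class SingleConsumerSketch {

    // the consumer's blocking queue, default size 512 as in the official DEMO
    private final BlockingQueue<UserRecord> queue = new ArrayBlockingQueue<>(512);
    private volatile boolean running = true;

    // Pull thread (producer): offers every record into the only consumer's queue
    void pullLoop() throws InterruptedException {
        while (running) {
            UserRecord record = pollFromKafkaPartition(); // placeholder for the real Kafka poll
            if (record == null) {
                continue;
            }
            // when the queue is full, the pull thread blocks here (back pressure)
            while (!queue.offer(record, 200, TimeUnit.MILLISECONDS) && running) {
            }
        }
    }

    // Consumer thread: polls the queue and invokes the listener, so every
    // database / ES / cache I/O operation runs on this single thread
    void consumeLoop(RecordListener listener) throws InterruptedException {
        while (running) {
            UserRecord record = queue.poll(200, TimeUnit.MILLISECONDS);
            if (record != null) {
                listener.consume(record);
            }
        }
    }

    private UserRecord pollFromKafkaPartition() {
        return null; // placeholder: pull the next record from the single Kafka partition
    }

    interface RecordListener {
        void consume(UserRecord record);
    }

    static class UserRecord {
    }
}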

In the official DEMO, only one thread consumes messages, yet consuming a binlog involves reading the database and synchronizing the new data to the database / ES / cache, all of which are time-consuming I/O operations. This is fine for a local DEMO test or a technical verification, but if an online service adopted the same approach, it would only lead to serious binlog consumption delays and message backlog, and data synchronization would lose its real-time quality.

On the premise of keeping binlog consumption sequential, how can we consume concurrently and increase consumption speed? This is a problem we had to solve, and it took the author a long time of hard thinking to find a satisfactory answer.

One producer – multiple consumers – one offset submitter

A compromise is to change the official DEMO's single producer / single consumer into a single producer with multiple consumers, where the consumers are no longer responsible for periodically committing offsets (consumption offsets).

The first problem: how do we keep binlog consumption sequential?

The answer I came up with was hash-based routing.

It seems feasible to route binlogs that operate on the same table to the same consumer by hashing, creating as many consumers as the application needs (and since that number is fixed, a full consistent hash is not actually required).

The implementation code is as follows:

int hash;
if (realRecord.getObjectName() != null) {
    // Binlogs for the same table must be handled by the same EtlRecordProcessor
    // objectName = database name.table name
    hash = Math.abs(realRecord.getObjectName().hashCode());
} else {
    hash = Math.abs(realRecord.getId().hashCode());
}
// Pick one of the consumers (the number of EtlRecordProcessors is fixed, so a consistent hash is not needed)
EtlRecordProcessor etlRecordProcessor = recordProcessor[hash % recordProcessor.length];
// offer with back pressure
while (!etlRecordProcessor.offer(200, TimeUnit.MILLISECONDS, userRecord) && !existed) {
}

The second problem: how do we commit offsets when consuming concurrently?

On the premise that data synchronization can tolerate messages being consumed more than once but every message must be consumed at least once, the committed offset must never skip past a record that has not yet been consumed; each commit therefore submits the largest offset up to which every record is known to have been consumed.

To achieve this, we abstract an “offset submitter” that is responsible for committing that offset on a fixed schedule, every 5 seconds.

The “offset submitter” uses a queue to hold all the offsets (per topic and partition) reported by the consumers, and the queue must support ordered insertion: the head of the queue is always the smallest offset and the tail is always the largest.

Every 5 seconds, the “offset submitter” takes the largest consecutive offset from the queue and commits it.

Suppose there are three consumers, and at some point each consumer has reported the following offsets to the queue:

A: 2223409, 2223415, 2223417
B: 2223410, 2223411, 2223416
C: 2223412, 2223413

The offset queue stores the following elements:

2223409, 2223410, 2223411, 2223412, 2223413, 2223415, 2223416, 2223417

The next time an offset is committed, the rule for working out the offset actually committed from the offset queue is as follows.

Start from the smallest element, 2223409, then advance the pointer and check whether the next element equals the current value + 1, meaning the run is still consecutive. Keep advancing until 2223413, whose next element (2223415) breaks the run, so the run ends at 2223413. Remove 2223413 and every element before it from the queue. The offset finally committed is 2223413.
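
The rule itself is independent of the LinkSort implementation shown at the end of the article. As a minimal, stand-alone illustration (working on a plain sorted list of long offsets rather than the real Checkpoint nodes), the extraction of the largest consecutive offset can be written like this:

import java.util.List;

// Minimal illustration of the commit rule on a sorted, non-empty list of offsets
public class SuccessiveMaxDemo {

    // Returns the end of the first consecutive run, i.e. the largest offset
    // such that every smaller offset in the list has already been consumed.
    static long successiveMax(List<Long> sortedOffsets) {
        long commit = sortedOffsets.get(0);            // start from the smallest offset
        for (int i = 1; i < sortedOffsets.size(); i++) {
            if (sortedOffsets.get(i) != commit + 1) {  // gap found: the run ends here
                break;
            }
            commit = sortedOffsets.get(i);
        }
        return commit;
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(2223409L, 2223410L, 2223411L, 2223412L,
                2223413L, 2223415L, 2223416L, 2223417L);
        System.out.println(successiveMax(offsets));    // prints 2223413
    }
}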

The implementation of the offset ordered queue is given at the end of the article.

The implementation code for the “offset submitter” is as follows:

public class AtomicOffsetCommitCallBack extends WorkThread.Work implements OffsetCommitCallBack {

    private final Context context;
    private volatile boolean existed = false;
    /**
     * Use a sorted list to ensure that the minimum safe offset is committed each time;
     * repeated consumption is tolerated, but missed consumption is not.
     */
    private LinkSortSupporAsync<Checkpoint> linkSort = new LinkSortSupporAsync<>();
    private volatile long lastCommitOffset = -1;

    public AtomicOffsetCommitCallBack(Context context) {
        this.context = context;
    }
 
    // Called by the consumer, just submit the offset to the list
    @Override
    public void commit(TopicPartition tp, long timestamp, long offset) {
        Checkpoint checkpoint = new Checkpoint(tp, timestamp, offset);
        linkSort.put(checkpoint);
    }

    @Override
    public void close() {
        existed = true;
    }

    @Override
    public void run() {
        while (!existed) {
            Util.sleepMs(5000); // commit once every 5 seconds
            // Get the offset for this commit from the queue
            Checkpoint commitCheckpoint = linkSort.popSuccessiveMax(Integer.MAX_VALUE);
            if (commitCheckpoint != null && commitCheckpoint.getOffset() > lastCommitOffset) {
                if (commitCheckpoint.getTopicPartition() != null && commitCheckpoint.getOffset() != -1) {
                    // commit the offset
                    context.getRecordGenerator().setToCommitCheckpoint(commitCheckpoint);
                    lastCommitOffset = commitCheckpoint.getOffset();
                }
            }
        }
    }

}

At the very least, this guarantees that committing an offset can never skip over messages that have not yet been consumed, which would otherwise be lost after a restart and never consumed. The trade-off is that a large number of messages may be consumed again after a restart.

You win some, you lose some. We can only bound how many messages may be re-consumed by adjusting the size of the blocking queue held by each consumer, but this size should not be set too small either, otherwise the pull thread may be blocked just because one consumer's queue is full while another consumer's queue sits empty.

When every queue is nearly full, the blocking queues block the pull thread, slowing down message production and achieving back pressure.
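
The consumer side of this back pressure is nothing more than a bounded queue whose offer takes a timeout. A rough sketch of what might sit behind EtlRecordProcessor.offer(...) is below; the field and type names are my assumptions for illustration, not the actual implementation:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Assumed sketch of the consumer-side bounded queue behind EtlRecordProcessor.offer(...)
public class BoundedRecordQueue {

    // the capacity bounds how many unconsumed records each consumer holds,
    // and therefore how many records can at most be re-consumed after a restart
    private final BlockingQueue<UserRecord> toProcessRecords = new ArrayBlockingQueue<>(512);

    // Called by the pull thread; returning false after the timeout is what makes
    // the pull thread spin in its offer loop, i.e. the back pressure described above.
    public boolean offer(long timeout, TimeUnit unit, UserRecord record) {
        try {
            return toProcessRecords.offer(record, timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static class UserRecord {
    }
}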

After solving the above two problems, the new model is as follows:

public class Context {
    /** the producer */
    private RecordGenerator recordGenerator;
    /** the consumer */
    private EtlRecordProcessor[] recordProcessor;
    /** the offset submitter */
    private AtomicOffsetCommitCallBack offsetCommitCallBack;
}

Store the offset locally + commit to Kafka

In the previous article, I recommended not using a local file to store the offset, because once the application is deployed to a new machine there is no offset file to read from.

However, committing to Kafka can fail because of network and other issues, so in the new version we use both strategies together: the offset is committed to a local file first and then committed to Kafka.

On restart, if the local offset file exists, the last committed offset is read from that file first and Kafka is not queried. The code is as follows:

public class RecordGenerator implements Runnable, Closeable {

    private Checkpoint initAndGetCheckpoint(ConsumerWrap kafkaConsumerWrap) {
        // It is not recommended to use LocalFileMetaStore (especially when deployed on K8S), otherwise the localCheckpointStore file will need to be synchronized after the consumer is deployed on another server
        // You can choose to use both methods
        metaStoreCenter.registerStore(LOCAL_FILE_STORE_NAME, new LocalFileMetaStore(LOCAL_FILE_STORE_NAME));
        metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap));
        Checkpoint checkpoint = null;
        // Whether to use the configured checkpoint. If so, you must ensure that the correct consumption location is configured each time the application is started, otherwise repeated consumption will occur
        // It is recommended for testing only
        if (useCheckpointConfig.compareAndSet(true, false)) {
            log.info("RecordGenerator: force use initial checkpoint [{}] to start", checkpoint);
            checkpoint = initialCheckpoint;
        } else {
            // Retrieve checkpoints from checkpoint memory (since they are committed every 5 seconds, a small portion of records will be re-consumed each reboot, please ensure idempotency yourself)
            // Local checkpoints are preferred
            checkpoint = metaStoreCenter.seek(LOCAL_FILE_STORE_NAME, topicPartition, groupID);
            if (null == checkpoint) {
                // Use kafka checkpoints
                checkpoint = metaStoreCenter.seek(KAFKA_STORE_NAME, topicPartition, groupID);
            }
            // If no checkpoint is found, the configured initialization checkpoint is used
            if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
                checkpoint = initialCheckpoint;
                log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
            } else {
                log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint); }}returncheckpoint; }}Copy the code

Local asynchronous consumption

The consumer in this production-consumption model does not consume the message itself; it hands the message over to multiple listeners that do the actual consuming.

Depending on the scenario, several listeners may listen to the same table. In that case the listeners can consume in parallel, but the consumer blocks until all of them have finished before reporting the offset to the offset submitter.

Local multithreaded consumption is implemented as follows:

private List<Future<Void>> submitConsume(String table, BiglogOperation operation, FieldHolderMap<MysqlFieldHolder> fields) {
    List<TableOperationHandler> matchHandlers = handlerMap.get(table);
    if (CollectionUtils.isEmpty(matchHandlers)) {
        return Collections.emptyList();
    }
    if (matchHandlers.size() == 1) {
        // Only one listener: consume synchronously on the current thread
        matchHandlers.get(0).handle(operation, fields);
        return Collections.emptyList();
    }
    List<Future<Void>> futures = new ArrayList<>();
    for (TableOperationHandler handler : matchHandlers) {
        futures.add(executorService.submit(() -> handler.handle(operation, fields), null));
    }
    return futures;
}
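
The caller then blocks on the returned futures until every listener has finished, and only after that reports the record's offset to the offset submitter. A hedged sketch of what that call site might look like is below; the commitCallBack field and the record accessors are assumptions for illustration, not the actual code:

// Assumed call site inside the consumer loop: wait for every listener, then report the offset
private void consumeAndCommit(UserRecord record, String table, BiglogOperation operation,
                              FieldHolderMap<MysqlFieldHolder> fields) {
    List<Future<Void>> futures = submitConsume(table, operation, fields);
    for (Future<Void> future : futures) {
        try {
            future.get(); // block until this listener has finished consuming the record
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } catch (ExecutionException e) {
            log.error("listener consume error", e);
        }
    }
    // every listener is done, so it is now safe to hand this record's offset to the offset submitter
    commitCallBack.commit(record.getTopicPartition(), record.getTimestamp(), record.getOffset());
}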

Implement a LinkSort that supports ordered inserts

This is a singly linked list in which first always points to the head, tail always points to the tail, and mide moves with each insertion. mide is used to locally optimize insertion efficiency, acting as a rough, one-level binary search.

/**
 * Ordered insertion queue, thread-safe.
 *
 * @author wujiuye 2020/11/27
 */
public class LinkSortSupporAsync<T extends Comparable<T>> {

    private class Node implements Comparable<Node> {
        private T obj;
        private Node next;

        public Node(T obj) {
            this.obj = obj;
        }
        / /... get set
    }

    private Node first;
    private Node tail;
    private Node mide;

    private int cap;
    private int size = 0;

    private Condition notFullCondition;
    private ReadWriteLock readWriteLock;

    public LinkSortSupporAsync(a) {
        this(Integer.MAX_VALUE);
    }

    public LinkSortSupporAsync(int cap) {
        this.cap = cap;
        this.readWriteLock = new ReentrantReadWriteLock();
        this.notFullCondition = this.readWriteLock.writeLock().newCondition();
    }

    public void put(T obj) {
        readWriteLock.writeLock().lock();
        try {
            while (cap <= size) {
                try {
                    notFullCondition.await();
                } catch (InterruptedException ignored) {
                }
            }
            size++;
            Node node = new Node(obj);
            if (first == null) {
                first = node;
                tail = first;
                return;
            }
            if (node.compareTo(tail) > 0) {
                tail.next = node;
                tail = node;
                return;
            }
            Node ptr = first;
            Node pre = null;
            if (mide != null && node.compareTo(mide) >= 0) {
                ptr = mide;
            }
            for (; ptr != null && node.compareTo(ptr) >= 0; pre = ptr, ptr = ptr.next) {
            }
            if (pre == null) {
                node.next = first;
                first = node;
            } else if (pre == tail) {
                pre.next = node;
                tail = node;
            } else {
                mide = node;
                pre.next = node;
                node.next = ptr;
            }
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Pops the largest consecutive element from the queue and removes it together with all elements before it.
     *
     * @param maxNode the maximum number of nodes to traverse, to avoid blocking the queue for too long
     * @return the largest consecutive element, or null if the queue is empty
     */
    public T popSuccessiveMax(int maxNode) {
        readWriteLock.writeLock().lock();
        try {
            if (first == null) {
                return null;
            }
            Node ptr = first;
            Node pre = null;
            int popCnt = 1;
            boolean isNumber = first.obj instanceof Number;
            for (int i = 0; ptr != null && i < maxNode; pre = ptr, ptr = ptr.next, i++) {
                if (pre == null) {
                    continue;
                }
                int cz;
                if (isNumber) {
                    cz = (int) (((Number) ptr.obj).longValue() - ((Number) pre.obj).longValue());
                } else {
                    cz = ptr.compareTo(pre);
                }
                // Make sure that the compareTo method implemented by T returns the difference between the two values
                if (cz != 1) {
                    break;
                }
                popCnt++;
                if (mide == pre) {
                    mide = ptr;
                }
            }
            if (pre == null) {
                first = ptr.next;
                size--;
                if (mide == ptr) {
                    mide = null;
                }
                notFullCondition.signal();
                return ptr.obj;
            }
            if (pre == tail) {
                first = tail = null;
                size = 0;
                mide = null;
                notFullCondition.signal();
                return pre.obj;
            }
            if (mide == ptr) {
                mide = null;
            }
            first = ptr;
            size -= popCnt;
            notFullCondition.signal();
            return pre.obj;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}