Flink CDC released version 2.0.0 in August. Compared with version 1.x, Flink CDC 2.0 supports distributed reads and checkpoints in the full read phase, and guarantees data consistency during the full + incremental read process without locking tables. See the official release notes of Flink CDC 2.0 for details of the core improvements.
The data reading logic of Flink CDC 2.0 is not complicated; what is hard to follow is the design of FLIP-27: Refactor Source Interface and unfamiliarity with the Debezium API. This article focuses on the processing logic of Flink CDC; the FLIP-27 design and the Debezium API calls are not explained in depth.
This article first introduces a Flink SQL case using Flink CDC 2.0, then describes the core design of the CDC connector, including slice division, slice reading and incremental reading, and finally walks through the code of the flink-mysql-cdc interfaces invoked during data processing.
Case
Read full + incremental data from a MySQL table, write it to Kafka in changelog-json format, and observe the RowKind types and the number of affected records.
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    env.setParallelism(3);
    // Note: incremental synchronization requires checkpointing to be enabled
    env.enableCheckpointing(10000);
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

    tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
            "    `order_id` INTEGER ,\n" +
            "    `order_date` DATE ,\n" +
            "    `order_time` TIMESTAMP(3),\n" +
            "    `quantity` INT ,\n" +
            "    `product_id` INT ,\n" +
            "    `purchaser` STRING,\n" +
            "    primary key(order_id) NOT ENFORCED" +
            " ) WITH (\n" +
            "    'connector' = 'mysql-cdc',\n" +
            "    'hostname' = 'localhost',\n" +
            "    'port' = '3306',\n" +
            "    'username' = 'cdc',\n" +
            "    'password' = '123456',\n" +
            "    'database-name' = 'test',\n" +
            "    'table-name' = 'demo_orders',\n" +
            // full + incremental synchronization
            "    'scan.startup.mode' = 'initial'\n" +
            ") ");

    tableEnvironment.executeSql("CREATE TABLE sink (\n" +
            "    `order_id` INTEGER ,\n" +
            "    `order_date` DATE ,\n" +
            "    `order_time` TIMESTAMP(3),\n" +
            "    `quantity` INT ,\n" +
            "    `product_id` INT ,\n" +
            "    `purchaser` STRING,\n" +
            "    primary key (order_id) NOT ENFORCED " +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "    'topic' = 'mqTest02',\n" +
            "    'format' = 'changelog-json' " +
            ")");

    tableEnvironment.executeSql("insert into sink select * from demoOrders");
}
Full data output:
{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12. 189 ", "quantity" : 53, "product_id" : 502, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09. 709 ", "quantity" : 31, "product_id" : 500, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06. 637 ", "quantity" : 69, "product_id" : 503, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03. 535 ", "quantity" : 52, "product_id" : 502, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51. 347 ", "quantity" : 69, "product_id" : 503, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48. 783 ", "quantity" : 50, "product_id" : 502, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32. 354 ", "quantity" : 30, "product_id" : 500, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01. 249 ", "quantity" : 31, "product_id" : 500, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58. 813 ", "quantity" : 69, "product_id" : 503, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56. 153 ", "quantity" : 50, "product_id" : 502, the "purchaser" : "flink"}, "op" : "+" I} {"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53. 727 ", "quantity" : 30, "product_id" : 500, the "purchaser" : "flink"}, "op" : "+" I} # # to update the value of 1005 {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58. 813 ", "quantity" : 69, "product_id" : 503, the "purchaser" : "flink"}, "op" : "-u"} {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43. 627 ", "quantity" : 80, "product_id" : 503, the "purchaser" : "flink"}, "op" : "+ U"} delete # # 1000 {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32. 354 ", "quantity" : 30, "product_id" : 500, the "purchaser" : "flink"}, "op" : "3-d"}Copy the code
The core design
Slice division
In the full phase, data is read in a distributed way: the current table is divided into multiple chunks based on the primary key, and each sub-task reads the data within one chunk's range. Depending on whether the primary key column is an auto-incremented integer, table data is split into evenly distributed chunks or unevenly distributed chunks.
Uniform distribution
When the primary key column is auto-incremented and of an integer type (INT, BIGINT, DECIMAL), the minimum and maximum values of the primary key column are queried and the data is evenly divided by chunkSize. Because the primary key is an integer, the end position of each chunk is computed directly from its start position plus chunkSize.
// Query the min and max values of the primary key column
SELECT MIN(`order_id`), MAX(`order_id`) FROM demo_orders;

// Divide the data into chunks of size chunkSize
chunk-0:    [min, min + chunkSize)
chunk-1:    [min + chunkSize, min + 2 * chunkSize)
.......
chunk-last: [max, null)
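To make the arithmetic concrete, the sketch below computes the even chunk boundaries from min, max and chunkSize only, with no further queries. It is a simplified illustration under the assumption of a numeric primary key; the names are illustrative and the connector's real implementation (ChunkSplitter#splitEvenlySizedChunks) is shown later in the article.

import java.util.ArrayList;
import java.util.List;

public class EvenChunkSketch {
    /** Returns [start, end) ranges; a null end means the last chunk is open ended. */
    static List<Long[]> splitEvenly(long min, long max, long chunkSize) {
        List<Long[]> chunks = new ArrayList<>();
        Long start = min;
        long end = min + chunkSize;
        while (end <= max) {
            chunks.add(new Long[] {start, end}); // [start, end)
            start = end;
            end += chunkSize;
        }
        chunks.add(new Long[] {start, null});    // the last chunk is unbounded on the right
        return chunks;
    }

    public static void main(String[] args) {
        // e.g. min=1, max=1000, chunkSize=100 -> [1,101), [101,201), ..., [901,null)
        for (Long[] c : splitEvenly(1, 1000, 100)) {
            System.out.println("[" + c[0] + ", " + c[1] + ")");
        }
    }
}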
Nonuniform distribution
When the primary key column is not auto-incremented or is of a non-numeric type, the unsplit data must be sorted in ascending order by primary key, and the maximum value of the first chunkSize rows is taken as the end position of the current chunk.
// After sorting the unsplit data, take the maximum of the first chunkSize rows as the end position of the slice.
chunkend = SELECT MAX(`order_id`) FROM (
        SELECT `order_id` FROM `demo_orders`
        WHERE `order_id` >= [previous chunk end]
        ORDER BY `order_id` ASC
        LIMIT [chunkSize]
    ) AS T
Read full slice data
Flink divides the table data into multiple chunks, and sub-tasks read the chunk data in parallel without locking. Because the whole slice-reading process is lock-free, other transactions may modify the data within the slice range while it is being read, so the query alone cannot guarantee consistency. Therefore, in the full phase Flink uses snapshot reading plus binlog data correction to ensure data consistency.
Snapshot read
Read the data records within the slice range by executing an SQL query over JDBC.
SELECT * FROM `test`.`demo_orders`
WHERE order_id >= [chunkStart]
AND NOT (order_id = [chunkEnd])
AND order_id <= [chunkEnd]
Data correction
Before and after the snapshot read, SHOW MASTER STATUS is executed to record the current binlog offset. After the snapshot is read, the binlog data within that offset range is queried and used to correct the snapshot records.
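The sequence can be sketched roughly as below with plain JDBC. This is only an illustration of the idea, not the connector's code (the real logic lives in MySqlSnapshotSplitReadTask and the backfill binlog task shown later); the table and column names reuse the demo table above, and the helper method names are made up.

import java.sql.*;
import java.util.*;

/** Sketch: read one chunk between a low and a high binlog watermark, as described above. */
public class ChunkSnapshotSketch {

    // Returns "<binlog file>:<position>" from SHOW MASTER STATUS.
    static String currentBinlogOffset(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW MASTER STATUS")) {
            rs.next();
            return rs.getString("File") + ":" + rs.getLong("Position");
        }
    }

    static List<Map<String, Object>> readChunk(Connection conn, long chunkStart, long chunkEnd)
            throws SQLException {
        String lowWatermark = currentBinlogOffset(conn);   // offset before the snapshot query
        List<Map<String, Object>> rows = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT * FROM test.demo_orders WHERE order_id >= ? AND order_id <= ?")) {
            ps.setLong(1, chunkStart);
            ps.setLong(2, chunkEnd);
            try (ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData meta = rs.getMetaData();
                while (rs.next()) {
                    Map<String, Object> row = new HashMap<>();
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        row.put(meta.getColumnName(i), rs.getObject(i));
                    }
                    rows.add(row);
                }
            }
        }
        String highWatermark = currentBinlogOffset(conn);  // offset after the snapshot query
        // The real reader now replays the binlog events between lowWatermark and highWatermark
        // that fall inside [chunkStart, chunkEnd] and merges them into the snapshot records.
        return rows;
    }
}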
Data organization during the snapshot read + binlog read:
Rules for correcting SnapshotEvents with BinlogEvents:
- If no binlog data is read, all snapshot records are delivered directly, because no other transaction operated on the data during the SELECT phase.
- If binlog data is read and the changed records do not belong to the current slice, the snapshot records are delivered directly.
- If binlog data is read and the changed records belong to the current slice: delete operations remove the data from the snapshot buffer, insert operations add new data to it, and update operations add the change records to the snapshot buffer, eventually outputting both the pre-update and post-update records downstream.
Revised data organization structure:
Take reading the data in the slice range [1, 11] as an example to describe how slice data is processed. Here c, d and u stand for the insert, delete and update operations captured by Debezium.
Data and structure before modification:
Revised data and structure:
After the data of a single slice is processed, the slice's data range (ChunkStart, ChunkEnd) and the maximum binlog offset read (the high watermark) are sent to the SplitEnumerator, which uses them to determine the start offset for incremental reads.
Incremental slice data read
In the full phase, after all slice data has been read, the SplitEnumerator issues a BinlogSplit for incremental reading. The most important attribute of a BinlogSplit is its start offset: if the offset is set too small, duplicate data may be sent downstream; if it is set too large, changes may be skipped and stale data may end up downstream. Flink CDC uses the minimum binlog offset among all completed full slices as the start offset of the incremental read, and only data that meets the delivery condition is sent downstream. Delivery condition:

- The offset of the captured binlog record is greater than the maximum binlog offset (high watermark) of the slice that the record belongs to.
For example, the SplitEnumerator keeps the following slice information:
Slice index | Chunk data range | Max binlog offset read by the slice
0           | [1, 100]         | 1000
1           | [101, 200]       | 800
2           | [201, 300]       | 1500
During incremental reading, binlog data is read starting from offset 800. When a record such as <data: 123, offset: 1500> is captured, the snapshot slice that key 123 belongs to ([101, 200]) is located first, along with its maximum binlog offset 800. Since the current offset 1500 is greater than the slice's maximum read offset, the record is delivered; otherwise it is discarded.
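Expressed as code, the check looks roughly like the sketch below. The types and names here are illustrative only; the connector's actual implementation, BinlogSplitReader#shouldEmit, is shown later in the article and also handles non-numeric keys, schema events and watermark events.

import java.util.List;

/** Illustrative description of a finished snapshot slice: its key range and high watermark. */
record FinishedSplit(long chunkStart, long chunkEnd, long highWatermark) {}

class EmitCheckSketch {
    static boolean shouldEmit(long key, long offset, List<FinishedSplit> finishedSplits) {
        for (FinishedSplit split : finishedSplits) {
            if (split.chunkStart() <= key && key <= split.chunkEnd()) {
                // deliver only if the binlog offset is newer than the high watermark recorded
                // when this slice's snapshot was read
                return offset > split.highWatermark();
            }
        }
        return false; // the key is outside every monitored slice range
    }

    public static void main(String[] args) {
        List<FinishedSplit> splits = List.of(
                new FinishedSplit(1, 100, 1000),
                new FinishedSplit(101, 200, 800),
                new FinishedSplit(201, 300, 1500));
        // key 123 falls into [101, 200] whose high watermark is 800; offset 1500 > 800 -> emit
        System.out.println(shouldEmit(123, 1500, splits));
    }
}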
Code details
The design of FLIP-27: Refactor Source Interface is not introduced in detail here; this section focuses on how the flink-mysql-cdc interfaces are invoked and implemented.
MySqlSourceEnumerator initialization
SourceCoordinator, which runs on the master node, is the OperatorCoordinator implementation for the Source. At startup it creates the MySqlSourceEnumerator by calling MySqlParallelSource#createEnumerator and invokes its start method to do some initialization work.
- Create the MySqlSourceEnumerator, using MySqlHybridSplitAssigner to split full + incremental data into slices and MySqlValidator to check the MySQL version and configuration.
- MySqlValidator checks:
- The MySQL version must be 5.7 or later.
- binlog_format must be set to ROW.
- binlog_row_image must be set to FULL.
- MySqlSplitAssigner initialization:
- Create a ChunkSplitter to divide slices.
- Filter the table names to be read.
- Start a periodic scheduling thread that asks the SourceReaders to send to the SourceEnumerator information about slices that have finished but have not yet been acknowledged with an ACK event.
private void syncWithReaders(int[] subtaskIds, Throwable t) {
    if (t != null) {
        throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
    }
    // when the SourceEnumerator restores or the communication failed between
    // SourceEnumerator and SourceReader, it may missed some notification event.
    // tell all SourceReader(s) to report there finished but unacked splits.
    if (splitAssigner.waitingForFinishedSplits()) {
        for (int subtaskId : subtaskIds) {
            // note: send a FinishedSnapshotSplitsRequestEvent
            context.sendEventToSourceReader(subtaskId, new FinishedSnapshotSplitsRequestEvent());
        }
    }
}
MySqlSourceReader initialization
The SourceOperator contains the SourceReader and interacts with the SourceCoordinator through the OperatorEventGateway.
- The SourceOperator creates the MySqlSourceReader through MySqlParallelSource when it is initialized. The MySqlSourceReader creates Fetchers through a SingleThreadFetcherManager to pull slice data, and the data is written to the elementsQueue in MySqlRecords format.
MySqlParallelSource#createReader

public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) throws Exception {
    // note: the queue that stores the fetched data
    FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
            new FutureCompletingBlockingQueue<>();
    final Configuration readerConfiguration = getReaderConfig(readerContext);
    // note: the split reader factory
    Supplier<MySqlSplitReader> splitReaderSupplier =
            () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());
    return new MySqlSourceReader<>(
            elementsQueue,
            splitReaderSupplier,
            new MySqlRecordEmitter<>(deserializationSchema),
            readerConfiguration,
            readerContext);
}
- The created MySqlSourceReader is registered with the SourceCoordinator through a registration event. The SourceCoordinator saves the reader's address and index when it receives the event.
SourceCoordinator#handleReaderRegistrationEvent

// note: the SourceCoordinator handles the reader registration event
private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
    context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
    enumerator.addReader(event.subtaskId());
}
- When the MySqlSourceReader starts, it sends a split request event to the MySqlSourceEnumerator to obtain the allocated slice data.
- After the SourceOperator is initialized, SourceReaderBase calls emitNext to fetch the data sets from the elementsQueue and send them to the MySqlRecordEmitter. Interface call schematic:
MySqlSourceEnumerator handles slice requests
When the MySqlSourceReader starts, it sends a RequestSplitEvent to the MySqlSourceEnumerator and reads the range data according to the returned slice. This section covers how the MySqlSourceEnumerator handles slice requests in the full read phase, which finally returns a MySqlSnapshotSplit.
- Handle the slice request event: allocate a slice for the requesting reader and pass the MySqlSplit (MySqlSnapshotSplit in the full phase, MySqlBinlogSplit in the incremental phase) by sending an AddSplitEvent.
MySqlSourceEnumerator#handleSplitRequest

public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    if (!context.registeredReaders().containsKey(subtaskId)) {
        // reader failed between sending the request and now. skip this request.
        return;
    }
    // note: store the requesting subtask id (e.g. task-0) in readersAwaitingSplit
    readersAwaitingSplit.add(subtaskId);
    assignSplits();
}

// note: assign slices to the waiting readers
private void assignSplits() {
    final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
    while (awaitingReader.hasNext()) {
        int nextAwaiting = awaitingReader.next();
        // if the reader that requested another split has failed in the meantime, remove
        // it from the list of waiting readers
        if (!context.registeredReaders().containsKey(nextAwaiting)) {
            awaitingReader.remove();
            continue;
        }
        // note: the MySqlSplitAssigner assigns the next slice
        Optional<MySqlSplit> split = splitAssigner.getNext();
        if (split.isPresent()) {
            final MySqlSplit mySqlSplit = split.get();
            context.assignSplit(mySqlSplit, nextAwaiting);
            awaitingReader.remove();
            LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
        } else {
            // there is no available splits by now, skip assigning
            break;
        }
    }
}
- MySqlHybridSplitAssigner handles the logic for both full slices and the incremental slice:
- When the job starts, remainingTables is not empty, noMoreSplits returns false, and a SnapshotSplit is created.
- After all full-phase slices have been read, noMoreSplits returns true and a BinlogSplit is created.
MySqlHybridSplitAssigner#getNext

@Override
public Optional<MySqlSplit> getNext() {
    if (snapshotSplitAssigner.noMoreSplits()) {
        // binlog split assigning
        if (isBinlogSplitAssigned) {
            // no more splits for the assigner
            return Optional.empty();
        } else if (snapshotSplitAssigner.isFinished()) {
            // we need to wait snapshot-assigner to be finished before
            // assigning the binlog split. Otherwise, records emitted from binlog split
            // might be out-of-order in terms of same primary key with snapshot splits.
            isBinlogSplitAssigned = true;
            // note: after all snapshot splits are complete, create the BinlogSplit
            return Optional.of(createBinlogSplit());
        } else {
            // binlog split is not ready by now
            return Optional.empty();
        }
    } else {
        // note: SnapshotSplits are created by the MySqlSnapshotSplitAssigner
        // the snapshot assigner still have remaining splits, assign split from it
        return snapshotSplitAssigner.getNext();
    }
}
- MySqlSnapshotSplitAssigner handles the full-slice logic: slices are generated by the ChunkSplitter and stored in an iterator.
@Override
public Optional<MySqlSplit> getNext() {
    if (!remainingSplits.isEmpty()) {
        // return remaining splits firstly
        Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
        MySqlSnapshotSplit split = iterator.next();
        iterator.remove();
        // note: the assigned slices are stored in assignedSplits
        assignedSplits.put(split.splitId(), split);
        return Optional.of(split);
    } else {
        // note: during initialization, remainingTables stores the table names to read
        TableId nextTable = remainingTables.pollFirst();
        if (nextTable != null) {
            // split the given table into chunks (snapshot splits)
            // note: the ChunkSplitter created during initialization calls generateSplits to divide the table into slices
            Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
            // note: keep all the slice information
            remainingSplits.addAll(splits);
            // note: this table has finished being split
            alreadyProcessedTables.add(nextTable);
            return getNext();
        } else {
            return Optional.empty();
        }
    }
}
- ChunkSplitter divides the table into evenly distributed or unevenly distributed slices. The table being read must have a physical primary key.
public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
    Table schema = mySqlSchema.getTableSchema(tableId).getTable();
    List<Column> primaryKeys = schema.primaryKeyColumns();
    // note: a primary key is required
    if (primaryKeys.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Incremental snapshot for tables requires primary key,"
                                + " but table %s doesn't have primary key.",
                        tableId));
    }

    // use first field in primary key as the split key
    Column splitColumn = primaryKeys.get(0);

    final List<ChunkRange> chunks;
    try {
        // note: divide the table into chunk ranges by the split column
        chunks = splitTableIntoChunks(tableId, splitColumn);
    } catch (SQLException e) {
        throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
    }

    // note: convert the primary key data type and wrap each ChunkRange as a MySqlSnapshotSplit
    // convert chunks into splits
    List<MySqlSnapshotSplit> splits = new ArrayList<>();
    RowType splitType = splitType(splitColumn);
    for (int i = 0; i < chunks.size(); i++) {
        ChunkRange chunk = chunks.get(i);
        MySqlSnapshotSplit split =
                createSnapshotSplit(
                        tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
        splits.add(split);
    }
    return splits;
}
- splitTableIntoChunks divides the data into chunks based on the physical primary key.
private List<ChunkRange> splitTableIntoChunks(TableId tableId, Column splitColumn)
        throws SQLException {
    final String splitColumnName = splitColumn.name();
    // select min, max
    final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName);
    final Object min = minMaxOfSplitColumn[0];
    final Object max = minMaxOfSplitColumn[1];
    if (min == null || max == null || min.equals(max)) {
        // empty table, or only one row, return full table scan as a chunk
        return Collections.singletonList(ChunkRange.all());
    }

    final List<ChunkRange> chunks;
    if (splitColumnEvenlyDistributed(splitColumn)) {
        // use evenly-sized chunks which is much efficient
        // note: divide the chunks evenly by the primary key
        chunks = splitEvenlySizedChunks(min, max);
    } else {
        // note: divide the chunks unevenly by the primary key
        // use unevenly-sized chunks which will request many queries and is not efficient.
        chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
    }
    return chunks;
}

/** Checks whether split column is evenly distributed across its range. */
private static boolean splitColumnEvenlyDistributed(Column splitColumn) {
    // only column is auto-incremental are recognized as evenly distributed.
    // TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future.
    if (splitColumn.isAutoIncremented()) {
        DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
        LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
        // currently, we only support split column with type BIGINT, INT, DECIMAL
        return typeRoot == LogicalTypeRoot.BIGINT
                || typeRoot == LogicalTypeRoot.INTEGER
                || typeRoot == LogicalTypeRoot.DECIMAL;
    } else {
        return false;
    }
}

/**
 * Split table into evenly sized chunks based on the numeric min and max value of split column,
 * and tumble chunks in {@link #chunkSize} step size.
 */
private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max) {
    if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
        // there is no more than one chunk, return full table as a chunk
        return Collections.singletonList(ChunkRange.all());
    }
    final List<ChunkRange> splits = new ArrayList<>();
    Object chunkStart = null;
    Object chunkEnd = ObjectUtils.plus(min, chunkSize);
    // chunkEnd <= max
    while (ObjectUtils.compare(chunkEnd, max) <= 0) {
        splits.add(ChunkRange.of(chunkStart, chunkEnd));
        chunkStart = chunkEnd;
        chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
    }
    // add the ending split
    splits.add(ChunkRange.of(chunkStart, null));
    return splits;
}

/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
        TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
    final List<ChunkRange> splits = new ArrayList<>();
    Object chunkStart = null;
    Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max);
    int count = 0;
    while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
        // we start from [null, min + chunk_size) and avoid [null, min)
        splits.add(ChunkRange.of(chunkStart, chunkEnd));
        // may sleep a while to avoid DDOS on MySQL server
        maySleep(count++);
        chunkStart = chunkEnd;
        chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
    }
    // add the ending split
    splits.add(ChunkRange.of(chunkStart, null));
    return splits;
}

private Object nextChunkEnd(
        Object previousChunkEnd, TableId tableId, String splitColumnName, Object max)
        throws SQLException {
    // chunk end might be null when max values are removed
    Object chunkEnd =
            queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
    if (Objects.equals(previousChunkEnd, chunkEnd)) {
        // we don't allow equal chunk start and end,
        // should query the next one larger than chunkEnd
        chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
    }
    if (ObjectUtils.compare(chunkEnd, max) >= 0) {
        return null;
    } else {
        return chunkEnd;
    }
}
MySqlSourceReader handles slice allocation requests
When the MySqlSourceReader receives a slice allocation, it first creates a SplitFetcher thread and adds an AddSplitsTask to the taskQueue to handle adding the slice. It then executes a FetchTask that reads the data using the Debezium API; the read data is stored in the elementsQueue, from which SourceReaderBase fetches it and hands it to the MySqlRecordEmitter.
- When handling the slice allocation event, create a SplitFetcher and add an AddSplitsTask to the taskQueue.
SingleThreadFetcherManager#addSplits

public void addSplits(List<SplitT> splitsToAdd) {
    SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
    if (fetcher == null) {
        fetcher = createSplitFetcher();
        // Add the splits to the fetchers.
        fetcher.addSplits(splitsToAdd);
        startFetcher(fetcher);
    } else {
        fetcher.addSplits(splitsToAdd);
    }
}

// create the SplitFetcher
protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
    if (closed) {
        throw new IllegalStateException("The split fetcher manager has closed.");
    }
    // Create SplitReader.
    SplitReader<E, SplitT> splitReader = splitReaderFactory.get();

    int fetcherId = fetcherIdGenerator.getAndIncrement();
    SplitFetcher<E, SplitT> splitFetcher =
            new SplitFetcher<>(
                    fetcherId,
                    elementsQueue,
                    splitReader,
                    errorHandler,
                    () -> {
                        fetchers.remove(fetcherId);
                        elementsQueue.notifyAvailable();
                    });
    fetchers.put(fetcherId, splitFetcher);
    return splitFetcher;
}

public void addSplits(List<SplitT> splitsToAdd) {
    enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
    wakeUp(true);
}
- The SplitFetcher thread runs its tasks: the AddSplitsTask that adds the slice executes first, then the FetchTask that pulls the data.
SplitFetcher#runOnce

void runOnce() {
    try {
        if (shouldRunFetchTask()) {
            runningTask = fetchTask;
        } else {
            runningTask = taskQueue.take();
        }
        if (!wakeUp.get() && runningTask.run()) {
            LOG.debug("Finished running task {}", runningTask);
            runningTask = null;
            checkAndSetIdle();
        }
    } catch (Exception e) {
        throw new RuntimeException(
                String.format(
                        "SplitFetcher thread %d received unexpected exception while polling the records",
                        id),
                e);
    }

    maybeEnqueueTask(runningTask);
    synchronized (wakeUp) {
        // Set the running task to null. It is necessary for the shutdown method to avoid
        // unnecessarily interrupt the running task.
        runningTask = null;
        // Set the wakeUp flag to false.
        wakeUp.set(false);
        LOG.debug("Cleaned wakeup flag.");
    }
}
- The AddSplitsTask calls MySqlSplitReader's handleSplitsChanges method to add the allocated slice information to the slice queue. On the next fetch() call, a slice is taken from the queue and its data is read.
AddSplitsTask#run

public boolean run() {
    for (SplitT s : splitsToAdd) {
        assignedSplits.put(s.splitId(), s);
    }
    splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
    return true;
}

MySqlSplitReader#handleSplitsChanges

public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
    if (!(splitsChanges instanceof SplitsAddition)) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SplitChange type of %s is not supported.",
                        splitsChanges.getClass()));
    }
    // note: add the slices to the queue
    splits.addAll(splitsChanges.splits());
}
- MySqlSplitReader executes fetch(): a DebeziumReader reads the data into an event queue, and after the data is corrected it is returned as MySqlRecords.
MySqlSplitReader#fetch

@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
    // note: create the reader and read the data
    checkSplitOrStartNext();
    Iterator<SourceRecord> dataIt = null;
    try {
        // note: poll the corrected data
        dataIt = currentReader.pollSplitRecords();
    } catch (InterruptedException e) {
        LOG.warn("fetch data failed.", e);
        throw new IOException(e);
    }
    // note: wrap the returned data as MySqlRecords
    return dataIt == null
            ? finishedSnapshotSplit()
            : MySqlRecords.forRecords(currentSplitId, dataIt);
}

private void checkSplitOrStartNext() throws IOException {
    // the binlog reader should keep alive
    if (currentReader instanceof BinlogSplitReader) {
        return;
    }

    if (canAssignNextSplit()) {
        // note: take the next MySqlSplit from the slice queue
        final MySqlSplit nextSplit = splits.poll();
        if (nextSplit == null) {
            throw new IOException("Cannot fetch from another split - no split remaining");
        }

        currentSplitId = nextSplit.splitId();
        // note: distinguish between full snapshot reading and incremental binlog reading
        if (nextSplit.isSnapshotSplit()) {
            if (currentReader == null) {
                final MySqlConnection jdbcConnection = getConnection(config);
                final BinaryLogClient binaryLogClient = getBinaryClient(config);
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
                // note: create a SnapshotSplitReader that uses Debezium to read the snapshot data
                currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
            }
        } else {
            // point from snapshot split to binlog split
            if (currentReader != null) {
                LOG.info("It's turn to read binlog split, close current snapshot reader");
                currentReader.close();
            }
            final MySqlConnection jdbcConnection = getConnection(config);
            final BinaryLogClient binaryLogClient = getBinaryClient(config);
            final StatefulTaskContext statefulTaskContext =
                    new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
            LOG.info("Create binlog reader");
            // note: create a BinlogSplitReader that uses Debezium to read the binlog data
            currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
        }
        // note: submit the split to the reader to start reading
        currentReader.submitSplit(nextSplit);
    }
}
DebeziumReader data processing
The DebeziumReader covers two stages: full slice reading and incremental slice reading. After the data is read it is stored in a ChangeEventQueue, and it is corrected when pollSplitRecords is executed.
- SnapshotSplitReader reads full slices. In the full phase, the table data within the slice range is queried by executing a SELECT statement, and SHOW MASTER STATUS is executed before and after the query to record the current offsets.
public void submitSplit(MySqlSplit mySqlSplit) {
    ......
    executor.submit(
            () -> {
                try {
                    currentTaskRunning = true;
                    // 1. execute the snapshot read task
                    final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    SnapshotResult snapshotResult =
                            splitSnapshotReadTask.execute(sourceContext);

                    final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
                    final MySqlOffsetContext mySqlOffsetContext =
                            statefulTaskContext.getOffsetContext();
                    mySqlOffsetContext.setBinlogStartPoint(
                            appendBinlogSplit.getStartingOffset().getFilename(),
                            appendBinlogSplit.getStartingOffset().getPosition());

                    // note: read the binlog starting from the recorded start offset
                    // 2. execute binlog read task
                    if (snapshotResult.isCompletedOrSkipped()) {
                        // we should only capture events for the current table
                        Configuration dezConf =
                                statefulTaskContext
                                        .getDezConf()
                                        .edit()
                                        .with("table.whitelist", currentSnapshotSplit.getTableId())
                                        .build();

                        // task to read binlog for current split
                        MySqlBinlogSplitReadTask splitBinlogReadTask =
                                new MySqlBinlogSplitReadTask(
                                        new MySqlConnectorConfig(dezConf),
                                        mySqlOffsetContext,
                                        statefulTaskContext.getConnection(),
                                        statefulTaskContext.getDispatcher(),
                                        statefulTaskContext.getErrorHandler(),
                                        StatefulTaskContext.getClock(),
                                        statefulTaskContext.getTaskContext(),
                                        (MySqlStreamingChangeEventSourceMetrics)
                                                statefulTaskContext
                                                        .getStreamingChangeEventSourceMetrics(),
                                        statefulTaskContext
                                                .getTopicSelector()
                                                .getPrimaryTopic(),
                                        appendBinlogSplit);

                        splitBinlogReadTask.execute(
                                new SnapshotBinlogSplitChangeEventSourceContextImpl());
                    } else {
                        readException =
                                new IllegalStateException(
                                        String.format(
                                                "Read snapshot for mysql split %s fail",
                                                currentSnapshotSplit));
                    }
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error(
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            });
}
- SnapshotSplitReader reads the backfill binlog for a slice. The key point here is deciding when the BinlogSplitReadTask should stop: it terminates once it reaches the offset recorded at the end of the slice's snapshot read.
MySqlBinlogSplitReadTask#handleEvent

protected void handleEvent(Event event) {
    // note: deliver the event to the queue
    super.handleEvent(event);
    // note: the binlog read of the full phase needs to stop at the ending offset
    // check do we need to stop for read binlog for snapshot split.
    if (isBoundedRead()) {
        final BinlogOffset currentBinlogOffset =
                new BinlogOffset(
                        offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
                        Long.parseLong(
                                offsetContext
                                        .getOffset()
                                        .get(BINLOG_POSITION_OFFSET_KEY)
                                        .toString()));
        // note: currentBinlogOffset >= the ending offset (high watermark)
        // reach the high watermark, the binlog reader should finished
        if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
            // send binlog end event
            try {
                signalEventDispatcher.dispatchWatermarkEvent(
                        binlogSplit,
                        currentBinlogOffset,
                        SignalEventDispatcher.WatermarkKind.BINLOG_END);
            } catch (InterruptedException e) {
                logger.error("Send signal event error.", e);
                errorHandler.setProducerThrowable(
                        new DebeziumException("Error processing binlog signal event", e));
            }
            // note: terminate the binlog read
            // tell the reader the binlog task finished
            ((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
        }
    }
}
- SnapshotSplitReader executes pollSplitRecords to correct the raw data in the queue. See RecordUtils#normalizedSplitRecords for the detailed processing logic; a simplified sketch follows the code below.
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
    if (hasNextElement.get()) {
        // data input: [low watermark event][snapshot events][high watermark event][binlog
        // events][binlog-end event]
        // data output: [low watermark event][normalized events][high watermark event]
        boolean reachBinlogEnd = false;
        final List<SourceRecord> sourceRecords = new ArrayList<>();
        while (!reachBinlogEnd) {
            // note: process the DataChangeEvents written to the queue
            List<DataChangeEvent> batch = queue.poll();
            for (DataChangeEvent event : batch) {
                sourceRecords.add(event.getRecord());
                if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                    reachBinlogEnd = true;
                    break;
                }
            }
        }
        // snapshot split return its data once
        hasNextElement.set(false);
        return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                .iterator();
    }
    // the data has been polled, no more data
    reachEnd.compareAndSet(false, true);
    return null;
}
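The normalization itself boils down to replaying the backfill binlog events onto the snapshot records, keyed by primary key. Below is a minimal sketch of that merge using simplified, made-up record types; the real RecordUtils#normalizedSplitRecords additionally handles watermark events, split key ranges and Debezium's record structures.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified record: primary key, payload, and the captured operation ("c", "u", "d"). */
record ChangeRecord(long key, String payload, String op) {}

class SnapshotNormalizer {
    /** Merge backfill binlog events into the snapshot so the slice is consistent at the high watermark. */
    static List<ChangeRecord> normalize(List<ChangeRecord> snapshot, List<ChangeRecord> binlog) {
        Map<Long, ChangeRecord> byKey = new LinkedHashMap<>();
        for (ChangeRecord r : snapshot) {
            byKey.put(r.key(), r);
        }
        for (ChangeRecord e : binlog) {
            switch (e.op()) {
                case "c", "u" -> byKey.put(e.key(), e);  // insert/update: keep the latest image
                case "d" -> byKey.remove(e.key());       // delete: drop the record from the snapshot
                default -> { /* other events are ignored in this sketch */ }
            }
        }
        return new ArrayList<>(byKey.values());          // the corrected slice records
    }
}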
- BinlogSplitReader reads incremental data. The reading logic itself is simple; the key point is the start offset, which is derived from the high watermarks (HW) of all completed slices.
- BinlogSplitReader executes pollSplitRecords to correct the raw data in the queue and guarantee consistency. Because the incremental binlog read is unbounded, all the data is delivered to the event queue, and the BinlogSplitReader uses shouldEmit() to decide whether each record should be delivered.
BinlogSplitReader#pollSplitRecords

public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
    checkReadException();
    final List<SourceRecord> sourceRecords = new ArrayList<>();
    if (currentTaskRunning) {
        List<DataChangeEvent> batch = queue.poll();
        for (DataChangeEvent event : batch) {
            if (shouldEmit(event.getRecord())) {
                sourceRecords.add(event.getRecord());
            }
        }
    }
    return sourceRecords.iterator();
}
Event delivery conditions: 1. the offset of the newly received event is greater than the maximum high watermark (maxWM) of its table, in which case it is delivered directly; 2. the record belongs to a snapshot split and its offset is greater than that split's high watermark (HWM), in which case it is delivered.
/**
 * Returns the record should emit or not.
 *
 * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
 * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
 * since the offset is after its high watermark.
 *
 * <pre> E.g: the data input is :
 *    snapshot-split-0 info : [0,    1024) highWatermark0
 *    snapshot-split-1 info : [1024, 2048) highWatermark1
 *  the data output is:
 *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
 *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 * </pre>
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        // note: the offset of the received event is greater than the table's max high watermark, emit directly
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster());

        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            // note: the record belongs to this snapshot split and its offset is greater than
            // the split's high watermark, so emit it
            if (RecordUtils.splitKeyRangeContains(
                            key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}
MySqlRecordEmitter delivers data
SourceReaderBase fetches from the queue the sets of DataChangeEvent records read for a slice and converts the data from Debezium's DataChangeEvent type to Flink's RowData type.
- SourceReaderBase processes the slice data flow.
org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext

public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
    // make sure we have a fetch we are working on, or move to the next
    RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
    if (recordsWithSplitId == null) {
        recordsWithSplitId = getNextFetch(output);
        if (recordsWithSplitId == null) {
            return trace(finishedOrAvailableLater());
        }
    }

    // we need to loop here, because we may have to go across splits
    while (true) {
        // Process one record.
        // note: read a single record from the MySqlRecords iterator
        final E record = recordsWithSplitId.nextRecordFromSplit();
        if (record != null) {
            // emit the record.
            recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
            LOG.trace("Emitted record: {}", record);

            // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
            // more is available. If nothing more is available, the next invocation will find
            // this out and return the correct status.
            // That means we emit the occasional 'false positive' for availability, but this
            // saves us doing checks for every record. Ultimately, this is cheaper.
            return trace(InputStatus.MORE_AVAILABLE);
        } else if (!moveToNextSplit(recordsWithSplitId, output)) {
            // The fetch is done and we just discovered that and have not emitted anything, yet.
            // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
            // rather than emitting nothing and waiting for the caller to call us again.
            return pollNext(output);
        }
        // else fall through the loop
    }
}

private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
    splitFetcherManager.checkErrors();

    LOG.trace("Getting next source data batch from queue");
    // note: fetch the data from the elementsQueue
    final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
    if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
        return null;
    }

    currentFetch = recordsWithSplitId;
    return recordsWithSplitId;
}
- MySqlRecords returns a single record at a time from the slice's data set.
com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit

public SourceRecord nextRecordFromSplit() {
    final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
    if (recordsForSplit != null) {
        if (recordsForSplit.hasNext()) {
            return recordsForSplit.next();
        } else {
            return null;
        }
    } else {
        throw new IllegalStateException();
    }
}
- MySqlRecordEmitter converts the data to RowData through RowDataDebeziumDeserializeSchema.
com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord

public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
        throws Exception {
    if (isWatermarkEvent(element)) {
        BinlogOffset watermark = getWatermark(element);
        if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            splitState.asSnapshotSplitState().setHighWatermark(watermark);
        }
    } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
        HistoryRecord historyRecord = getHistoryRecord(element);
        Array tableChanges =
                historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
        TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
        for (TableChanges.TableChange tableChange : changes) {
            splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
        }
    } else if (isDataChangeRecord(element)) {
        // note: data change records are processed here
        if (splitState.isBinlogSplitState()) {
            BinlogOffset position = getBinlogPosition(element);
            splitState.asBinlogSplitState().setStartingOffset(position);
        }
        debeziumDeserializationSchema.deserialize(
                element,
                new Collector<T>() {
                    @Override
                    public void collect(final T t) {
                        output.collect(t);
                    }

                    @Override
                    public void close() {
                        // do nothing
                    }
                });
    } else {
        // unknown element
        LOG.info("Meet unknown element {}, just skip.", element);
    }
}
RowDataDebeziumDeserializeSchema deserialization process:
com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(before);
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(after);
}
}
MySqlSourceReader reports the slice read completion event
After the MySqlSourceReader finishes processing a full slice, it sends the finished slice information, including the slice ID and the high watermark, to the MySqlSourceEnumerator, and then continues to send slice requests.
com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished

protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
    for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
        MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
        finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
    }
    // note: send the slices-finished event
    reportFinishedSnapshotSplitsIfNeed();
    // continue to send slice requests after the previous split has been processed
    context.sendSplitRequest();
}

private void reportFinishedSnapshotSplitsIfNeed() {
    if (!finishedUnackedSplits.isEmpty()) {
        final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
        for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
            // note: report the slice ID and its maximum offset (high watermark)
            finishedOffsets.put(split.splitId(), split.getHighWatermark());
        }
        FinishedSnapshotSplitsReportEvent reportEvent =
                new FinishedSnapshotSplitsReportEvent(finishedOffsets);
        context.sendSourceEventToCoordinator(reportEvent);
        LOG.debug(
                "The subtask {} reports offsets of finished snapshot splits {}.",
                subtaskId,
                finishedOffsets);
    }
}
MySqlSourceEnumerator allocates incremental slices
After all slices in the full phase have been read, MySqlHybridSplitAssigner creates a BinlogSplit for the subsequent incremental read. When creating the BinlogSplit, the minimum binlog offset among all completed full slices is selected as the start offset. Note: in version 2.0.0, the minimum offset in createBinlogSplit always starts from 0; this bug has been fixed on the latest master branch.
private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());

    Map<String, BinlogOffset> splitFinishedOffsets =
            snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();

    BinlogOffset minBinlogOffset = null;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }

    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}