sequence
This article mainly studies the BinlogReader of Debezium.
Reader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
public interface Reader {
public static enum State {
/**
* The reader is stopped and static.
*/
STOPPED,
/**
* The reader is running and generating records.
*/
RUNNING,
/**
* The reader has completed its work or been explicitly stopped, but not all of the generated records have been
* consumed via {@link Reader#poll() polling}.
*/
STOPPING;
}
public String name();
public State state();
public void uponCompletion(Runnable handler);
public default void initialize() {
// do nothing
}
public default void destroy() {
// do nothing
}
public void start();
public void stop();
public List<SourceRecord> poll() throws InterruptedException;
}
- The Reader interface defines the name, state, uponCompletion, start, stop, and poll methods, plus default initialize and destroy methods that do nothing; a minimal implementation is sketched below
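For illustration, here is a minimal, hypothetical Reader implementation (not part of Debezium) that serves a fixed list of records once and then reports STOPPED; it uses only the methods declared by the interface above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical example: a Reader that serves a pre-built list of records once.
public class FixedRecordsReader implements Reader {
    private final String name;
    private final BlockingQueue<SourceRecord> records = new LinkedBlockingQueue<>();
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();

    public FixedRecordsReader(String name, List<SourceRecord> source) {
        this.name = name;
        this.records.addAll(source);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public State state() {
        return state.get();
    }

    @Override
    public void uponCompletion(Runnable handler) {
        uponCompletion.set(handler);
    }

    @Override
    public void start() {
        state.set(State.RUNNING);
    }

    @Override
    public void stop() {
        // Unconsumed records may remain, so report STOPPING until poll() drains them.
        state.set(records.isEmpty() ? State.STOPPED : State.STOPPING);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> batch = new ArrayList<>();
        records.drainTo(batch);
        if (records.isEmpty()) {
            state.set(State.STOPPED);
            Runnable handler = uponCompletion.getAndSet(null);
            if (handler != null) {
                handler.run(); // fire the completion callback exactly once
            }
        }
        return batch;
    }
}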
AbstractReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java
public abstract class AbstractReader implements Reader {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final String name;
    protected final MySqlTaskContext context;
    protected final MySqlJdbcContext connectionContext;
    private final BlockingQueue<SourceRecord> records;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean success = new AtomicBoolean(false);
    private final AtomicReference<ConnectException> failure = new AtomicReference<>();
    private ConnectException failureException;
    private final int maxBatchSize;
    private final Metronome metronome;
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final Duration pollInterval;
    protected final ChangeEventQueueMetrics changeEventQueueMetrics;
    private final HaltingPredicate acceptAndContinue;

    public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
        this.name = name;
        this.context = context;
        this.connectionContext = context.getConnectionContext();
        this.records = new LinkedBlockingDeque<>(context.getConnectorConfig().getMaxQueueSize());
        this.maxBatchSize = context.getConnectorConfig().getMaxBatchSize();
        this.pollInterval = context.getConnectorConfig().getPollInterval();
        this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
        this.acceptAndContinue = acceptAndContinue == null ? new AcceptAllPredicate() : acceptAndContinue;
        this.changeEventQueueMetrics = new ChangeEventQueueMetrics() {
            @Override
            public int totalCapacity() {
                return context.getConnectorConfig().getMaxQueueSize();
            }

            @Override
            public int remainingCapacity() {
                return records.remainingCapacity();
            }
        };
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public final void initialize() {
        doInitialize();
    }

    @Override
    public final void destroy() {
        doDestroy();
    }

    @Override
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.failure.set(null);
            this.success.set(false);
            doStart();
        }
    }

    @Override
    public void stop() {
        try {
            // Emptying the queue so to make sure that enqueue() won't block indefinitely when adding records after
            // poll() isn't called anymore but before the binlog reader is stopped; note there's still a tiny chance for
            // this to happen if enough records are added again between here and the call to disconnect(); protecting
            // against it seems not worth it though, as it shouldn't happen for any practical queue size
            List<SourceRecord> unsent = new ArrayList<>();
            records.drainTo(unsent);
            logger.info("Discarding {} unsent record(s) due to the connector shutting down", unsent.size());
            doStop();
            running.set(false);
        }
        finally {
            if (failure.get() != null) {
                // We had a failure and it was propagated via poll(), after which Kafka Connect will stop
                // the connector, which will stop the task that will then stop this reader via this method.
                // Since no more records will ever be polled again, we know we can clean up this reader's resources...
                doCleanup();
            }
        }
    }

    @Override
    public State state() {
        if (success.get() || failure.get() != null) {
            // We've either completed successfully or have failed, but either way no more records will be returned ...
            return State.STOPPED;
        }
        if (running.get()) {
            return State.RUNNING;
        }
        // Otherwise, we're in the process of stopping ...
        return State.STOPPING;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Before we do anything else, determine if there was a failure and throw that exception ...
        failureException = this.failure.get();
        if (failureException != null) {
            // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
            // will then explicitly stop the connector task. Most likely, however, the reader that threw
            // the exception will have already stopped itself and will generate no additional records.
            // Regardless, there may be records on the queue that will never be consumed.
            throw failureException;
        }

        // this reader has been stopped before it reached the success or failed end state, so clean up and abort
        if (!running.get()) {
            cleanupResources();
            throw new InterruptedException("Reader was stopped while polling");
        }

        logger.trace("Polling for next batch of records");
        List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
        final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
        while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
            // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
            metronome.pause();

            // Check for failure after waking up ...
            failureException = this.failure.get();
            if (failureException != null) {
                throw failureException;
            }

            if (timeout.expired()) {
                break;
            }
        }

        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            // We found no records but the operation completed successfully, so we're done
            this.running.set(false);
            cleanupResources();
            return null;
        }
        pollComplete(batch);
        logger.trace("Completed batch of {} records", batch.size());
        return batch;
    }

    @Override
    public String toString() {
        return name;
    }

    // ......
}
- AbstractReader implements the Reader interface; its poll method drains up to maxBatchSize records from the internal queue via records.drainTo(batch, maxBatchSize) and then executes pollComplete(batch); a hypothetical subclass is sketched below
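To make the division of labor concrete, below is a hypothetical AbstractReader subclass (not Debezium code): doStart() launches a producer thread that feeds the inherited blocking queue, while Kafka Connect drains batches through the inherited poll() shown above. Note that enqueueRecord(...) and completeSuccessfully() are protected AbstractReader helpers hidden behind the elision ("// ......") in the excerpt above, so verify them against your Debezium version.

import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical subclass: produce a fixed list of records on a background thread.
public class OneShotReader extends AbstractReader {
    private volatile Thread producer;
    private final List<SourceRecord> input; // pre-built records, for illustration only

    public OneShotReader(String name, MySqlTaskContext context, List<SourceRecord> input) {
        super(name, context, null); // null halting predicate -> AcceptAllPredicate
        this.input = input;
    }

    @Override
    protected void doStart() {
        producer = new Thread(() -> {
            try {
                for (SourceRecord record : input) {
                    enqueueRecord(record); // blocks when the internal queue is full
                }
                completeSuccessfully(); // sets the success flag; poll() will then return null and stop
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // doStop() interrupted us
            }
        }, "one-shot-reader-producer");
        producer.start();
    }

    @Override
    protected void doStop() {
        Thread t = producer;
        if (t != null) {
            t.interrupt(); // unblock a producer stuck on a full queue
        }
    }

    @Override
    protected void doCleanup() {
        // no external resources to release in this sketch
    }
}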
BinlogReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
public class BinlogReader extends AbstractReader {
    private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
    private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);

    private final boolean recordSchemaChangesInSourceRecords;
    private final RecordMakers recordMakers;
    private final SourceInfo source;
    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
    private final BinaryLogClient client;
    // ......

    public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) {
        super(name, context, acceptAndContinue);

        connectionContext = context.getConnectionContext();
        source = context.source();
        recordMakers = context.makeRecord();
        recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
        clock = context.getClock();
        eventDeserializationFailureHandlingMode = connectionContext.eventProcessingFailureHandlingMode();
        inconsistentSchemaHandlingMode = connectionContext.inconsistentSchemaHandlingMode();

        // Use exponential delay to log the progress frequently at first, but then quickly tapering off to once an hour...
        pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);

        // Set up the log reader ...
        client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(),
                connectionContext.username(), connectionContext.password());
        // BinaryLogClient will overwrite thread names later
        client.setThreadFactory(Threads.threadFactory(MySqlConnector.class,
                context.getConnectorConfig().getLogicalName(), "binlog-client", false));
        client.setServerId(serverId);
        client.setSSLMode(sslModeFor(connectionContext.sslMode()));
        if (connectionContext.sslModeEnabled()) {
            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectionContext);
            if (sslSocketFactory != null) {
                client.setSslSocketFactory(sslSocketFactory);
            }
        }
        client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        final long keepAliveInterval = context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        client.setKeepAliveInterval(keepAliveInterval);
        // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
        // multiplied by keepAliveInterval. The default value of heartbeatIntervalFactor is 0.8, and we believe the
        // remaining time (0.2 * keepAliveInterval) is enough to process the packet received from the MySQL server.
        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
        client.registerEventListener(context.bufferSizeForBinlogReader() == 0
                ? this::handleEvent
                : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);
        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        if (logger.isDebugEnabled()) {
            client.registerEventListener(this::logEvent);
        }
        // ......
        client.setEventDeserializer(eventDeserializer);

        // Set up for JMX ...
        metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics);
        heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(),
                context.getConnectorConfig().getLogicalName());
    }

    @Override
    protected void doStart() {
        context.dbSchema().assureNonEmptySchema();

        // Register our event handlers ...
        eventHandlers.put(EventType.STOP, this::handleServerStop);
        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
        eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (context.includeSqlQuery()) {
            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }

        final boolean isGtidModeEnabled = connectionContext.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
        String availableServerGtidStr = connectionContext.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(EventType.GTID, this::handleGtidEvent);

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connectionContext.purgedGtidSet();
            logger.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                source.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            }
            else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning ...
                client.setBinlogFilename(source.binlogFilename());
                client.setBinlogPosition(source.binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        }
        else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
            client.setBinlogFilename(source.binlogFilename());
            client.setBinlogPosition(source.binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already processed...
        initialEventsToSkip = source.eventsToSkipUponRestart();
        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = source.rowsToSkipUponRestart();
        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        // Initial our poll output delay logic ...
        pollOutputDelay.hasElapsed();
        previousOutputMillis = clock.currentTimeInMillis();

        // Start the log reader, which starts background threads ...
        if (isRunning()) {
            long timeout = context.getConnectorConfig().getConnectionTimeout().toMillis();
            long started = context.getClock().currentTimeInMillis();
            try {
                logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                client.connect(timeout);
            }
            catch (TimeoutException e) {
                // If the client thread is interrupted *before* the client could connect, the client throws a timeout exception
                // The only way we can distinguish this is if we get the timeout exception before the specified timeout has
                // elapsed, so we simply check this (within 10%) ...
                long duration = context.getClock().currentTimeInMillis() - started;
                if (duration > (0.9 * timeout)) {
                    double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                    throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
                            connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
                }
                // Otherwise, we were told to shutdown, so we don't care about the timeout exception
            }
            catch (AuthenticationException e) {
                throw new ConnectException("Failed to authenticate to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
            }
            catch (Throwable e) {
                throw new ConnectException("Unable to connect to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "': " + e.getMessage(), e);
            }
        }
    }

    @Override
    protected void doStop() {
        try {
            if (client.isConnected()) {
                logger.debug("Stopping binlog reader '{}', last recorded offset: {}", this.name(), lastOffset);
                client.disconnect();
            }
            cleanupResources();
        }
        catch (IOException e) {
            logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", this.name(), e);
        }
    }

    @Override
    protected void pollComplete(List<SourceRecord> batch) {
        // Record a bit about this batch ...
        int batchSize = batch.size();
        recordCounter += batchSize;
        totalRecordCounter.addAndGet(batchSize);
        if (batchSize > 0) {
            SourceRecord lastRecord = batch.get(batchSize - 1);
            lastOffset = lastRecord.sourceOffset();
            if (pollOutputDelay.hasElapsed()) {
                // We want to record the status ...
                long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis;
                try {
                    if (logger.isInfoEnabled()) {
                        context.temporaryLoggingContext("binlog", () -> {
                            logger.info("{} records sent during previous {}, last recorded offset: {}",
                                    recordCounter, Strings.duration(millisSinceLastOutput), lastOffset);
                        });
                    }
                }
                finally {
                    recordCounter = 0;
                    previousOutputMillis += millisSinceLastOutput;
                }
            }
        }
    }

    // ......
}
- The BinlogReader constructor creates a BinaryLogClient, registers an event listener (this::handleEvent, or an EventBuffer when buffering is configured), and sets the eventDeserializer; its doStart method registers the eventHandlers, sets either the gtidSet or the binlogFilename and binlogPosition, and executes client.connect(timeout); its doStop method executes client.disconnect(); its pollComplete method updates metrics such as recordCounter and totalRecordCounter; a standalone sketch of the underlying BinaryLogClient follows
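Since BinlogReader is essentially a lifecycle wrapper around the shyiko BinaryLogClient, a standalone sketch may help. The host, port, credentials, server id, and starting binlog position below are made-up values, and the event dispatch is a plain lambda instead of BinlogReader's EnumMap of handlers.

import java.io.IOException;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;

// Hypothetical standalone tailer using the same client that BinlogReader configures.
public class BinlogTailer {
    public static void main(String[] args) throws IOException {
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "repl", "secret");
        client.setServerId(65535);                    // must be unique among replicas
        client.setBinlogFilename("mysql-bin.000001"); // resume point, like source.binlogFilename()
        client.setBinlogPosition(4);                  // like source.binlogPosition()

        client.registerEventListener((Event event) -> {
            EventType type = event.getHeader().getEventType();
            // BinlogReader looks up a handler in its eventHandlers EnumMap here; we just log.
            System.out.println("binlog event: " + type);
        });

        client.connect(); // blocks; BinlogReader instead uses connect(timeout) on a background thread
    }
}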
summary
BinlogReader extends AbstractReader. Its constructor creates a BinaryLogClient and registers an event listener (handleEvent) and an eventDeserializer. Its doStart method registers the eventHandlers, sets either the gtidSet or the binlogFilename and binlogPosition, and executes client.connect(timeout); its doStop method executes client.disconnect(); its pollComplete method updates metrics such as recordCounter and totalRecordCounter.
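Putting the pieces together, a hypothetical driver loop (not Debezium code) would exercise a Reader like this; per AbstractReader.poll() above, a null batch signals that the reader completed successfully, while an empty batch just means no records arrived within the poll interval.

import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical driver, for illustration: run any Reader through its full lifecycle.
public class ReaderDriver {
    public static void drive(Reader reader) throws InterruptedException {
        reader.initialize();
        reader.start();
        try {
            List<SourceRecord> batch;
            while ((batch = reader.poll()) != null) { // null => the reader finished successfully
                // hand the batch to Kafka Connect / downstream processing here
            }
        }
        finally {
            reader.stop();    // AbstractReader.stop() discards unsent records and calls doStop()
            reader.destroy();
        }
    }
}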
doc
- BinlogReader