Preface
This article focuses on Canal’s DirectLogFetcher
DirectLogFetcher
canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcher.java
public final class DirectLogFetcher extends LogFetcher { protected static final Log logger = LogFactory.getLog(DirectLogFetcher.class); /** Command to dump binlog */ public static final byte COM_BINLOG_DUMP = 18; /** Packet header sizes */ public static final int NET_HEADER_SIZE = 4; public static final int SQLSTATE_LENGTH = 5; /** Packet offsets */ public static final int PACKET_LEN_OFFSET = 0; public static final int PACKET_SEQ_OFFSET = 3; /** Maximum packet length */ public static final int MAX_PACKET_LENGTH = (256 * 256 * 256 - 1); /** BINLOG_DUMP options */ public static final int BINLOG_DUMP_NON_BLOCK = 1; public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2; private Connection conn; private OutputStream mysqlOutput; private InputStream mysqlInput; / /... /** * Connect MySQL master to fetch binlog. */ public void open(Connection conn, String fileName, final long filePosition, final int serverId) throws IOException { open(conn, fileName, filePosition, serverId,false); } /** * Connect MySQL master to fetch binlog. */ public void open(Connection conn, String fileName, long filePosition, final int serverId, boolean nonBlocking) throws IOException { try { this.conn = conn; Class<? > connClazz = Class.forName("com.mysql.jdbc.ConnectionImpl");
Object unwrapConn = unwrapConnection(conn, connClazz);
if (unwrapConn == null) {
throw new IOException("Unable to unwrap " + conn.getClass().getName()
+ " to com.mysql.jdbc.ConnectionImpl");
}
// Get underlying IO streams for network communications.
Object connIo = getDeclaredField(unwrapConn, connClazz, "io");
if (connIo == null) {
throw new IOException("Get null field:" + conn.getClass().getName() + "#io");
}
mysqlOutput = (OutputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlOutput");
mysqlInput = (InputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlInput");
if (filePosition == 0) filePosition = BIN_LOG_HEADER_SIZE;
sendBinlogDump(fileName, filePosition, serverId, nonBlocking);
position = 0;
} catch (IOException e) {
close(); /* Do cleanup */
logger.error("Error on COM_BINLOG_DUMP: file = " + fileName + ", position = " + filePosition);
throw e;
} catch (ClassNotFoundException e) {
close(); /* Do cleanup */
throw new IOException("Unable to load com.mysql.jdbc.ConnectionImpl", e);
}
}
public boolean fetch() throws IOException {
try {
// Fetching packet header from input.
if(! fetch0(0, NET_HEADER_SIZE)) { logger.warn("Reached end of input stream while fetching header");
return false;
}
// Fetching the first packet(may a multi-packet).
int netlen = getUint24(PACKET_LEN_OFFSET);
int netnum = getUint8(PACKET_SEQ_OFFSET);
if(! fetch0(NET_HEADER_SIZE, netlen)) { logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
return false;
}
// Detecting error code.
final int mark = getUint8(NET_HEADER_SIZE);
if(mark ! = 0) {if (mark == 255) // error from master
{
// Indicates an error, for example trying to fetch from
// wrong
// binlog position.
position = NET_HEADER_SIZE + 1;
final int errno = getInt16();
String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
String errmsg = getFixString(limit - position);
throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
+ " errmsg = " + errmsg);
} else if (mark == 254) {
// Indicates end of stream. It's not clear when this would
// be sent.
logger.warn("Received EOF packet from server, apparent" + " master disconnected.");
return false;
} else {
// Should not happen.
throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum
+ ", len = " + netlen);
}
}
// The first packet is a multi-packet, concatenate the packets.
while (netlen == MAX_PACKET_LENGTH) {
if (!fetch0(0, NET_HEADER_SIZE)) {
logger.warn("Reached end of input stream while fetching header");
return false;
}
netlen = getUint24(PACKET_LEN_OFFSET);
netnum = getUint8(PACKET_SEQ_OFFSET);
if (!fetch0(limit, netlen)) {
logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
return false;
}
}
// Preparing buffer variables to decoding.
origin = NET_HEADER_SIZE + 1;
position = origin;
limit -= origin;
return true;
} catch (SocketTimeoutException e) {
close(); /* Do cleanup */
logger.error("Socket timeout expired, closing connection", e);
throw e;
} catch (InterruptedIOException e) {
close(); /* Do cleanup */
logger.warn("I/O interrupted while reading from client socket", e);
throw e;
} catch (IOException e) {
close(); /* Do cleanup */
logger.error("I/O error while reading from client socket", e);
throw e;
}
}
private final boolean fetch0(final int off, final int len) throws IOException {
ensureCapacity(off + len);
for (int count, n = 0; n < len; n += count) {
if (0 > (count = mysqlInput.read(buffer, off + n, len - n))) {
// Reached end of input stream
return false;
}
}
if (limit < off + len) limit = off + len;
return true;
}
public void close() throws IOException {
try {
if (conn != null) conn.close();
conn = null;
mysqlInput = null;
mysqlOutput = null;
} catch (SQLException e) {
logger.warn("Unable to close connection", e);
}
}
//......
}
Copy the code
- DirectLogFetcher extends LogFetcher. Its open method unwraps the connection via unwrapConnection, obtains mysqlOutput and mysqlInput through reflection, and then calls sendBinlogDump. Its close method closes the connection. Its fetch method reads from mysqlInput into the buffer via fetch0, returning true when a packet has been read and false when the end of the input stream is reached or an EOF packet is received
sendBinlogDump
canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcher.java
public final class DirectLogFetcher extends LogFetcher {
//......
protected final void sendBinlogDump(String fileName, final long filePosition, final int serverId,
boolean nonBlocking) throws IOException {
position = NET_HEADER_SIZE;
putByte(COM_BINLOG_DUMP);
putInt32(filePosition);
int binlog_flags = nonBlocking ? BINLOG_DUMP_NON_BLOCK : 0;
binlog_flags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
putInt16(binlog_flags); // binlog_flags
putInt32(serverId); // slave's server-id putString(fileName); final byte[] buf = buffer; final int len = position - NET_HEADER_SIZE; buf[0] = (byte) (len & 0xff); buf[1] = (byte) (len >>> 8); buf[2] = (byte) (len >>> 16); mysqlOutput.write(buffer, 0, position); mysqlOutput.flush(); } / /... }Copy the code
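- The sendBinlogDump method sends a COM_BINLOG_DUMP command carrying the binlog position, binlog_flags, serverId, and binlog file name. binlog_flags starts as BINLOG_DUMP_NON_BLOCK when nonBlocking is true and 0 otherwise, and BINLOG_SEND_ANNOTATE_ROWS_EVENT is always OR-ed in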
DirectLogFetcherTest
canal-1.1.4/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java
public class DirectLogFetcherTest extends BaseLogFetcherTest {
@Test
public void testSimple() {
DirectLogFetcher fecther = new DirectLogFetcher();
try {
Class.forName("com.mysql.jdbc.Driver");
Connection connection = DriverManager.getConnection("JDBC: mysql: / / 127.0.0.1:3306"."root"."hello");
Statement statement = connection.createStatement();
statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
fecther.open(connection, "mysql-bin.000007", 89797036L, 2);
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
while (fecther.fetch()) {
LogEvent event = decoder.decode(fecther, context);
int eventType = event.getHeader().getType();
switch (eventType) {
case LogEvent.ROTATE_EVENT:
binlogFileName = ((RotateLogEvent) event).getFilename();
break;
case LogEvent.WRITE_ROWS_EVENT_V1:
case LogEvent.WRITE_ROWS_EVENT:
parseRowsEvent((WriteRowsLogEvent) event);
break;
case LogEvent.UPDATE_ROWS_EVENT_V1:
case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
case LogEvent.UPDATE_ROWS_EVENT:
parseRowsEvent((UpdateRowsLogEvent) event);
break;
case LogEvent.DELETE_ROWS_EVENT_V1:
case LogEvent.DELETE_ROWS_EVENT:
parseRowsEvent((DeleteRowsLogEvent) event);
break;
case LogEvent.QUERY_EVENT:
parseQueryEvent((QueryLogEvent) event);
break;
case LogEvent.ROWS_QUERY_LOG_EVENT:
parseRowsQueryEvent((RowsQueryLogEvent) event);
break;
case LogEvent.ANNOTATE_ROWS_EVENT:
parseAnnotateRowsEvent((AnnotateRowsEvent) event);
break;
case LogEvent.XID_EVENT:
parseXidEvent((XidLogEvent) event);
break;
default:
break; } } } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } finally { try { fecther.close(); } catch (IOException e) { Assert.fail(e.getMessage()); }}}}Copy the code
- DirectLogFetcherTest establishes a connection, calls fecther.open, and then loops on fecther.fetch() until it returns false; inside the loop, decoder.decode(fecther, context) parses the binlog data into a LogEvent
summary
DirectLogFetcher extends LogFetcher. Its open method unwraps the connection via unwrapConnection, obtains mysqlOutput and mysqlInput through reflection, and then calls sendBinlogDump. Its close method closes the connection. Its fetch method reads from mysqlInput into the buffer via fetch0, returning true when a packet has been read and false when the end of the input stream is reached or an EOF packet is received
doc
- DirectLogFetcher