sequence

This article focuses on Canal’s DirectLogFetcher

DirectLogFetcher

Canal – 1.1.4 / dbsync/SRC/main/Java/com/taobao/TDDL/dbsync binlog/DirectLogFetcher. Java

public final class DirectLogFetcher extends LogFetcher { protected static final Log logger = LogFactory.getLog(DirectLogFetcher.class); /** Command to dump binlog */ public static final byte COM_BINLOG_DUMP = 18; /** Packet header sizes */ public static final int NET_HEADER_SIZE = 4; public static final int SQLSTATE_LENGTH = 5; /** Packet offsets */ public static final int PACKET_LEN_OFFSET = 0; public static final int PACKET_SEQ_OFFSET = 3; /** Maximum packet length */ public static final int MAX_PACKET_LENGTH = (256 * 256 * 256 - 1); /** BINLOG_DUMP options */ public static final int BINLOG_DUMP_NON_BLOCK = 1; public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2; private Connection conn; private OutputStream mysqlOutput; private InputStream mysqlInput; / /... /** * Connect MySQL master to fetch binlog. */ public void open(Connection conn, String fileName, final long filePosition, final int serverId) throws IOException { open(conn, fileName, filePosition, serverId,false); } /** * Connect MySQL master to fetch binlog. */ public void open(Connection conn, String fileName, long filePosition, final int serverId, boolean nonBlocking) throws IOException { try { this.conn = conn; Class<? > connClazz = Class.forName("com.mysql.jdbc.ConnectionImpl");
            Object unwrapConn = unwrapConnection(conn, connClazz);
            if (unwrapConn == null) {
                throw new IOException("Unable to unwrap " + conn.getClass().getName()
                                      + " to com.mysql.jdbc.ConnectionImpl");
            }

            // Get underlying IO streams for network communications.
            Object connIo = getDeclaredField(unwrapConn, connClazz, "io");
            if (connIo == null) {
                throw new IOException("Get null field:" + conn.getClass().getName() + "#io");
            }
            mysqlOutput = (OutputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlOutput");
            mysqlInput = (InputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlInput");

            if (filePosition == 0) filePosition = BIN_LOG_HEADER_SIZE;
            sendBinlogDump(fileName, filePosition, serverId, nonBlocking);
            position = 0;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("Error on COM_BINLOG_DUMP: file = " + fileName + ", position = " + filePosition);
            throw e;
        } catch (ClassNotFoundException e) {
            close(); /* Do cleanup */
            throw new IOException("Unable to load com.mysql.jdbc.ConnectionImpl", e);
        }
    }

    public boolean fetch() throws IOException {
        try {
            // Fetching packet header from input.
            if(! fetch0(0, NET_HEADER_SIZE)) { logger.warn("Reached end of input stream while fetching header");
                return false;
            }

            // Fetching the first packet(may a multi-packet).
            int netlen = getUint24(PACKET_LEN_OFFSET);
            int netnum = getUint8(PACKET_SEQ_OFFSET);
            if(! fetch0(NET_HEADER_SIZE, netlen)) { logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
                return false;
            }

            // Detecting error code.
            final int mark = getUint8(NET_HEADER_SIZE);
            if(mark ! = 0) {if (mark == 255) // error from master
                {
                    // Indicates an error, for example trying to fetch from
                    // wrong
                    // binlog position.
                    position = NET_HEADER_SIZE + 1;
                    final int errno = getInt16();
                    String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
                    String errmsg = getFixString(limit - position);
                    throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
                                          + " errmsg = " + errmsg);
                } else if (mark == 254) {
                    // Indicates end of stream. It's not clear when this would
                    // be sent.
                    logger.warn("Received EOF packet from server, apparent" + " master disconnected.");
                    return false;
                } else {
                    // Should not happen.
                    throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum
                                          + ", len = " + netlen);
                }
            }

            // The first packet is a multi-packet, concatenate the packets.
            while (netlen == MAX_PACKET_LENGTH) {
                if (!fetch0(0, NET_HEADER_SIZE)) {
                    logger.warn("Reached end of input stream while fetching header");
                    return false;
                }

                netlen = getUint24(PACKET_LEN_OFFSET);
                netnum = getUint8(PACKET_SEQ_OFFSET);
                if (!fetch0(limit, netlen)) {
                    logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
                    return false;
                }
            }

            // Preparing buffer variables to decoding.
            origin = NET_HEADER_SIZE + 1;
            position = origin;
            limit -= origin;
            return true;
        } catch (SocketTimeoutException e) {
            close(); /* Do cleanup */
            logger.error("Socket timeout expired, closing connection", e);
            throw e;
        } catch (InterruptedIOException e) {
            close(); /* Do cleanup */
            logger.warn("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("I/O error while reading from client socket", e);
            throw e;
        }
    }

    private final boolean fetch0(final int off, final int len) throws IOException {
        ensureCapacity(off + len);

        for (int count, n = 0; n < len; n += count) {
            if (0 > (count = mysqlInput.read(buffer, off + n, len - n))) {
                // Reached end of input stream
                return false;
            }
        }

        if (limit < off + len) limit = off + len;
        return true;
    }

    public void close() throws IOException {
        try {
            if (conn != null) conn.close();

            conn = null;
            mysqlInput = null;
            mysqlOutput = null;
        } catch (SQLException e) {
            logger.warn("Unable to close connection", e);
        }
    }

    //......
}
Copy the code
  • DirectLogFetcher inherits LogFetcher; The open method unwrapConnection, then reflection to mysqlOutput, mysqlInput, and sendBinlogDump; Its close method closes connection; Its fetch method performs mysqlinput.read via fetch0, reads into buffer and returns true if it hasn’t read yet, and false if it has

sendBinlogDump

Canal – 1.1.4 / dbsync/SRC/main/Java/com/taobao/TDDL/dbsync binlog/DirectLogFetcher. Java

public final class DirectLogFetcher extends LogFetcher {

	//......

    protected final void sendBinlogDump(String fileName, final long filePosition, final int serverId,
                                        boolean nonBlocking) throws IOException {
        position = NET_HEADER_SIZE;

        putByte(COM_BINLOG_DUMP);
        putInt32(filePosition);
        int binlog_flags = nonBlocking ? BINLOG_DUMP_NON_BLOCK : 0;
        binlog_flags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
        putInt16(binlog_flags); // binlog_flags
        putInt32(serverId); // slave's server-id putString(fileName); final byte[] buf = buffer; final int len = position - NET_HEADER_SIZE; buf[0] = (byte) (len & 0xff); buf[1] = (byte) (len >>> 8); buf[2] = (byte) (len >>> 16); mysqlOutput.write(buffer, 0, position); mysqlOutput.flush(); } / /... }Copy the code
  • The sendBinlogDump method sends COM_BINLOG_DUMP, passing the binlog name, position, serverId, and binlog_flags(NonBlocking is set to BINLOG_DUMP_NON_BLOCK, otherwise 0)

DirectLogFetcherTest

Canal – 1.1.4 / dbsync/SRC/test/Java/com/taobao/TDDL/dbsync/binlog/DirectLogFetcherTest. Java

public class DirectLogFetcherTest extends BaseLogFetcherTest {

    @Test
    public void testSimple() {
        DirectLogFetcher fecther = new DirectLogFetcher();
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection connection = DriverManager.getConnection("JDBC: mysql: / / 127.0.0.1:3306"."root"."hello");
            Statement statement = connection.createStatement();
            statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
            statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");

            fecther.open(connection, "mysql-bin.000007", 89797036L, 2);

            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
            LogContext context = new LogContext();
            while (fecther.fetch()) {
                LogEvent event = decoder.decode(fecther, context);
                int eventType = event.getHeader().getType();
                switch (eventType) {
                    case LogEvent.ROTATE_EVENT:
                        binlogFileName = ((RotateLogEvent) event).getFilename();
                        break;
                    case LogEvent.WRITE_ROWS_EVENT_V1:
                    case LogEvent.WRITE_ROWS_EVENT:
                        parseRowsEvent((WriteRowsLogEvent) event);
                        break;
                    case LogEvent.UPDATE_ROWS_EVENT_V1:
                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                    case LogEvent.UPDATE_ROWS_EVENT:
                        parseRowsEvent((UpdateRowsLogEvent) event);
                        break;
                    case LogEvent.DELETE_ROWS_EVENT_V1:
                    case LogEvent.DELETE_ROWS_EVENT:
                        parseRowsEvent((DeleteRowsLogEvent) event);
                        break;
                    case LogEvent.QUERY_EVENT:
                        parseQueryEvent((QueryLogEvent) event);
                        break;
                    case LogEvent.ROWS_QUERY_LOG_EVENT:
                        parseRowsQueryEvent((RowsQueryLogEvent) event);
                        break;
                    case LogEvent.ANNOTATE_ROWS_EVENT:
                        parseAnnotateRowsEvent((AnnotateRowsEvent) event);
                        break;
                    case LogEvent.XID_EVENT:
                        parseXidEvent((XidLogEvent) event);
                        break;
                    default:
                        break; } } } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } finally { try { fecther.close(); } catch (IOException e) { Assert.fail(e.getMessage()); }}}}Copy the code
  • DirectLogFetcherTest establishes a connection, executes fecther.open, and loops fecther.fetch() until it returns false. Decode (fecther, context) parse binlog data to LogEvent

summary

DirectLogFetcher inherits LogFetcher; The open method unwrapConnection, then reflection to mysqlOutput, mysqlInput, and sendBinlogDump; Its close method closes connection; Its fetch method performs mysqlinput.read via fetch0, reads into buffer and returns true if it hasn’t read yet, and false if it has

doc

  • DirectLogFetcher