This article takes a deep look at the EventParser component from three aspects.

  • The design of EventParser as presented in the official documentation
  • The configurable items revealed by the initialization of EventParser
  • How EventParser works, traced from its startup

Warm tip: this article is long, but patient readers will be rewarded. To improve the reading experience, all source code is shown as screenshots, so you can focus on the accompanying text and on the summary at the end.

1. EventParser in the official documentation

The architecture of EventParser is as follows:



The diagram above illustrates the overall workflow of EventParser. The key steps are as follows:

  1. Retrieve the last parsed log position from the Log Position manager.
  2. Send a BINLOG_DUMP request to the MySQL Master node.
  3. The MySQL Master node starts pushing binlog logs to the Slave node from the position supplied by the Slave.
  4. The Slave receives the binlog logs and calls BinlogParser to parse them.
  5. Pass the parsed structured data to the EventSink component.
  6. Periodically record the binlog parsing position so that incremental subscription can resume after a restart.
  7. One HA feature shown in the figure: if the Master being synchronized goes down, Canal can continue synchronizing binlog logs from one of its slave nodes, avoiding a single point of failure.
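The seven steps above can be condensed into a single loop. The sketch below is purely illustrative; every class and method name here is a hypothetical stand-in for Canal's real components (CanalEntry.Entry, BinlogParser, EventSink, the Log Position manager):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative condensation of the seven-step workflow above.
// All names are stand-ins, not Canal's actual API.
public class ParserLoopSketch {
    public record Entry(String data) {}           // stands in for CanalEntry.Entry

    public static long lastPosition = 0;          // Log Position manager (steps 1 and 6)
    public static final List<Entry> sinked = new ArrayList<>();

    // Steps 2-3: the master pushes binlog events starting from the given position.
    public static List<String> dump(long fromPosition) {
        return List.of("event@" + fromPosition, "event@" + (fromPosition + 1));
    }

    public static Entry parse(String raw) { return new Entry(raw); } // step 4: BinlogParser
    public static void sink(Entry e) { sinked.add(e); }              // step 5: EventSink

    public static void main(String[] args) {
        for (String raw : dump(lastPosition)) {   // steps 1-3: dump from last position
            sink(parse(raw));                     // steps 4-5: parse, then sink
            lastPosition++;                       // step 6: record the new position
        }
        System.out.println(sinked.size() + " entries, position=" + lastPosition);
    }
}
```

The real loop never terminates: the master keeps pushing events and the position keeps advancing.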

The official documentation is helpful for understanding the design of the EventParser component, but it says little about how to use it. So next I'll revisit the features of EventParser from a source-code point of view, explain how it works, and use that to guide how to configure and use EventParser well.

2. EventParser initialization

As you can see from the previous article, the EventParser component is one of the four core components of CanalInstance, and the story of this section begins with the initEventParser method of CanalInstanceWithManager.

CanalInstanceWithManager#initEventParser



Step1: Obtain the connection information of the database. The code above is basic collection manipulation, but it reveals how the database address information is configured.

You can configure the database addresses of a Canal instance through CanalParameter in the following ways:

  • Single-library configuration mode 1: CanalParameter provides six properties — masterAddress, masterUsername, masterPassword, standbyAddress, standbyUsername, standbyPassword — to specify the master and standby libraries; the standby is configured for HA purposes.
  • Single-library configuration mode 2: configure the List dbAddresses provided by CanalParameter. The first element of the list is the address of the master library and the second is the address of the standby library; the database credentials are set with dbUsername and dbPassword.
  • Multi-library scenario: CanalParameter provides the List< List< DataSourcing>> groupDbAddresses attribute to set up a MySQL group, e.g. for sharded databases and tables. The first element of groupDbAddresses is the list of master addresses, and the second element is the list of slave addresses.
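To make the two single-library styles concrete, here is a minimal sketch of normalizing them into one [master, standby] list. The resolve method and its precedence rule are assumptions for illustration, not Canal's actual API:

```java
import java.net.InetSocketAddress;
import java.util.List;

// Illustrative sketch (not Canal's actual API) of normalizing the two
// single-library configuration styles into one [master, standby] list.
public class AddressConfigSketch {
    public static List<InetSocketAddress> resolve(InetSocketAddress masterAddress,
                                                  InetSocketAddress standbyAddress,
                                                  List<InetSocketAddress> dbAddresses) {
        // Style 2: an explicit [master, standby] list wins if provided.
        if (dbAddresses != null && !dbAddresses.isEmpty()) {
            return dbAddresses;
        }
        // Style 1: separate master/standby properties; the standby is optional (HA).
        return standbyAddress != null
                ? List.of(masterAddress, standbyAddress)
                : List.of(masterAddress);
    }

    public static void main(String[] args) {
        InetSocketAddress master = new InetSocketAddress("127.0.0.1", 3306);
        InetSocketAddress standby = new InetSocketAddress("127.0.0.1", 3307);
        System.out.println(resolve(master, standby, null).size()); // master + standby
    }
}
```

The multi-library style adds one more level of nesting: a list of such groups, one per shard.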

Tips: The user name and password are used for binlog synchronization on the corresponding server.

For details about the configuration in the multi-library scenario, see the following:



The corresponding initialization code is as follows:



CanalInstanceWithManager#initEventParser



Step2: Build an EventParser instance based on the configured MySQL addresses. A few key points:

  • If the MySQL addresses are configured as a group, a GroupEventParser is created, which internally maintains a list of EventParsers.
  • Each individual EventParser instance is created by calling the doInitEventParser method.

Next we’ll focus on the implementation details of doInitEventParser.

CanalInstanceWithManager#doInitEventParser



Step3: As you can see here, Canal currently does not support Oracle; it supports only MySQL and local binlog files (parsed directly from the binlog file).

Warm tip: from here on, the discussion focuses on MySQL binlog and ignores the auxiliary support for Alibaba Cloud RDS, TSDB, and other databases, covering only the processing logic for open-source MySQL.

CanalInstanceWithManager#doInitEventParser



Next, the main properties of MysqlEventParser are populated:

  • destination: the Canal instance name.
  • connectionCharset: the character set used to decode byte data when parsing binlog. The default is UTF-8.
  • connectionCharsetNumber: the numeric representation of the character set (UTF-8 is 33), used in the protocol packets exchanged with MySQL. Canal does not handle this elegantly: it must be set in lockstep with connectionCharset.
  • defaultConnectionTimeoutInSeconds: the default MySQL connection timeout. Because Canal disguises itself as a slave node of the MySQL server and must send requests to the MySQL master, it needs to establish connections; this is the default timeout for creating such a connection, 30s by default.
  • sendBufferSize: the send buffer size of the network channel. The current network channel implementations in Canal are BioSocketChannelPool and NettySocketChannelPool; judging from the code, this parameter does not take effect at present, i.e. the operating system defaults are used.
  • receiveBufferSize: the receive buffer size of the network channel. Like sendBufferSize, it currently does not take effect.
  • detectingEnable: whether to enable heartbeat detection. Enabled by default.
  • detectingSQL: the heartbeat detection statement, e.g. SELECT 1 or show master status.
  • detectingIntervalInSeconds: the heartbeat detection interval, 3s by default.
  • slaveId: the ID of this slave server. It must be unique within a MySQL replication group.

CanalInstanceWithManager#doInitEventParser



Step4: If the List positions attribute of CanalParameter is set, parse it into EntryPosition entities. Let's see how binlog location information is represented.



Its main core parameters are as follows:

  • Long timestamp: the timestamp used to indicate the position.
  • String journalName: the name of a binlog file, for example mysql-bin.000001.
  • Long position: an offset representing the exact location within the file.
  • Long serverId: the ID of the master.
  • String gtid: the global transaction ID.

Tips: as a practical matter, the List< String> positions of CanalParameter does not support group mode; only one group can be set, i.e. the first element is the master's position and the second can be the slave's. This attribute is optional.
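The position model above can be sketched as a small value type. This is a simplified stand-in for Canal's EntryPosition, with an illustrative comparison rule (later binlog file wins, then larger offset):

```java
// Minimal model of a binlog position with the fields listed above.
// A simplified stand-in for Canal's EntryPosition, not the real class.
public class EntryPositionSketch {
    public record EntryPosition(String journalName, Long position, Long timestamp,
                                Long serverId, String gtid) {
        // An exact location requires file name + offset; otherwise the
        // timestamp is used to search for a starting point.
        public boolean isExact() { return journalName != null && position != null; }

        // Later binlog file wins; within the same file, the larger offset wins.
        public int compareTo(EntryPosition other) {
            int byFile = journalName.compareTo(other.journalName);
            return byFile != 0 ? byFile : Long.compare(position, other.position);
        }
    }

    public static void main(String[] args) {
        EntryPosition a = new EntryPosition("mysql-bin.000001", 4L, null, 1L, null);
        EntryPosition b = new EntryPosition("mysql-bin.000002", 4L, null, 1L, null);
        System.out.println(a.isExact() + " " + (a.compareTo(b) < 0));
    }
}
```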

CanalInstanceWithManager#doInitEventParser



Step5: Continue setting parameters. The meaning of each:

  • fallbackIntervalInSeconds: if the MySQL master goes down, Canal can switch to one of its slave nodes and continue synchronizing binlog. For the sake of data integrity, a fallback time can be set: rewinding by this interval may cause duplicate data to be delivered, but tries to ensure none is lost. The default is 60s.
  • profilingEnabled: whether to enable performance profiling. It collects the time consumed from when a batch of logs is processed by the EventSink component until it is stored in the EventStore.
  • filterTableError: whether to ignore table filtering exceptions. The default is false. Table filtering will be covered in future articles.
  • parallel: whether to parse binlog concurrently. The default is false.
  • isGTIDMode: whether to enable GTID mode.

CanalInstanceWithManager#doInitEventParser



Step6: Continue filling in parser-related parameters. The key ones are as follows:

  • transactionSize: Canal provides a mechanism that tries to deliver all the change logs of one database transaction together. This is the length of the buffer used to cache transaction logs; the default value is 1024.
  • logPositionManager: initializes the log position manager. Canal provides memory-based, ZooKeeper-based, and mixed memory-ZooKeeper position managers, which will be described in more detail later.
  • AviaterRegexFilter: a regular-expression filter based on the Aviator engine, used to filter table names (whitelist).
  • blackFilter: Canal also provides blacklist configuration, a blacklist regular expression used to filter out table names.
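To illustrate the whitelist/blacklist interplay, here is a sketch using plain java.util.regex instead of the Aviator engine; the accept helper and its precedence rule (blacklist wins) are an illustration, not Canal's actual filter logic:

```java
import java.util.regex.Pattern;

// Sketch of table-name filtering: a whitelist regex decides which
// schema.table names are parsed, and the blacklist overrides it.
// Canal's real filter is AviaterRegexFilter (built on the Aviator engine);
// plain java.util.regex is used here purely for illustration.
public class TableFilterSketch {
    public static boolean accept(String fullName, Pattern whitelist, Pattern blacklist) {
        if (blacklist != null && blacklist.matcher(fullName).matches()) {
            return false;                          // blacklist wins
        }
        return whitelist == null || whitelist.matcher(fullName).matches();
    }

    public static void main(String[] args) {
        Pattern white = Pattern.compile("test\\..*");      // all tables in schema `test`
        Pattern black = Pattern.compile("test\\.secret");  // except this one
        System.out.println(accept("test.orders", white, black) + " "
                + accept("test.secret", white, black));
    }
}
```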

CanalInstanceWithManager#doInitEventParser



Step7: If the parser is a MySQL parser, an HA mechanism is provided: if the MySQL master goes down, Canal can actively switch to a MySQL slave node and continue synchronizing binlog logs.

3. Detailed explanation of EventParser workflow

In this section we walk through the startup of EventParser. The code entry point is the start method of EventParser. This article focuses on MySQL binlog parsing, so the implementation class is MysqlEventParser.

MysqlEventParser’s start method is as follows:



The primary call is to the start method of its parent class. Let’s look at it in more detail.

AbstractEventParser#start



Step1: Create a ring buffer. After parsing the binlog, Canal tries to deliver all the change logs produced by one database transaction (all the change data of that transaction) to the EventSink component as a whole, so that Canal consumers can consume all of a transaction's data at once, ensuring data integrity.

Tips: the implementation details of the ring buffer will be described later. For now, note that Canal does not guarantee 100% that a transaction's data is consumed as one unit: if a transaction produces more change logs than the ring buffer's capacity, a flush is forced and the transaction's data may be consumed in separate batches. The default length of the ring buffer is 1024.
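The forced-flush behavior can be sketched as follows. This simplified buffer is not Canal's real transaction buffer, just an illustration of how a transaction larger than the capacity ends up split into batches:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the forced-flush behavior described above: events are buffered so
// a whole transaction can be handed to EventSink at once, but if a transaction
// outgrows the buffer it is flushed early and ends up split. This is a
// simplified illustration, not Canal's actual transaction buffer.
public class TransactionBufferSketch {
    final int capacity;
    final List<String> buffer = new ArrayList<>();
    public final List<List<String>> flushed = new ArrayList<>();

    public TransactionBufferSketch(int capacity) { this.capacity = capacity; }

    public void add(String event) {
        buffer.add(event);
        if (buffer.size() >= capacity) flush();   // forced flush: transaction gets split
    }

    public void commit() { flush(); }             // normal flush at transaction end

    private void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(List.copyOf(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        TransactionBufferSketch buf = new TransactionBufferSketch(2);
        buf.add("row1"); buf.add("row2"); buf.add("row3"); // 3 changes in one tx
        buf.commit();
        // The transaction did not fit in the buffer, so it was split into 2 batches:
        System.out.println(buf.flushed.size());
    }
}
```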

AbstractEventParser#start



Step2: Build a binlog parser. This method is abstract in AbstractEventParser; the concrete implementation is in its subclass AbstractMysqlEventParser. The implementation class for MySQL binlog parsing is LogEventConvert, in the parse module. This part is the core of Canal's EventParser and will be introduced in detail in subsequent articles.



AbstractEventParser#start



Step3: Start an independent thread responsible for binlog parsing. The thread name contains the Canal instance destination, the address, and other information, which makes it convenient to diagnose binlog-parsing problems with jstack. Next we explore the binlog parsing process by reading the thread's run method.

AbstractEventParser#start



Step3.1: Create a MySQL connection. For example, if the database instance 192.168.1.166:3306 needs to be synchronized, Canal creates a TCP connection to it using an account that has replication permission on the library. This article will not cover the implementation details here: it is a field of its own, requiring knowledge of the MySQL communication protocol so as to establish a TCP connection with MySQL and send commands (such as select and dump requests) according to that protocol. After the other core components of Canal have been covered, this part will probably be explored in more depth, but a few key aspects of the implementation are worth highlighting:

  • First, a TCP connection to the MySQL server is created. Canal provides both BIO and Netty implementations.
  • After the TCP three-way handshake succeeds, a protocol handshake with MySQL is needed to negotiate capabilities and complete client login verification. For the handshake implementation, see MysqlConnector Negotiate.
  • In a nutshell, MysqlConnection's job is to implement a MySQL client, equivalent to the common SQL client implementations we use. Programming at this level is actually not difficult: if you want to become a database middleware engineer, you just need to follow MySQL's official documentation on the communication protocol.
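One concrete, well-documented piece of the protocol work described above: every packet in the MySQL client/server protocol begins with a 4-byte header, a 3-byte little-endian payload length followed by a 1-byte sequence number. A minimal decoder for that header:

```java
// Every MySQL client/server packet starts with a 4-byte header:
// a 3-byte little-endian payload length followed by a 1-byte sequence number.
// Decoding it is the first thing any MySQL client must do when reading
// from the socket.
public class MysqlPacketHeaderSketch {
    public record Header(int payloadLength, int sequenceId) {}

    public static Header decode(byte[] h) {
        int len = (h[0] & 0xFF) | ((h[1] & 0xFF) << 8) | ((h[2] & 0xFF) << 16);
        return new Header(len, h[3] & 0xFF);
    }

    public static void main(String[] args) {
        // 0x01 0x00 0x00 little-endian = payload of 1 byte; sequence id 0:
        Header h = decode(new byte[]{0x01, 0x00, 0x00, 0x00});
        System.out.println(h.payloadLength() + " " + h.sequenceId());
    }
}
```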

AbstractEventParser#start



Step3.2: Send heartbeat packets. The key implementation points are as follows:

  • A Timer schedules the task, sending heartbeat packets at the interval specified by detectingIntervalInSeconds.
  • The "heartbeat packet" is a CanalEntry.Entry whose type is EntryType.HEARTBEAT. It is not sent to the remote MySQL server; instead, the entry is delivered to the EventSink component.
  • The purpose of this heartbeat is left as foreshadowing; it will be revealed in a subsequent article.
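A minimal sketch of the mechanism, with simplified stand-ins for CanalEntry's types; note that the entry goes to the sink rather than over the wire:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

// Sketch of the heartbeat mechanism described above: a Timer periodically
// builds a HEARTBEAT entry and delivers it to the EventSink, not to MySQL.
// Entry/EntryType/sink are simplified stand-ins for Canal's real types.
public class HeartbeatSketch {
    public enum EntryType { ROWDATA, HEARTBEAT }
    public record Entry(EntryType type) {}

    // Stand-in for the EventSink component:
    public static final List<Entry> sink = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) {
        Timer timer = new Timer("heartbeat", true);
        long intervalMs = 50;                     // analogue of detectingIntervalInSeconds
        timer.schedule(new TimerTask() {
            @Override public void run() {
                sink.add(new Entry(EntryType.HEARTBEAT)); // delivered to the sink
            }
        }, 0, intervalMs);
        try { Thread.sleep(200); } catch (InterruptedException ignored) {}
        timer.cancel();
        System.out.println(sink.isEmpty() ? "no heartbeat" : sink.get(0).type().name());
    }
}
```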

AbstractEventParser#start



Step3.3: Run the dump command to receive binlog logs from the MySQL server. The preparations are as follows:

  • First, create a dedicated database connection, mainly used to look up some MySQL configuration information, collectively called metadata.
  • Send show variables like 'binlog_format' to the MySQL server to query the binlog format configured on the server. MySQL supports the STATEMENT, ROW, and MIXED modes.
  • Send show variables like 'binlog_row_image' to the MySQL server to query the binlog_row_image configured on the server.

A digression on binlog_row_image:

binlog_row_image controls how binlog events are recorded when binlog_format is set to ROW. Binlog records data changes: for an UPDATE, for example, the row before and after the change is recorded, using the before and after images of the binlog event respectively. This raises a question: should only the values of the changed fields be recorded in before and after, or the values of all fields in the row? binlog_row_image answers this and supports the following options:

  • full: records the values of all fields in both the before and after images, with a flag per field indicating whether it changed. This is the default.
  • minimal: records only the changed fields in before and after, plus the columns that uniquely identify the row, such as the primary key.
  • noblob: records all column values in before and after, except BLOB and TEXT columns when they are unchanged.
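The difference between full and minimal can be illustrated with a tiny helper that computes a minimal after-image; the helper and its rules are a simplification for illustration only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of the minimal row image for an UPDATE: keep only
// the changed columns plus the row identifier (here, the primary key).
// Under full, both images would instead carry every column.
public class RowImageSketch {
    public static Map<String, String> minimalAfter(Map<String, String> before,
                                                   Map<String, String> after,
                                                   String primaryKey) {
        Map<String, String> image = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            boolean changed = !e.getValue().equals(before.get(e.getKey()));
            if (changed || e.getKey().equals(primaryKey)) {
                image.put(e.getKey(), e.getValue());
            }
        }
        return image;
    }

    public static void main(String[] args) {
        Map<String, String> before = Map.of("id", "1", "name", "alice", "age", "30");
        Map<String, String> after  = Map.of("id", "1", "name", "alice", "age", "31");
        // full would record all 3 columns; minimal records only id + age:
        System.out.println(minimalAfter(before, after, "id").size());
    }
}
```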

AbstractEventParser#start



Step3.4: Send show variables like 'server_id' to the MySQL server to query the serverId configured on the server.

AbstractEventParser#start



Step3.5: Obtain the position to synchronize from through the log position manager; this will be expanded on in detail later.

AbstractEventParser#start



Step3.6: Send the dump request to MySQL, receive binlog logs from the server, and process them. Canal supports parallelizing this process to improve performance: the parallel attribute enables concurrency, which introduces the high-performance Disruptor concurrency framework. The details will be explained in subsequent articles.

AbstractEventParser#start



Step3.7: Receive the log events returned by the MySQL server, parse them into CanalEntry.Entry objects, and pass them to the EventSink component.

The preceding process repeats continuously, parsing binlog logs and synchronizing data.

In this article, we first got to know EventParser through the official documentation. However, Canal's official manual is not particularly detailed, so we worked out from the source code which EventParser parameters a Canal instance exposes, what those parameters mean, and how they work.

As we all know, EventParser's main job is to "deal" with the MySQL server: masquerading as a slave node, receiving binlog logs from the server, and decoding the binary stream into CanalEntry.Entry objects. This is still nontrivial to implement, and the following aspects are worth further study and discussion:

  1. The use and techniques of the ring buffer.
  2. Implementing the MySQL communication protocol: sending SQL statements to MySQL and parsing the results, handled by the MysqlConnection object.
  3. The log parsing position management mechanism.
  4. Locating the binlog position by GTID or by log file offset.
  5. Sending the dump command, and the high-performance design (introduction of the Disruptor framework).

Due to the length of this article, these knowledge points stop here for now; they will be discussed further as needed.


Well, that’s all for this article. Your likes and retweets are the biggest encouragement for me to continue to output high-quality articles.