Lao Liu is a second-year graduate student who will soon be looking for a job. He writes this blog partly to summarize the key points of big data development, and partly in the hope of helping others who are studying on their own. Because Lao Liu is self-taught in big data development, the blog will certainly have some shortcomings; criticism and corrections are welcome, so that we can all make progress together!

Background

Data sources in the big data field include business database data, mobile event-tracking data, and log data generated on servers. When collecting data, we can choose different collection tools depending on what the downstream consumers require. Canal is a tool that synchronizes MySQL incremental data.

This article covers:

  1. The concept of Canal
  2. The principle of master/slave replication in MySQL
  3. How Canal synchronizes data from MySQL
  4. The design of Canal's HA mechanism
  5. A brief summary of various data synchronization solutions

Lao Liu hopes that after reading this article you can start using Canal directly, without spending extra time learning it elsewhere.

Implementation principle of MySQL master/slave replication

Since Canal is used to synchronize incremental data from MySQL, Lao Liu first explains the principle of MySQL master/slave replication and then moves on to the core knowledge points of Canal.


Based on the replication figure, Lao Liu breaks the principle of MySQL master/slave replication down into the following flow:

  1. The master server must first enable the binary log (binlog), which records every change to the database data.
  2. The master server writes data changes to the binlog.
  3. The slave server copies the master's binary log to its local relay log. To do this, the slave first starts an I/O worker thread, which opens an ordinary client connection to the master; the master then starts a special binlog dump thread, which reads events from the master's binary log and sends them to the I/O thread, and the I/O thread saves them to the relay log on the slave.
  4. The slave server starts an SQL thread, which reads the events from the relay log and replays the data changes locally, bringing the slave's data up to date.
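The four steps above can be sketched as a small in-memory simulation. This is only an illustration under simplifying assumptions: the `Event`, `Master`, and `Slave` classes are hypothetical, events are plain key/value writes, and the two "threads" are ordinary method calls rather than real MySQL threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class ReplicationSketch {

    /** One logged change: "set key to value". Hypothetical stand-in for a binlog event. */
    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    /** Master: applies each write and appends it to the binlog (steps 1 and 2). */
    static final class Master {
        final Map<String, String> data = new LinkedHashMap<>();
        final List<Event> binlog = new ArrayList<>();

        void write(String key, String value) {
            data.put(key, value);
            binlog.add(new Event(key, value));
        }

        /** Binlog dump thread role: return every event from the given position on. */
        List<Event> dump(int fromPosition) {
            return new ArrayList<>(binlog.subList(fromPosition, binlog.size()));
        }
    }

    /** Slave: an I/O step fills the relay log, an SQL step replays it (steps 3 and 4). */
    static final class Slave {
        final Map<String, String> data = new LinkedHashMap<>();
        final Queue<Event> relayLog = new ArrayDeque<>();
        int position = 0; // how many master events have been copied so far

        void ioThreadPull(Master master) {
            List<Event> events = master.dump(position);
            relayLog.addAll(events);
            position += events.size();
        }

        void sqlThreadApply() {
            Event e;
            while ((e = relayLog.poll()) != null) {
                data.put(e.key, e.value);
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write("user:1", "alice");
        master.write("user:2", "bob");
        slave.ioThreadPull(master);
        slave.sqlThreadApply();
        master.write("user:1", "carol"); // a later change is picked up incrementally
        slave.ioThreadPull(master);
        slave.sqlThreadApply();
        System.out.println("slave in sync: " + slave.data.equals(master.data));
    }
}
```

The `position` field is the important part: because the slave remembers how far it has read, each pull transfers only the new events, which is exactly the "incremental" behaviour Canal relies on.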

That covers the implementation principle of MySQL master/slave replication. Having seen this flow, can you guess how Canal works?

The core knowledge of Canal

How Canal works

Canal works by simulating the interaction protocol of a MySQL slave: it disguises itself as a MySQL slave and sends the dump protocol to the MySQL master. When the MySQL master receives the dump request, it starts pushing the binlog to Canal. Finally, Canal parses the binlog objects it receives.

Canal concept

Canal, whose name means "waterway" or "channel", is used to synchronize incremental data (which can be understood as real-time data) from MySQL. It is an open source project developed by Alibaba in pure Java.

Canal architecture


A server represents one running Canal instance and corresponds to one JVM. An instance corresponds to one data queue, and one Canal server corresponds to 1..n instances. The submodules under an instance are:

  1. EventParser: Data source access; simulates the slave protocol to interact with the master, and parses the protocol
  2. EventSink: The link between Parser and Store; performs data filtering, processing, and distribution
  3. EventStore: Data store
  4. MetaManager: Incremental subscription & consumption information manager
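To make the division of labour among the four submodules concrete, here is a minimal sketch. Everything in it is hypothetical (the `Instance` and `Event` classes, the `"table:change"` text format, the prefix filter); it only mirrors the roles listed above and is not Canal's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class InstanceSketch {

    /** A parsed change event; the real Canal entry type is much richer. */
    static final class Event {
        final String table;
        final String change;
        Event(String table, String change) { this.table = table; this.change = change; }
    }

    /** EventParser role: turn raw "table:change" text into an Event. */
    static Event parse(String raw) {
        int idx = raw.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected table:change but got " + raw);
        }
        return new Event(raw.substring(0, idx), raw.substring(idx + 1));
    }

    /** EventSink role: decide which parsed events are passed on to the store. */
    static boolean accept(Event event, String schemaPrefix) {
        return event.table.startsWith(schemaPrefix);
    }

    static final class Instance {
        final List<Event> store = new ArrayList<>(); // EventStore role
        int consumedPosition = 0;                    // MetaManager role: consumer progress
        final String schemaPrefix;

        Instance(String schemaPrefix) { this.schemaPrefix = schemaPrefix; }

        void onRawEvent(String raw) {
            Event event = parse(raw);           // EventParser
            if (accept(event, schemaPrefix)) {  // EventSink
                store.add(event);               // EventStore
            }
        }

        /** Hands out the next batch and records how far the consumer has read. */
        List<Event> consume(int batchSize) {
            int end = Math.min(consumedPosition + batchSize, store.size());
            List<Event> batch = new ArrayList<>(store.subList(consumedPosition, end));
            consumedPosition = end;
            return batch;
        }
    }

    public static void main(String[] args) {
        Instance instance = new Instance("shop.");
        instance.onRawEvent("shop.orders:INSERT id=1");
        instance.onRawEvent("audit.log:INSERT id=9"); // filtered out by the sink
        instance.onRawEvent("shop.users:UPDATE id=3");
        for (Event e : instance.consume(10)) {
            System.out.println(e.table + " -> " + e.change);
        }
    }
}
```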

Now that we have covered the basic concepts of Canal, let's look at how Canal synchronizes MySQL incremental data.

Canal synchronizes MySQL incremental data

Enable the MySQL binlog

The prerequisite for synchronizing MySQL incremental data with Canal is that the MySQL binlog is enabled. Alibaba Cloud's MySQL databases have the binlog enabled by default, but if you install MySQL yourself, you need to enable the binlog manually.

First locate the MySQL configuration file, /etc/my.cnf, and add the following settings under the [mysqld] section:

server-id=1
log-bin=mysql-bin
binlog-format=ROW

There is a knowledge point about the binlog format here that Lao Liu wants to explain.

There are three binlog formats: STATEMENT, ROW, and MIXED.

  1. ROW mode (the one generally used)

    The log records how each row of data was modified, not the context in which the SQL statement was executed. It records only which rows were modified and what they were changed to: only values, with no information about multi-table joins in the SQL.

    Advantages: It only needs to record which row was changed and what it was changed to, so the log content clearly shows the details of every row change and is very easy to understand.

    Disadvantages: In ROW mode, especially when data is added, every executed statement is logged as a change per row, which generates a lot of log content.

  2. STATEMENT mode

    Every SQL statement that modifies data is recorded in the log.

    Disadvantages: Because what is recorded is the executed statement, some information about each statement's execution (its context) must also be recorded so that the statement produces the same result on the slave as it did on the master.

    Even so, some functions are not replicated correctly. For example, sleep() cannot be replicated correctly in some versions, and using last_insert_id() in a stored procedure may yield different ids on the slave and the master, which means the data becomes inconsistent. ROW mode does not have this problem.

  3. MIXED mode

    A combination of the two modes above.
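The log-volume trade-off between STATEMENT and ROW mode can be shown with a toy model. The sketch below is an assumption-laden illustration: the entry strings are made up and bear no relation to the real binlog encoding, and the class and method names are hypothetical. It only counts how many entries one bulk UPDATE would produce in each mode.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BinlogFormatSketch {

    /** STATEMENT mode: the SQL text itself is logged once, however many rows it touches. */
    static List<String> statementLog(String sql) {
        return Collections.singletonList(sql);
    }

    /** ROW mode: one entry per modified row, recording the new value of that row. */
    static List<String> rowLog(String table, List<Integer> affectedIds,
                               String column, String newValue) {
        List<String> entries = new ArrayList<>();
        for (int id : affectedIds) {
            entries.add(table + " id=" + id + " " + column + "=" + newValue);
        }
        return entries;
    }

    public static void main(String[] args) {
        // Pretend the table holds rows with ids 1..1000 and one UPDATE touches all of them.
        List<Integer> ids = new ArrayList<>();
        for (int id = 1; id <= 1000; id++) {
            ids.add(id);
        }
        String sql = "UPDATE user SET status = 'inactive'";
        System.out.println("STATEMENT entries: " + statementLog(sql).size());
        System.out.println("ROW entries: " + rowLog("user", ids, "status", "inactive").size());
    }
}
```

The ROW entries are larger in total, but each one states the resulting value directly, so replaying them cannot depend on how a function evaluates on the slave.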

Canal real-time synchronization

  1. First, configure the environment in conf/example/instance.properties:
 ## mysql serverId
 canal.instance.mysql.slaveId = 1234

 # position info; change the address to your own database
 canal.instance.master.address = 127.0.0.1:3306
 canal.instance.master.journal.name =
 canal.instance.master.position =
 canal.instance.master.timestamp =

 #canal.instance.standby.address =
 #canal.instance.standby.journal.name =
 #canal.instance.standby.position =
 #canal.instance.standby.timestamp =

 # username/password; change these to your own database credentials
 canal.instance.dbUsername = canal
 canal.instance.dbPassword = canal
 canal.instance.defaultDatabaseName =
 canal.instance.connectionCharset = UTF-8

 # table regex
 canal.instance.filter.regex = .*\\..*
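The table filter value `.*\\..*` is easy to misread because of the doubled backslash. The sketch below assumes the file is read with `java.util.Properties`-style unescaping, where `\\` in the file becomes a single `\`, and it uses `java.util.regex` as a stand-in for whatever matcher Canal applies internally. The schema and table names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

public class FilterRegexSketch {

    /** Parses properties text and returns the value of canal.instance.filter.regex. */
    static String loadRegex(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex");
    }

    /** True if the whole "schema.table" name matches the regex. */
    static boolean matches(String regex, String schemaAndTable) {
        return Pattern.matches(regex, schemaAndTable);
    }

    public static void main(String[] args) {
        // Java source doubles each backslash once more, so this literal is the file line
        //   canal.instance.filter.regex = .*\\..*
        String all = loadRegex("canal.instance.filter.regex = .*\\\\..*");
        System.out.println("regex after unescaping: " + all);
        System.out.println("shop.orders matches: " + matches(all, "shop.orders"));
        System.out.println("orders matches: " + matches(all, "orders"));

        // Restrict the filter to every table of one schema.
        String shopOnly = loadRegex("canal.instance.filter.regex = shop\\\\..*");
        System.out.println("shop.orders matches shopOnly: " + matches(shopOnly, "shop.orders"));
        System.out.println("audit.log matches shopOnly: " + matches(shopOnly, "audit.log"));
    }
}
```

Read this way, the default value means "any schema, a literal dot, any table", so every table is synchronized unless the regex is narrowed.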

The canal.instance.connectionCharset setting specifies the database's character encoding, expressed as the corresponding Java charset name, such as UTF-8, GBK, or ISO-8859-1.
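Why the charset name matters can be seen with plain JDK calls. This is a small sketch, not Canal code: the class and method names are hypothetical, and the sample text is two Chinese characters written as Unicode escapes. It shows that the same text encodes to different byte sequences per charset, and that a charset that cannot represent the text loses data.

```java
import java.nio.charset.Charset;

public class CharsetSketch {

    /** Number of bytes the text occupies when encoded with the named charset. */
    static int encodedLength(String text, String charsetName) {
        return text.getBytes(Charset.forName(charsetName)).length;
    }

    /** Encodes and decodes with the same charset; true if nothing was lost. */
    static boolean roundTrips(String text, String charsetName) {
        Charset charset = Charset.forName(charsetName);
        return new String(text.getBytes(charset), charset).equals(text);
    }

    public static void main(String[] args) {
        String text = "\u6570\u636e"; // two Chinese characters, escaped to keep the source ASCII
        for (String name : new String[] {"UTF-8", "GBK", "ISO-8859-1"}) {
            if (!Charset.isSupported(name)) {
                System.out.println(name + ": not available in this JVM");
                continue;
            }
            System.out.println(name + ": " + encodedLength(text, name)
                    + " bytes, lossless=" + roundTrips(text, name));
        }
    }
}
```

If the value configured for Canal does not match the encoding the database actually uses, column values containing non-ASCII text are likely to arrive garbled in the same way.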

  2. Once the configuration is done, start Canal:
 sh bin/startup.sh

 To stop Canal, run:
 sh bin/stop.sh

  3. Check the logs

    Look at canal/canal.log and example/example.log.

  4. Start the client

    Run the business code in IDEA. Whenever MySQL has incremental data, the client pulls it over and prints it in the IDEA console.

    Add the dependency to the pom.xml file:

 <dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.12</version>
 </dependency>

Add client code:

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class Demo {

    public static void main(String[] args) {
        // Create a connection: server address, destination (instance name), username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("hadoop03", 11111), "example", "", "");
        connector.connect();
        // Subscribe to the instance's change events
        connector.subscribe();
        // Return to the last acknowledged position so unacknowledged data is fetched again
        connector.rollback();

        int batchSize = 1000;
        int emptyCount = 0;
        int totalEmptyCount = 100;
        try {
            // Stop after 100 consecutive empty polls
            while (totalEmptyCount > emptyCount) {
                Message msg = connector.getWithoutAck(batchSize);
                long id = msg.getId();
                List<CanalEntry.Entry> entries = msg.getEntries();
                if (id == -1 || entries.isEmpty()) {
                    emptyCount++;
                    System.out.println("emptyCount : " + emptyCount);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    emptyCount = 0;
                    printEntry(entries);
                }
                // Confirm that the batch has been processed
                connector.ack(id);
            }
        } finally {
            connector.disconnect();
        }
    }

    // batch -> entries -> rowchange -> rowdata -> cols
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; they carry no row data
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Fail fast instead of continuing with a null rowChange
                throw new RuntimeException("failed to parse entry: " + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(entry.getHeader().getLogfileName() + "__"
                    + entry.getHeader().getSchemaName() + "__" + eventType);

            // Only the "after" image is printed; for DELETE events the data is in getBeforeColumnsList()
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                    System.out.println(column.getName() + "-"
                            + column.getValue() + "-"
                            + column.getUpdated());
                }
            }
        }
    }
}
  5. When you write data into MySQL, the client prints the incremental data to the console.
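The client loop above relies on three calls: getWithoutAck, ack, and rollback. Their interplay can be pictured with two cursors over a stored event list. The sketch below is a deliberately simplified model under stated assumptions: the class is hypothetical, it ignores batch ids (the real `ack` takes one), and it serves a single client only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AckCursorSketch {
    private final List<String> events;
    private int ackCursor = 0; // events before this index are confirmed by the client
    private int getCursor = 0; // events before this index have been handed out

    AckCursorSketch(List<String> events) {
        this.events = new ArrayList<>(events);
    }

    /** Hands out the next batch without confirming it. */
    List<String> getWithoutAck(int batchSize) {
        int end = Math.min(getCursor + batchSize, events.size());
        List<String> batch = new ArrayList<>(events.subList(getCursor, end));
        getCursor = end;
        return batch;
    }

    /** Confirms everything handed out so far. */
    void ack() {
        ackCursor = getCursor;
    }

    /** Forgets unconfirmed batches so they are delivered again. */
    void rollback() {
        getCursor = ackCursor;
    }

    public static void main(String[] args) {
        AckCursorSketch server = new AckCursorSketch(Arrays.asList("e1", "e2", "e3"));
        System.out.println(server.getWithoutAck(2)); // first delivery
        server.rollback();                           // e.g. the client crashed before ack
        System.out.println(server.getWithoutAck(2)); // the same batch is delivered again
        server.ack();
        System.out.println(server.getWithoutAck(2)); // only the remaining event
    }
}
```

This is why the demo calls rollback() right after subscribing: anything that was fetched but never acknowledged in a previous run gets delivered again instead of being lost.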

Canal’s HA mechanism design

In the big data field, many frameworks have an HA mechanism. Canal's HA has two parts: Canal Server and Canal Client each have their own HA implementation.

  1. Canal Server: To reduce the number of dump requests sent to MySQL, the same instance may be running on only one server at a time; the copies on the other servers stay in standby.
  2. Canal Client: To guarantee ordering, only one Canal Client at a time may perform the get, ack, and rollback operations on a given instance. Otherwise the order in which the client receives data cannot be guaranteed.

The control of the HA mechanism mainly depends on several features of ZooKeeper, which are not covered here.

Canal Server:

  1. Whenever a Canal Server wants to start a Canal instance, it first tries to create an EPHEMERAL node in ZooKeeper.
  2. The Canal Server that creates the ZooKeeper node successfully starts the corresponding Canal instance. Canal Servers that fail to create the node leave their instance in standby.
  3. Once ZooKeeper finds that the node created by that Canal Server has disappeared, it immediately notifies the other Canal Servers to perform step 1 again, and another Canal Server is chosen to start the instance.
  4. Each time a Canal Client connects, it first asks ZooKeeper which server is currently running the Canal instance and then connects to it. If the connection becomes unavailable, it tries to connect again.
  5. The Canal Client's approach is similar to the Canal Server's: it is also controlled by competing for an EPHEMERAL node in ZooKeeper.
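The preemption in steps 1 to 3 can be sketched without a real ZooKeeper. In the example below, `FakeZooKeeper` is a hypothetical in-memory stand-in built on `ConcurrentHashMap.putIfAbsent`, the node path is made up, and a closed session is simulated by a method call; real ephemeral nodes disappear when the client session ends, and real standbys are woken by watches.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class HaElectionSketch {

    /** In-memory stand-in for ZooKeeper: node path -> owner of the ephemeral node. */
    static final class FakeZooKeeper {
        private final ConcurrentHashMap<String, String> nodes = new ConcurrentHashMap<>();

        /** Succeeds only for the first caller, like creating a node that must not exist yet. */
        boolean createEphemeral(String path, String owner) {
            return nodes.putIfAbsent(path, owner) == null;
        }

        /** Simulates a session ending: every ephemeral node of that owner disappears. */
        void sessionClosed(String owner) {
            nodes.values().removeIf(owner::equals);
        }

        String ownerOf(String path) {
            return nodes.get(path);
        }
    }

    /** Every live server tries to grab the node; returns whoever holds it afterwards. */
    static String elect(FakeZooKeeper zk, String path, List<String> liveServers) {
        for (String server : liveServers) {
            if (zk.createEphemeral(path, server)) {
                System.out.println(server + " starts the instance");
            } else {
                System.out.println(server + " stays in standby");
            }
        }
        return zk.ownerOf(path);
    }

    public static void main(String[] args) {
        FakeZooKeeper zk = new FakeZooKeeper();
        String path = "/demo/example/running"; // hypothetical node path
        elect(zk, path, Arrays.asList("serverA", "serverB"));
        zk.sessionClosed("serverA");           // serverA goes down, its node vanishes
        String active = elect(zk, path, Collections.singletonList("serverB"));
        System.out.println("active server: " + active);
    }
}
```

A client looking for the running instance would simply read the owner of the same node, which corresponds to step 4.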

Canal HA configuration and real-time synchronization of data into Kafka

  1. Modify the conf/canal.properties file:
 canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
 canal.serverMode = kafka
 canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092

  2. Configure conf/example/instance.properties:
 # the slaveId must be unique across the two Canal servers
 canal.instance.mysql.slaveId = 790
 # the Kafka topic that the data is sent to
 canal.mq.topic = canal_log

Summary of data synchronization schemes

Now that the Canal tool has been covered, here is a brief summary of the common data collection tools. It does not go into architecture; it is only meant to give you an impression.

Common data collection tools include DataX, Flume, Canal, Sqoop, Logstash, and others.

DataX (processing offline data)

DataX is an offline synchronization tool for heterogeneous data sources, open-sourced by Alibaba. Offline synchronization of heterogeneous data sources means synchronizing data from a source to a destination, but there are many types of sources and destinations. Before DataX, the links between sources and destinations formed a complex mesh that was highly fragmented, which made it impossible to abstract out the core synchronization logic.


In order to solve the problem of heterogeneous data source synchronization, DataX transforms the complex mesh synchronization link into a star data link. DataX is responsible for connecting various data sources as an intermediate transmission carrier.

So, when you need to add a new data source, you just need to connect the data source to the DataX, and you can do seamless data synchronization with the existing data source.


DataX itself, as an offline data synchronization framework, is built on a Framework + Plugin architecture. Reading from and writing to data sources are abstracted into Reader and Writer plug-ins, which are incorporated into the overall synchronization framework.

  1. Reader: The data collection module; it collects data from the data source and sends it to the Framework.
  2. Writer: The data writing module; it continuously fetches data from the Framework and writes it to the destination.
  3. Framework: It connects Reader and Writer, acts as a data transfer channel between them, and handles buffering, concurrency, and data conversion.
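The Reader, Writer, and Framework roles above can be sketched with a bounded queue acting as the channel. This is a minimal illustration under assumptions, not DataX's plug-in API: the `Reader` and `Writer` interfaces, the `EOF` end marker, and the `sync` method are all hypothetical, and records are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ReaderWriterSketch {

    interface Reader { void read(BlockingQueue<String> channel) throws InterruptedException; }
    interface Writer { void write(BlockingQueue<String> channel) throws InterruptedException; }

    static final String EOF = "__EOF__"; // hypothetical end-of-data marker

    /** Framework role: owns the channel, runs reader and writer concurrently, waits for both. */
    static List<String> sync(List<String> source) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(2); // small buffer = back-pressure
        List<String> target = new ArrayList<>();

        Reader reader = ch -> {
            for (String record : source) {
                ch.put(record);
            }
            ch.put(EOF);
        };
        Writer writer = ch -> {
            for (String record = ch.take(); !record.equals(EOF); record = ch.take()) {
                target.add(record);
            }
        };

        Thread readerThread = new Thread(() -> {
            try { reader.read(channel); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        Thread writerThread = new Thread(() -> {
            try { writer.write(channel); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        readerThread.start();
        writerThread.start();
        try {
            readerThread.join();
            writerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("sync interrupted", e);
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(sync(Arrays.asList("row1", "row2", "row3", "row4")));
    }
}
```

Because the reader and writer only ever see the channel, supporting a new data source means writing one new plug-in rather than one link per source/destination pair, which is the point of the star topology.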

The core architecture of DataX is shown below:


Core modules:

  1. A single data synchronization run in DataX is called a Job. When DataX receives a Job, it starts a process to carry out the whole synchronization.
  2. After a DataX Job starts, it is split into multiple Tasks (subtasks) according to the sharding strategy of the source side, so that they can be executed concurrently.
  3. After the Tasks have been split, the DataX Job calls the Scheduler module, which assembles the Tasks into TaskGroups according to the configured concurrency. Each TaskGroup is responsible for running all of its assigned Tasks at a fixed concurrency. By default, the concurrency of a single TaskGroup is 5.
  4. Each Task is started by its TaskGroup. Once started, a Task runs a fixed Reader->Channel->Writer thread pipeline to complete its share of the synchronization.
  5. Once the DataX Job is running, the Job monitors the TaskGroups and waits for all TaskGroup tasks to complete, then exits successfully. Otherwise, the process exits abnormally.
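The scheduling arithmetic in step 3 can be worked through in a few lines. The sketch assumes the default of 5 concurrent Tasks per TaskGroup mentioned above and distributes Tasks round-robin; the class and method names are hypothetical, and the real scheduler may distribute Tasks differently.

```java
import java.util.Arrays;

public class TaskGroupSketch {

    static final int CONCURRENCY_PER_GROUP = 5; // default concurrency of one TaskGroup

    /** Number of TaskGroups needed to reach the configured total concurrency. */
    static int taskGroupCount(int concurrency) {
        return (concurrency + CONCURRENCY_PER_GROUP - 1) / CONCURRENCY_PER_GROUP; // ceiling division
    }

    /** Spreads the Tasks over the TaskGroups as evenly as possible (round-robin). */
    static int[] tasksPerGroup(int taskCount, int groupCount) {
        int[] result = new int[groupCount];
        for (int task = 0; task < taskCount; task++) {
            result[task % groupCount]++;
        }
        return result;
    }

    public static void main(String[] args) {
        int tasks = 100;      // e.g. a source split into 100 shards
        int concurrency = 20; // configured total concurrency
        int groups = taskGroupCount(concurrency);
        System.out.println("TaskGroups: " + groups);
        System.out.println("Tasks per group: " + Arrays.toString(tasksPerGroup(tasks, groups)));
    }
}
```

With 100 Tasks and a concurrency of 20, this gives 4 TaskGroups of 25 Tasks each, and every group works through its 25 Tasks five at a time.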

Flume (Processing real-time data)


Flume is mainly used to synchronize log data. Flume consists of Source, Channel, and Sink.

The biggest advantage of Flume is that the official website provides abundant sources, channels, and sinks. According to different business requirements, you can find relevant configurations on the official website. Flume also provides an interface for customizing these components.

Logstash (processing real-time data)


Logstash is a real-time data transmission pipeline that transfers data from the input to the output of the pipeline. At the same time, this pipe also allows you to add filters in the middle according to your needs. Logstash provides many powerful filters to meet various application scenarios.

Logstash is written in JRuby, uses a simple message-based architecture, and runs on the JVM. The data flowing through the pipeline is called an event, and processing is divided into an inputs stage, a filters stage, and an outputs stage.
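The three stages can be pictured as three functions applied to each event in turn. The sketch below is written in Java only to stay consistent with the other examples; it is not how Logstash pipelines are configured. The "LEVEL text" line format, the field names, and the rule of dropping DEBUG events are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PipelineSketch {

    /** Input stage: wrap each raw line in an event (a map of fields). */
    static Map<String, String> input(String line) {
        Map<String, String> event = new LinkedHashMap<>();
        event.put("message", line);
        return event;
    }

    /** Filter stage: split "LEVEL text" into two fields; returns null to drop DEBUG events. */
    static Map<String, String> filter(Map<String, String> event) {
        String message = event.get("message");
        int space = message.indexOf(' ');
        String level = space < 0 ? "UNKNOWN" : message.substring(0, space);
        if (level.equals("DEBUG")) {
            return null;
        }
        event.put("level", level);
        event.put("message", space < 0 ? message : message.substring(space + 1));
        return event;
    }

    /** Output stage: render the event; a real output would write to an external system. */
    static String output(Map<String, String> event) {
        return "level=" + event.get("level") + " message=" + event.get("message");
    }

    /** Runs every line through input -> filter -> output. */
    static List<String> run(List<String> lines) {
        List<String> results = new ArrayList<>();
        for (String line : lines) {
            Map<String, String> event = filter(input(line));
            if (event != null) {
                results.add(output(event));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        for (String line : run(Arrays.asList("ERROR disk full", "DEBUG noisy detail", "INFO started"))) {
            System.out.println(line);
        }
    }
}
```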

Sqoop (processing offline data)


Sqoop is a tool for transferring data between Hadoop and relational databases. It can import data from a relational database such as MySQL into Hadoop's HDFS, and export data from the Hadoop file system into a relational database. Sqoop uses MapReduce under the hood, so we must pay attention to data skew when using it.
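Data skew in this setting typically comes from how the import is split among mappers. The sketch below assumes a simplified model in which the range between the minimum and maximum of a numeric split column is cut into equal-width slices, one per mapper; the real boundary calculation depends on the column type, and the class, method, and data here are hypothetical.

```java
import java.util.Arrays;

public class SplitSkewSketch {

    /**
     * Counts how many rows land in each mapper when the [min, max] range of the
     * split column is divided into equal-width slices. Expects a non-empty array.
     */
    static int[] rowsPerMapper(long[] splitColumnValues, int mappers) {
        long min = Arrays.stream(splitColumnValues).min().getAsLong();
        long max = Arrays.stream(splitColumnValues).max().getAsLong();
        double width = (max - min + 1) / (double) mappers;
        int[] counts = new int[mappers];
        for (long value : splitColumnValues) {
            int mapper = (int) Math.min(mappers - 1, (long) ((value - min) / width));
            counts[mapper]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 97 densely packed ids plus three far-away outliers
        long[] ids = new long[100];
        for (int i = 0; i < 97; i++) {
            ids[i] = i + 1;
        }
        ids[97] = 1000;
        ids[98] = 2000;
        ids[99] = 4000;
        System.out.println(Arrays.toString(rowsPerMapper(ids, 4)));
    }
}
```

In this made-up data set almost every row falls into the first slice, so one mapper does nearly all the work while the others sit idle. Choosing a split column whose values are evenly distributed avoids this.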

Conclusion

This article mainly covers the core knowledge points of the Canal tool and a comparison of data collection tools. The data collection tools are described only briefly, in terms of concept and application, just to give you an impression. Lao Liu is confident that reading this article is basically enough to get started; the rest is practice.

For Canal, the tool that synchronizes MySQL incremental data, Lao Liu has tried to explain it clearly enough for you to study it on your own.

If you have any questions, contact the official account: Hardworking Lao Liu. If you have read this far, please like and follow to show your support!