1. Background introduction

This article introduces how to stream MySQL data into Kafka through Binlog + Canal, and then consume it with Flink and write it into TiDB.

In order to quickly verify the functionality of the entire pipeline, all components are deployed on a single machine. If you do not have enough physical resources, you can run all of the components in this document in a small virtual machine (for example, 1 core and 4 GB of memory).

If you need to deploy these components in a production environment, it is recommended to replace each of them with a highly available cluster deployment.

We create a single-node Zookeeper environment that is shared by Flink, Kafka, Canal, and the other components.

For all components that require a JRE, such as Flink, Kafka, Canal, and Zookeeper, we give each component its own JRE, so that upgrading the JRE for one application does not affect the others.

This article is divided into two parts: sections 2 through 8 focus on setting up the basic environment, and the last section describes how data flows through the various components.

The flow of data passes through the following components:

  • The MySQL data source generates a Binlog.
  • Canal reads the Binlog, generates Canal JSON, and pushes it to a Kafka topic.
  • Flink consumes the data from the Kafka topic via the flink-sql-connector-kafka API.
  • Flink writes the data to TiDB via flink-connector-jdbc.

The TiDB + Flink architecture supports the development and running of many different kinds of applications.

The main features include:

  • Unified batch and stream processing.
  • Sophisticated state management.
  • Event-time support.
  • Exactly-once state consistency guarantees.

Flink can run on a variety of resource management frameworks, including YARN, Mesos, and Kubernetes. It also supports independent deployment on bare metal clusters. TiDB can be deployed on AWS, Kubernetes, GCP GKE, and can also be deployed independently on bare metal clusters using TiUP.

The common applications of TiDB + Flink are as follows:

  • Event-driven applications:

    • Fraud detection.
    • Anomaly detection.
    • Rule-based alerting.
    • Business process monitoring.
  • Data analysis applications:

    • Network quality monitoring.
    • Product update and test evaluation analysis.
    • Ad-hoc analysis of fact data.
    • Large-scale graph analysis.
  • Data pipeline applications:

    • Real-time query index construction for e-commerce.
    • Continuous ETL for e-commerce.

2. Environment introduction

2.1 Operating System Environment

[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8

2.2 Software Environment

Item                       Version      Download link
TiDB                       v4.0.9       download.pingcap.org/tidb-commun…
Kafka                      v2.7.0       mirrors.bfsu.edu.cn/apache/kafk…
Flink                      v1.12.1      mirrors.tuna.tsinghua.edu.cn/apache/flin…
JRE                        v1.8.0_281   javadl.oracle.com/webapps/dow…
Zookeeper                  v3.6.2       mirrors.tuna.tsinghua.edu.cn/apache/zook…
flink-sql-connector-kafka  v1.12.1      repo1.maven.org/maven2/org/…
flink-connector-jdbc       v1.12.0      repo1.maven.org/maven2/org/…
MySQL                      v8.0.23      dev.mysql.com/get/Downloa…
Canal                      v1.1.4       github.com/alibaba/can…

2.3 Machine Distribution

Hostname IP Component
r21 192.168.12.21 TiDB Cluster
r22 192.168.12.22 Kafka
r23 192.168.12.23 Flink
r24 192.168.12.24 Zookeeper
r25 192.168.12.25 MySQL
r26 192.168.12.26 Canal

3. Deploy the TiDB Cluster

Compared with traditional stand-alone databases, TiDB has the following advantages:

  • A purely distributed architecture with good scalability that supports elastic scale-out and scale-in.
  • Supports SQL, exposes the MySQL network protocol, and is compatible with most MySQL syntax; in most scenarios it can directly replace MySQL.
  • High availability by default: when a minority of replicas fail, the database automatically repairs data and fails over, transparently to the application.
  • Supports ACID transactions, which is friendly to scenarios with strong consistency requirements, such as bank transfers.
  • A rich tool-chain ecosystem covering data migration, synchronization, backup, and other scenarios.

In its kernel design, the TiDB distributed database splits the overall architecture into multiple modules that communicate with each other to form a complete TiDB system.

In this article, we only do the simplest functional tests, so we deploy a single-node, single-replica TiDB cluster, involving the following three modules:

  • TiDB Server: the SQL layer. It exposes the MySQL protocol endpoint for client connections, and is responsible for accepting connections from clients, performing SQL parsing and optimization, and finally generating a distributed execution plan.
  • Placement Driver (PD) Server: the metadata management module of the entire TiDB cluster. It stores the real-time data distribution of each TiKV node and the overall topology of the cluster, provides the TiDB Dashboard management interface, and assigns transaction IDs to distributed transactions.
  • TiKV Server: responsible for storing data. Externally, TiKV is a distributed key-value storage engine that provides transactions.

3.1 TiUP deployment template File

## Global variables are applied to all deployments and used as the default value of
## the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"

## Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"

## Server configs are used to specify the runtime configuration of TiDB components.
## All configuration items can be found in TiDB docs:
## - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
## - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
## - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
## All configuration items use points to represent the hierarchy, e.g:
##   readpool.storage.use-unified-pool
## You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64

pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    ## The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000

tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    ## The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true

tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    ## The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }

#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"

#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"

#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
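After saving the template (assumed here to be saved as topology.yaml), the cluster can be deployed and started with TiUP. The cluster name and version below match the display output in the next section:

[root@r20 topology]# tiup cluster deploy tidb-c1-v409 v4.0.9 ./topology.yaml -u root -p
[root@r20 topology]# tiup cluster start tidb-c1-v409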

3.2 TiDB Cluster Environment

Deploying a TiDB cluster is not the focus of this article, so we simply deploy a single-machine, single-replica TiDB cluster as a rapid experimental environment. There is no need to deploy a monitoring environment.

[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
SSH type:           builtin
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4

Create a table for testing:

mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                  |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` (
  `id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

4. Deploy the Zookeeper environment

In this experiment, the Zookeeper environment is deployed separately to serve the Kafka, Flink, and Canal environments.

As an experimental demonstration, only a standalone environment is deployed.

4.1 Decompressing the Zookeeper Package

[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper

4.2 Deploying the JRE for Zookeeper

[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable:

## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre

4.3 Creating a Zookeeper Configuration File

[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
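To be safe, make sure the dataDir configured above exists before the first start (Zookeeper can usually create it itself, but pre-creating it avoids permission surprises):

[root@r24 conf]# mkdir -p /opt/zookeeper/data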

4.4 Starting Zookeeper

[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start

4.5 Checking the Zookeeper Status

## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
tcp6       0      0 :::44505                :::*                    LISTEN      15062/java

## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181

4.6 Suggestions about Zookeeper

A personal piece of advice on Zookeeper:

Enable network monitoring for any clustered Zookeeper deployment; in particular, pay attention to network bandwidth in the system metrics.
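As a minimal starting point, the built-in mntr four-letter command exposes basic server counters, and an OS-level tool can track interface bandwidth. A sketch (in Zookeeper 3.6, mntr must first be whitelisted in zoo.cfg; sar assumes the sysstat package is installed):

## whitelist the four-letter commands in zoo.cfg, then restart Zookeeper
4lw.commands.whitelist=stat,mntr

## pull server counters, including packet counts
[root@r24 bin]# echo mntr | nc 192.168.12.24 2181

## watch per-interface network throughput, refreshed every second
[root@r24 bin]# sar -n DEV 1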

5. Deploy Kafka

Kafka is a distributed stream processing platform that can be used in two broad categories of applications:

  • Building real-time streaming data pipelines that reliably move data between systems or applications (similar to a message queue).
  • Building real-time streaming applications that transform or react to streams of data (stream processing, transforming data between Kafka topics via Kafka Streams).

Kafka has four core apis:

  • The Producer API allows an application to publish a stream of data to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process streaming data published to them.
  • The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing an output stream to one or more topics, effectively transforming input streams into output streams.
  • The Connector API allows you to build and run reusable producers or consumers that connect Kafka Topics to existing applications or data systems. For example, connect to a relational database and capture all changes to a table.
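Once the broker is up (see section 5.4 below), the Producer and Consumer APIs can be exercised directly with the console tools that ship with Kafka. A quick smoke test, using an assumed throwaway topic name:

## publish one message (Producer API)
[root@r22 ~]# echo "hello" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.12.22:9092 --topic smoke-test

## read it back (Consumer API)
[root@r22 ~]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic smoke-test --from-beginning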

In this experiment, only functional verification is done, and only a stand-alone Kafka environment is built.

5.1 Download and Decompress Kafka

[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka

5.2 Deploying the JRE for Kafka

[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

Modify the Kafka JRE environment variable:

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre

5.3 Modifying the Kafka Configuration File

Modify the Kafka configuration file /opt/kafka/config/server.properties:

## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181

5.4 Starting Kafka

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
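The command above runs Kafka in the foreground and occupies the terminal; kafka-server-start.sh also accepts a -daemon flag to run the broker in the background:

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties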

5.5 Viewing Kafka Version Information

Kafka does not provide a --version option to view its version information, so we check the jar file names instead:

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root   41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root  892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
......

Here, 2.13 is the Scala version and 2.7.0 is the Kafka version.

6. Deploy Flink

Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.

This experiment is only a functional test, and only a stand-alone Flink environment is deployed.

6.1 Downloading and Decompressing Flink

[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink

6.2 Deploying the JRE of Flink

[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre

6.3 Adding the Libraries Required by Flink

Flink requires the flink-sql-connector-kafka package to consume data from Kafka.

Flink requires the flink-connector-jdbc package to connect to MySQL/TiDB.

[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
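If the two connector jars have not been downloaded yet, their URLs can be assembled from the standard Maven Central layout (coordinates assumed from the jar names above; verify the Scala/Flink versions against your installation):

wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.12.0/flink-sql-connector-kafka_2.12-1.12.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar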

6.4 Modifying the Flink Configuration File

## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre

6.5 Starting Flink

[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
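You can verify that both daemons are running with jps (the PIDs below are illustrative):

[root@r23 ~]# jps
2400 StandaloneSessionClusterEntrypoint
2745 TaskManagerRunner
2814 Jps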

6.6 Viewing the Flink Web UI

After the cluster starts, the Flink web UI is available at http://192.168.12.23:8081 (8081 is the default web UI port).

7. Deploy MySQL

7.1 Decompressing MySQL Package

[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/

7.2 Creating a MySQL Service File

[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target

[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set environment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
Environment=MYSQLD_PARENT_PID=1

[Install]
WantedBy=multi-user.target

## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/

7.3 Creating the my.cnf File

[root@r25 ~]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/slow.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables
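Canal depends on row-based binary logging. MySQL 8.0 enables the binlog with ROW format by default, so no extra my.cnf entries are needed here; after MySQL starts, you can confirm this:

mysql> show variables like 'log_bin';
mysql> show variables like 'binlog_format';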

7.4 Initializing and Starting MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check the mysql temp password in /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-

7.5 Creating a new MySQL user to connect to Canal

## change the mysql temp password first
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)
## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)
## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
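Optionally, verify from the Canal host that the new user can log in remotely (a quick check, assuming a mysql client is available on r26):

[root@r26 ~]# mysql -ucanal -pcanal -h192.168.12.25 -P3306 -e "show grants"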

7.6 Creating a table for testing in MySQL

mysql> show create table test.t2;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table                                                                     |
+-------+----------------------------------------------------------------------------------+
| t2    | CREATE TABLE `t2` (
  `id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set (0.00 sec)

8. Deploy Canal

Canal’s main purpose is to provide incremental data subscription and consumption based on incremental log parsing of the MySQL database.

Early on, because Alibaba had dual data centers deployed in Hangzhou and the United States, there was a business requirement for cross-data-center synchronization, which was implemented mainly with business triggers to capture incremental changes.

Since 2010, the business has gradually shifted to parsing database logs to obtain incremental changes for synchronization, which has given rise to a large number of database incremental subscription and consumption services.

Log-based incremental subscription and consumption services include:

  • Database mirroring.

  • Real-time database backup.

  • Index building and real-time maintenance (split heterogeneous index, inverted index, etc.).

  • Service cache refreshing.

  • Incremental data processing with business logic.

Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

8.1 Decompressing the Canal Package

[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal

8.2 Deploying the JRE for Canal

[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
## configure the jre: add the following line at the head of /opt/canal/bin/startup.sh
JAVA=/opt/canal/jre/bin/java

8.3 Modifying the Configuration file of Canal

Modify the /opt/canal/conf/canal.properties configuration file:

## modify the following configuration
canal.zkServers = 192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example        ## an example directory must exist under /opt/canal/conf to hold the destination configuration
canal.mq.servers = 192.168.12.22:9092

Modify the /opt/canal/conf/example/instance.properties configuration file:

## modify the following configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*
canal.mq.topic=canal-kafka

9. Configure the data flow

9.1 The MySQL -> Canal -> Kafka pathway

9.1.1 Viewing MySQL Binlog Information

Check the MySQL Binlog information to ensure that the Binlog is normal.

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 |     2888 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
Copy the code

9.1.2 Creating a Topic in Kafka

Create a topic named canal-kafka in Kafka. The name of this topic corresponds to canal.mq.topic=canal-kafka in the Canal configuration file /opt/canal/conf/example/instance.properties:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)

View all topics in Kafka:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka

View the details of the Kafka topic canal-kafka:

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka
Topic: canal-kafka  PartitionCount: 1  ReplicationFactor: 1  Configs: max.message.bytes=12800000,flush.messages=1
    Topic: canal-kafka  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

9.1.3 Starting Canal

Before starting Canal, check the connectivity of its dependent ports from the Canal node:

## check the MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306
## check the Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092
## check the Zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181

Start Canal:

[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH : /opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.jar: ...... :/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar: ......
cd to /opt/canal/bin for continue

9.1.4 Viewing Canal Logs

View /opt/canal/logs/example/example.log:

[destination = example, address = /192.168.12.25:3306, EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example, address = /192.168.12.25:3306, EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example, address = /192.168.12.25:3306, EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump

9.1.5 Checking the consumer information in Kafka

Insert a test message into MySQL:

mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)

Check the consumer output; it already contains the test data just inserted:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}

9.2 The Kafka -> Flink pathway

Create a table t2 in Flink with the connector type kafka:
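The statements below are entered in the Flink SQL client, which in Flink 1.12 is started against the standalone cluster as follows:

[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded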

## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal-kafka',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'canal-kafka-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );
Flink SQL> select * from t2;

Insert test data into MySQL:

mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)

The data is synchronized to Flink in real time:

Flink SQL> select * from t2;
 Refresh: 1 s    Page: Last of 1    Updated: 02:49:27.366

                        id
                         2

9.3 The Flink -> TiDB pathway

9.3.1 Creating a table for testing in the downstream TiDB

[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)

9.3.2 Creating a test table in Flink

Flink SQL> CREATE TABLE t3 (
>     id int
> ) with (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>     'table-name' = 't3',
>     'username' = 'root',
>     'password' = 'mysql'
> );
Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e
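The statement above inserts a literal row. To make the pipeline flow end-to-end from Kafka into TiDB, you could instead submit a continuous insert from the Kafka-backed table t2 created in section 9.2 (a sketch, not run in this demo):

Flink SQL> insert into t3 select id from t2;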

9.3.3 Viewing inserted Data in the Downstream TiDB

mysql> select * from test.t3;
+------+
| id   |
+------+
|    3 |
+------+
1 row in set (0.00 sec)