Preface

When Canal runs as a single-node service to capture MySQL changes, a crash of that server means data loss. Canal can be configured for HA to eliminate this single point of failure; the implementation depends on ZooKeeper. Let's configure Canal's HA.

1. Canal HA Mode Configuration

1.1 Configuring Server HA Mode

Canal supports HA. Its implementation relies on ZooKeeper, using features such as watchers and EPHEMERAL nodes (bound to the session lifecycle), similar to HDFS HA. A minimal sketch of the election pattern follows the list below.

Canal HA is divided into two parts; Canal Server and Canal Client each have their own HA implementation:

  • Canal Server: to reduce MySQL dump requests, only one instance of the same destination across different servers may be running at a time; the others stay in standby state (standby is an instance state).
  • Canal Client: to ensure ordering, only one Canal client may perform get, ack, and rollback operations on an instance at a time; otherwise ordering cannot be guaranteed.
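The server-side election boils down to "whoever creates the EPHEMERAL node first wins". Below is a minimal sketch of that pattern against the plain ZooKeeper API. The node path matches the one Canal uses (we will inspect it with zkCli later), but the payload and the win/standby handling are simplified illustrations, not Canal's actual implementation, and the parent path must already exist:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralElectionSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node01:2181,node02:2181,node03:2181", 30000, event -> {});
        // Canal publishes the currently running server under this path
        String path = "/otter/canal/destinations/example/running";
        byte[] payload = "{\"active\":true,\"address\":\"192.168.100.201:11111\"}".getBytes();
        try {
            // EPHEMERAL: the node vanishes when this session dies, so standby
            // servers are notified through their watchers and re-run the election
            zk.create(path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("won the election, start the canal instance here");
        } catch (KeeperException.NodeExistsException e) {
            // Another server won; register a watcher and stay in standby
            zk.exists(path, watched ->
                    System.out.println("running node changed, retry the election"));
            System.out.println("standby");
        }
    }
}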

1.2 Environment Preparations

  • Canal: node01 and node02
  • ZooKeeper: node01, node02, and node03
  • MySQL: node01

1.3 Canal HA Server Configuration

Following the standard deployment steps, complete the configuration on each machine. For this demonstration the instance name is example. Modify canal.properties and add the ZooKeeper configuration:

# canal/conf/canal.properties
# configure the ZooKeeper addresses
canal.zkServers=node01:2181,node02:2181,node03:2181
# comment out the file-based instance config and enable the default (ZooKeeper-backed) one
# canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

# canal/conf/example/instance.properties
canal.instance.mysql.slaveId=1235

Note: after copying the Canal package to node02, change canal.instance.mysql.slaveId there so that it differs from the value on node01; otherwise there will be problems.
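For instance (the node02 value below is only an illustration; any value different from node01's will do):

# node01: canal/conf/example/instance.properties
canal.instance.mysql.slaveId=1235

# node02: canal/conf/example/instance.properties
canal.instance.mysql.slaveId=1236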

1.4 Starting the Canal Environment

  1. Start ZooKeeper on every node

nohup /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start
  2. Start Canal on node01

ssh node01
sh bin/startup.sh

  3. Start Canal on node02

ssh node02
sh bin/startup.sh

After startup, check logs/example/example.log; only one machine will log a successful start. In this example, node01 started successfully:

2020-10-17 03:13:24.211 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-17 03:13:24.224 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-17 03:13:24.340 [main] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2020-10-17 03:13:24.486 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-10-17 03:13:24.509 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful...
2020-10-17 03:13:24.604 [destination = example , address = node02/192.168.100.202:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

View the node information in ZooKeeper; the currently running node is node01:11111:

[zk: localhost:2181(CONNECTED) 2]  get /otter/canal/destinations/example/running
{"active":true."address":"192.168.100.201:11111"."cid":1}
cZxid = 0x6800000013
ctime = Mon Oct 12 05:13:29 CST 2020
mZxid = 0x6800000013
mtime = Mon Oct 12 05:13:29 CST 2020
pZxid = 0x6800000013
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2751980dfb80000
dataLength = 57
numChildren = 0

2. Client Connection

package com.canal.Test;

/**
 * @author Big data guy
 * @version V1.0
 * @Package com.canal.Test
 * @File CanalHATest.java
 * @date 2021/1/11 21:54
 */

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Test that the Canal HA configuration works
 */
public class CanalHATest {

    public static void main(String[] args) {
        //1. Create a connection

        CanalConnector connect = CanalConnectors.newClusterConnector(
                "node01:2181,node02:2181,node03:2181", "example", "", "");
        // Number of entries to read per batch
        int batchSize = 1000;
        // Set the state
        boolean running = true;
        while (running) {
            //2. Establish a connection
            connect.connect();
            // Roll back to the last acknowledged position to prevent data loss
            connect.rollback();
            // Subscribe to matching logs
            connect.subscribe();
            while (running) {
                Message message = connect.getWithoutAck(batchSize);
                // Get the batchId
                long batchId = message.getId();
                // Get the number of binlog entries
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Empty batch: no new binlog entries this round
                } else {
                    printSummary(message);
                }
                // Acknowledge that this batchId was consumed successfully
                connect.ack(batchId);
            }
        }
    }

    private static void printSummary(Message message) {
        // Iterate over each binlog entity in the batch
        for (CanalEntry.Entry entry : message.getEntries()) {
            // Skip transaction begin/end entries
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // Get the binlog file name
            String logfileName = entry.getHeader().getLogfileName();
            // Get the offset of logfile
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Get the timestamp of the SQL statement execution
            long executeTime = entry.getHeader().getExecuteTime();
            // Get the database name
            String schemaName = entry.getHeader().getSchemaName();
            // Get the table name
            String tableName = entry.getHeader().getTableName();
            // Get event type insert/update/delete
            String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();

            System.out.println("logfileName" + ":" + logfileName);
            System.out.println("logfileOffset" + ":" + logfileOffset);
            System.out.println("executeTime" + ":" + executeTime);
            System.out.println("schemaName" + ":" + schemaName);
            System.out.println("tableName" + ":" + tableName);
            System.out.println("eventTypeName" + ":" + eventTypeName);

            CanalEntry.RowChange rowChange = null;
            try {
                // Get the stored data and parse the binary byte data into RowChange entities
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }


            // Iterate over each change
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // Determine whether it is a deletion event
                if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
                    System.out.println("---delete---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("-");
                }
                // Determine if it is an update event
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
                    System.out.println("---update---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("-");
                    printColumnList(rowData.getAfterColumnsList());
                }
                // Determine whether it is an insert event
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
                    System.out.println("---insert---");
                    printColumnList(rowData.getAfterColumnsList());
                    System.out.println("-"); }}}}// Prints all column names and column values
    private static void printColumnList(List<CanalEntry.Column> columnList) {
        for (CanalEntry.Column column : columnList) {
            System.out.println(column.getName() + "\t" + column.getValue());
        }
    }
}
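To compile and run the client above, the Canal client library must be on the classpath. Assuming a Maven build, a coordinate like the following should work (the version shown is illustrative; match it to your server version):

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>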

Run the test

14:24:50.371 [main-SendThread(node02:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2751980dfb80001 after 1ms
14:25:03.704 [main-SendThread(node02:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2751980dfb80001 after 1ms

Now modify a row in the database to test:

logfileName:mysql-bin.000002
logfileOffset:761
executeTime:1602452082000
schemaName:zw
tableName:dw_t_product
eventTypeName:update
---update---
goods_id	007
goods_status	
createtime	2019-12-22
modifytime	2019-12-22
cdat	20191222
-
goods_id	007
goods_status	
createtime	33
modifytime	2019-12-22
cdat	20191222

We successfully received the modified data. But some friends will say this still doesn't prove HA. So stop Canal on node01 and see whether the task keeps running.

[root@node01 bin]# ./stop.sh 
node01: stopping canal 6345 ... 
Oook! cost:1

Modify another row in the database and see whether it still comes through:

logfileName:mysql-bin.000002
logfileOffset:1071
executeTime:1602452401000
schemaName:zw
tableName:dw_t_product
eventTypeName:update
---update---
goods_id	004
goods_status	
createtime	2019-12-15
modifytime	2019-12-20
cdat	20191222
-
goods_id	004
goods_status	deleted
createtime	2019-2-15
modifytime	2019-12-20
cdat	20191222

The data still comes through. Now let's check ZooKeeper to see which node Canal is currently serving from:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.100.202:11111","cid":1}
cZxid = 0x680000002a
ctime = Mon Oct 12 05:39:05 CST 2020
mZxid = 0x680000002a
mtime = Mon Oct 12 05:39:05 CST 2020
pZxid = 0x680000002a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17532d2b90c0000
dataLength = 57
numChildren = 0

You can clearly see that the running node has switched to node02 (192.168.100.202).

3. Canal Server HA Workflow

  1. When a Canal Server wants to start a canal instance, it first attempts to create an EPHEMERAL node for that instance in ZooKeeper.
  2. The server that creates the ZooKeeper node successfully starts the canal instance; the servers that fail to create it stay in standby state.
  3. If the node created by Canal Server A disappears, ZooKeeper immediately notifies the other Canal Servers to repeat step 1, and a new Canal Server is selected to start the instance.
  4. Each time the Canal Client connects, it first asks ZooKeeper which server is currently running the canal instance and then connects to it; if the connection fails, it retries (a minimal discovery sketch follows this list).
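As an illustration of step 4, here is a minimal sketch of how a client could look up the running server in ZooKeeper. The path and JSON payload match what the zkCli get command showed earlier; in practice this lookup (and the re-lookup on failover) is handled for you by CanalConnectors.newClusterConnector:

import java.nio.charset.StandardCharsets;

import org.apache.zookeeper.ZooKeeper;

public class FindRunningServer {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node01:2181,node02:2181,node03:2181", 30000, event -> {});
        // The active server publishes its address as JSON in this ephemeral node
        byte[] data = zk.getData("/otter/canal/destinations/example/running", false, null);
        System.out.println(new String(data, StandardCharsets.UTF_8));
        // prints e.g. {"active":true,"address":"192.168.100.202:11111","cid":1}
        zk.close();
    }
}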

Summary

At this point we have solved Canal's single point of failure problem. Nowadays most components provide an HA mode, because in the end data is what the company needs most. Friends who need big data study materials can download them from the GitHub links below. Believe in yourself: effort and sweat will always be rewarded. I'm Big Data Guy, and I'll see you next time.

Flink interview questions, Spark interview questions, essential software for programmers, Hive interview questions, Hadoop interview questions, Docker interview questions, resume templates, and other resources are available for download on GitHub: github.com/lhh2002/Fra… Gitee mirror: gitee.com/li_hey_hey/… GitHub: github.com/lhh2002/Rea…