In the actual project, it is necessary to build a cluster with canal-admin, send the collected data to Kafka, and the Springboot client consumes the data from Kafka. The canal-spring-boot-starter introduced here supports consumption from Kafka

Database Account Creation

Start by creating an account for Canal in the monitored database

create user canal identified by 'canal';
grant select.show view, replication slave, replication client on *.* to 'canal'@The '%';
flush privileges;
Copy the code

docker-compose.yml

version: '3.1'
services:
  canal-server:
    restart: always
    container_name: canal-server
    image: canal/canal-server:latest
    ports:
      - 11111: 11111
    volumes:
      - ./server-logs:/home/admin/canal-server/logs
      Define an instance called test
      - ./instance/test:/home/admin/canal-server/conf/test
Copy the code

/instance/test/ create instance.properties file with the following contents:

canal.instance.master.address=192.168.3.3:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=Library name to listen on \\.. *
canal.instance.parser.parallelThreadSize=Number of CPU cores
Copy the code
  • canal.instance.master.addressThe address of the mysql database to listen on
  • canal.instance.dbUsername=canalThe username used by the mysql database to Canal
  • canal.instance.dbPassword=canalThe mysql database gave Canal the password to use
  • canal.instance.filter.regexSet the database to listen on, for exampletest\\.. *Indicates to listen for changes to all tables in the test library
  • `

Canal. The instance. The parser. ParallelThreadSize ` canal takes up the number of CPU core, set the instance test use can be set to 1

There are many other configurations, such as setting the position of the binlog, which can be found in the official Canal documentation

Springboot configuration

canal:
  mode: simple
  filter: Filter table name, which can be null
  batch-size: 1
  timeout: 1
  server: 192.1683.3.: 11111
  destination: The instance name defined in canal-server cannot be empty
  user-name: canal
  password: canal
  async: true If set to false, the MessageHandler bean cannot be found
Copy the code

Springboot consumption

Canal-spring-boot-starter is introduced in POM. XML

<dependency>
  <groupId>top.javatool</groupId>
  <artifactId>canal-spring-boot-starter</artifactId>
  <version>1.2.1 - RELEASE</version>
</dependency>
Copy the code

This package was previously mentioned in the canal-spring-boot-starter modification article that you need to pull your own source code to make it usable

If you have a test table in your database and the corresponding entity class is Test, you can listen for changes to the test table by defining a consumer as follows

@Slf4j
@Component
@canalTable (value = "table name ")
public class TestConsumer implements EntryHandler<Test> {
    @Override
    public void insert(Test test) {
        log.info("Add, {}", test);
    }

    / * * *@paramBefore contains only the changed attribute *@paramAfter contains all attributes */
    @Override
    public void update(Test before, Test after) {
        log.info("Update, before ={}, after ={}", before, after);
    }

    @Override
    public void delete(Test test) {
        log.info("Delete, {}", test); }}Copy the code

The policy pattern is recommended to handle the processing logic of different fields in different tables

Unified handling

All monitored tables are processed by putting table fields and values in a Map. The field name is the key of the Map, and the field value is the value of the Map

@Slf4j
@Component
@CanalTable(value = "all")
public class AllConsumer implements EntryHandler<Map<String.String>> {
    @Override
    public void insert(Map<String, String> map) {
        log.info("Add, {}", map);
    }

    @Override
    public void update(Map<String, String> before, Map<String, String> after) {
        // CanalModel can get the current database name and table name
        CanalModel canal = CanalContext.getModel();
        log.info("Update, before ={}, after ={}", before, after);
    }

    @Override
    public void delete(Map<String, String> map) {
        log.info("Delete, {}", map); }}Copy the code

Public number: fly code