In a real project, you would build a cluster with canal-admin, send the collected change data to Kafka, and have the Spring Boot client consume it from Kafka. The canal-spring-boot-starter introduced here also supports consuming from Kafka.
Database Account Creation
Start by creating an account for Canal in the monitored database:
create user canal identified by 'canal';
grant select, show view, replication slave, replication client on *.* to 'canal'@'%';
flush privileges;
docker-compose.yml
version: '3.1'
services:
  canal-server:
    restart: always
    container_name: canal-server
    image: canal/canal-server:latest
    ports:
      - 11111:11111
    volumes:
      - ./server-logs:/home/admin/canal-server/logs
      # Define an instance called test
      - ./instance/test:/home/admin/canal-server/conf/test
In /instance/test/, create an instance.properties file with the following contents:
canal.instance.master.address=192.168.3.3:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# Replace with the name of the database to listen on
canal.instance.filter.regex=<database name>\\..*
# Set to the number of CPU cores
canal.instance.parser.parallelThreadSize=<CPU core count>
- `canal.instance.master.address`: the address of the MySQL database to listen on
- `canal.instance.dbUsername=canal`: the username Canal uses to connect to MySQL
- `canal.instance.dbPassword=canal`: the password the MySQL database assigned to that Canal user
- `canal.instance.filter.regex`: the databases and tables to listen on; for example `test\\..*` listens for changes to all tables in the `test` database
- `canal.instance.parser.parallelThreadSize`: the number of CPU cores Canal may use; for the test instance this can be set to 1
There are many other options, such as setting the binlog position to start reading from, which can be found in the official Canal documentation.
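As one example of those extra options, instance.properties can pin the starting binlog position. A minimal sketch, assuming the journal name and offset were read from `SHOW MASTER STATUS` on your MySQL server:

```properties
# Start reading from an explicit binlog file and offset
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=4
# Or start from a timestamp (milliseconds); leave empty to use the current position
canal.instance.master.timestamp=
```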
Spring Boot configuration
canal:
  mode: simple
  # Table name filter; may be left empty
  filter:
  batch-size: 1
  timeout: 1
  server: 192.168.3.3:11111
  # The instance name defined in canal-server; must not be empty
  destination: test
  user-name: canal
  password: canal
  # If set to false, the MessageHandler bean cannot be found
  async: true
Spring Boot consumption
Introduce canal-spring-boot-starter in pom.xml:
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
As mentioned in an earlier article on modifying canal-spring-boot-starter, you need to pull the source code and build this package yourself to make it usable.
If you have a test table in your database and a corresponding entity class Test, you can listen for changes to the test table by defining a consumer as follows:
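The Test entity itself is not shown in the original article. A minimal sketch, assuming a hypothetical table with just id and name columns (the real column set depends on your schema):

```java
// Hypothetical entity mapped to the `test` table.
public class Test {
    private Long id;
    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "Test{id=" + id + ", name='" + name + "'}";
    }
}
```

Field names must match the table's column names (or be mapped via annotations) so the starter can populate the entity from the binlog row.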
@Slf4j
@Component
@CanalTable(value = "test")
public class TestConsumer implements EntryHandler<Test> {

    @Override
    public void insert(Test test) {
        log.info("Add, {}", test);
    }

    /**
     * @param before contains only the changed attributes
     * @param after  contains all attributes
     */
    @Override
    public void update(Test before, Test after) {
        log.info("Update, before={}, after={}", before, after);
    }

    @Override
    public void delete(Test test) {
        log.info("Delete, {}", test);
    }
}
The strategy pattern is recommended for organizing the processing logic of different fields across different tables.
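A minimal sketch of that idea, using a hypothetical `FieldStrategyDispatcher` (not part of canal-spring-boot-starter) that routes each changed column to its own handler instead of one big if/else inside the consumer:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical dispatcher: one strategy per column name.
public class FieldStrategyDispatcher {

    // column name -> (oldValue, newValue) handler
    private final Map<String, BiConsumer<String, String>> strategies = new HashMap<>();

    public void register(String column, BiConsumer<String, String> handler) {
        strategies.put(column, handler);
    }

    // Called from update(before, after); `before` holds only changed columns
    public void dispatch(Map<String, String> before, Map<String, String> after) {
        before.forEach((column, oldValue) -> {
            BiConsumer<String, String> handler = strategies.get(column);
            if (handler != null) {
                handler.accept(oldValue, after.get(column));
            }
        });
    }
}
```

A consumer's `update` method would simply delegate to `dispatch(before, after)`; adding handling for a new column then means registering one more strategy rather than editing the consumer.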
Unified handling
Alternatively, all monitored tables can be processed in one place by receiving each row as a Map: the field name is the key and the field value is the value.
@Slf4j
@Component
@CanalTable(value = "all")
public class AllConsumer implements EntryHandler<Map<String, String>> {

    @Override
    public void insert(Map<String, String> map) {
        log.info("Add, {}", map);
    }

    @Override
    public void update(Map<String, String> before, Map<String, String> after) {
        // CanalModel exposes the current database name and table name
        CanalModel canal = CanalContext.getModel();
        log.info("Update, before={}, after={}", before, after);
    }

    @Override
    public void delete(Map<String, String> map) {
        log.info("Delete, {}", map);
    }
}
WeChat official account: Fly Code