Introduction
I previously used a MyBatis interceptor to synchronize data to ES, but it has two problems: it handles synchronization across databases poorly, and it cannot capture changes made by operating on the database manually. So I switched to Maxwell + Kafka: Maxwell reads the MySQL binlog and publishes the changes to Kafka, and a consumer parses them and writes the data to ES.
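Each row change that Maxwell reads from the binlog is published to Kafka as one JSON message. For illustration (the field values here are made up), an update to a member_info row produces an event roughly like this, with the new row image in data and the previous values of the changed columns in old:
{"database":"contradictionmediation","table":"member_info","type":"update","ts":1602323828,"xid":8249,"commit":true,"data":{"id":1,"name":"Zhang San"},"old":{"name":"Li Si"}}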
Environment
- kafka_2.12-2.6.0.tgz
- maxwell-1.23.5.tar.gz
- elasticsearch-7.7.1-linux-x86_64.tar.gz
- MySQL 5.7.26
- Linux 4.15.6-1.el7.elrepo.x86_64 (Red Hat 4.8.5-16)
MySQL
Enable binlog
# Modify my.cnf and add the following configuration.
[root@Server1d220 mysql]$ sudo vim /etc/my.cnf
# A unique server ID; it must differ from every other machine in the cluster. With a single machine, any value works
server_id=1
log-bin=master
# Use row mode
binlog_format=row
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
# Restart the database
systemctl restart mysqld.service
# After MySQL starts, set the parameters
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
Other commands
# Check whether binlog is enabled
show global variables like '%log_bin%';
# View binlog events
show binlog events;
# Check the name of the latest binlog file
show master status;
# Find the binlog files on disk
find / -name "mysql-bin*" -type f
# Display detailed log configuration information
SHOW GLOBAL VARIABLES LIKE '%log%';
# MySQL data storage directory
show variables like '%dir%';
# View the contents of a binlog file
show binlog events in 'mysql-bin.000003';
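Maxwell requires binlog_format=row, and this guide also sets binlog_row_image=FULL, so it is worth verifying both settings after the restart (same style as the check above):

mysql> show global variables like 'binlog_format';
mysql> show global variables like 'binlog_row_image';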
Kafka
Kafka installation
- Kafka’s official website
- There is no need to download ZooKeeper separately because it is bundled with the Kafka distribution
- This is a standalone installation
# Download Kafka
[root@Server1d220 lib]# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz
# Unpack
[root@Server1d220 lib]# tar -zxvf kafka_2.12-2.6.0.tgz
Modify Kafka and ZooKeeper configurations
# Modify the Kafka configuration
[root@Server1d220 kafka_2.12-2.6.0]# vim config/server.properties
# Intranet IP address to listen on
listeners=PLAINTEXT://192.168.1.220:9092
# Address returned to clients connecting from outside
advertised.listeners=PLAINTEXT://192.168.1.220:9092
# Change the log location
log.dirs=/home/lib/kafka_2.12-2.6.0/data
# Set the ZooKeeper address
zookeeper.connect=localhost:2181
# Message retention time
log.retention.hours=24
# Cleanup policy
log.cleanup.policy=delete
# Delete old data once it exceeds 10 GB
log.retention.bytes=10737418240

# Modify the consumer configuration
[root@Server1d220 kafka_2.12-2.6.0]# vim config/consumer.properties
# Cluster address
bootstrap.servers=192.168.1.220:9092
# Consumer group
group.id=dev-consumer-group
# Modify the ZooKeeper configuration
[root@Server1d220 kafka_2.12-2.6.0]# vim config/zookeeper.properties
# Data directory
dataDir=/home/lib/kafka_2.12-2.6.0/zookeeper/data
# Client port
clientPort=2181
Start Kafka and Zookeeper
# Start ZooKeeper in the background
[root@Server1d220 kafka_2.12-2.6.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >logs/zookeeper.log 2>&1 &
# Start Kafka in the background
[root@Server1d220 kafka_2.12-2.6.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties >logs/kafka.log 2>&1 &
# Create a Kafka topic named maxwell
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.220:2181 --replication-factor 1 --partitions 1 --topic maxwell
Unified startup script
#!/bin/sh
# Start ZooKeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >/dev/null 2>&1 &
# Wait 3 seconds for ZooKeeper to come up before starting Kafka
sleep 3
# Start Kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
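A matching shutdown script can be built from the bundled stop scripts (my own sketch; the same stop commands appear under "Other commands" below):

#!/bin/sh
# Stop Kafka first, then ZooKeeper
./bin/kafka-server-stop.sh
sleep 3
./bin/zookeeper-server-stop.sh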
Cluster configuration
Note the following about the cluster configuration:
- Modify zookeeper.properties
# Configure the following parameters
[root@Server1d238 kafka_2.12-2.6.0]# vim config/zookeeper.properties
dataDir=/usr/local/kafka_2.12-2.6.0/zookeeper/data
initLimit=10
syncLimit=5
maxClientCnxns=0
server.1=192.168.1.238:2888:3888
server.2=192.168.110.70:2888:3888
server.3=192.168.110.71:2888:3888
# Create the myid file in the dataDir directory
# The contents of myid are 1, 2, and 3 respectively:
# on the 238 host: echo 1 > myid
# on the 70 host:  echo 2 > myid
# on the 71 host:  echo 3 > myid
- Modify server.properties
[root@Server1d238 kafka_2.12-2.6.0]# vim config/server.properties
# The following settings must be changed on each node in the cluster
# Unique broker ID
broker.id=0
# Listen addresses; change to the local IP and port
listeners=PLAINTEXT://192.168.1.238:9092
advertised.listeners=PLAINTEXT://192.168.1.238:9092
# ZooKeeper cluster addresses
zookeeper.connect=192.168.1.238:2181,192.168.110.70:2181,192.168.110.71:2181
Other commands
# List the topics on the server
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
# Delete a topic
# Requires delete.topic.enable=true in server.properties
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic contradictionmediation__member_info
# Query topic details
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic contradictionmediation__member_info --describe
# Consume a topic's messages from the beginning
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic contradictionmediation__member_info --from-beginning
# List the consumer groups on a Kafka node
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
# Delete a consumer group
[10080@Server1d220 kafka_2.12-2.6.0]$ ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group tablePartiesInfoConsumer
# Stop Kafka
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-server-stop.sh
# Stop ZooKeeper
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/zookeeper-server-stop.sh
# Query Kafka broker information from ZooKeeper
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/zookeeper-shell.sh 127.0.0.1:2181
# Then, inside the shell:
ls /brokers/ids
# Clear all data
# Execute on each server
[root@Server1d238 kafka_2.12-2.6.0]# rm -rf logs/*
[root@Server1d238 kafka_2.12-2.6.0]# rm -rf data/*
[root@Server1d238 kafka_2.12-2.6.0]# rm -rf zookeeper/data/version-2
# Query broker registration information in ZooKeeper
# Query broker 0
[root@Server1d238 kafka_2.12-2.6.0]# bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"
Exceptions
- Exception: The Cluster ID 4EneH1qLR3KH69gubQF8kg doesn’t match stored clusterId Some(WslSvgQJQMiTl4rT18Eugg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
Solution: the cluster ID stored in meta.properties under log.dirs (log.dirs=/home/lib/kafka_2.12-2.6.0/logs in this setup) no longer matches the one in ZooKeeper. Check that zookeeper.connect points at the right cluster, or remove the stale meta.properties from that directory.
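Concretely, that means something like the following (an assumption on my part: deleting meta.properties discards the broker's stored cluster identity, so only do this when you are sure the data directory belongs to this cluster):

[root@Server1d220 kafka_2.12-2.6.0]# rm /home/lib/kafka_2.12-2.6.0/logs/meta.properties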
- Exception: Replication factor: 3 larger than available brokers: 1
Solution: the Kafka cluster currently has fewer available brokers than the requested replication-factor. Recreate the topic with a replication-factor no larger than the broker count, or add brokers.
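For a single-broker setup that means recreating the topic with replication factor 1, using the same flags as the creation command earlier:

[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.220:2181 --replication-factor 1 --partitions 1 --topic maxwell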
Maxwell
Maxwell installation
- Maxwell on GitHub
- This is also a standalone installation
# Download
[root@Server1d220 lib]# wget https://github.com/zendesk/maxwell/releases/download/v1.23.5/maxwell-1.23.5.tar.gz
# Unpack
[root@Server1d220 lib]# tar -zxvf maxwell-1.23.5.tar.gz
Modify the Maxwell configuration
# Rename the sample configuration file
[root@Server1d220 maxwell-1.23.5]# mv config.properties.example config.properties
# Edit the configuration file
[root@Server1d220 maxwell-1.23.5]# vim config.properties
# Client ID
client_id=maxwell-dev
# Log configuration
# DEBUG is recommended for the first startup so you can see the MySQL row data and the Kafka requests; switch back to INFO once it runs stably
log_level=info
# Kafka configuration
producer=kafka
kafka.bootstrap.servers=192.168.1.220:9092
# MySQL configuration
host=192.168.110.232
user=maxwell
password=Maxwell@2020
# Filter rules (example: match only the tables in the contradictionmediation database)
filter= exclude: *.*, include: contradictionmediation.*
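Before starting Maxwell, the MySQL account referenced above must exist. A minimal sketch of creating it, with the grants Maxwell's documentation asks for (run on the MySQL host as root):

mysql -uroot -p -e "CREATE USER 'maxwell'@'%' IDENTIFIED BY 'Maxwell@2020';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';"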
Start Maxwell
# Start directly
[root@Server1d220 maxwell-1.23.5]# nohup ./bin/maxwell >logs/maxwell.log 2>&1 &
# Start with command-line parameters
[root@Server1d220 maxwell-1.23.5]# nohup ./bin/maxwell --user='maxwell' --password='Maxwell@2020' --host='192.168.110.232' --producer=kafka --kafka.bootstrap.servers=192.168.1.220:9092 --kafka_topic=maxwell >logs/maxwell.log 2>&1 &
# Finally, check the process with jps -l and look at the logs; on success Maxwell prints: BinlogConnectorLifecycleListener - Binlog connected.
# Filter rule examples
# Match only the tbl table of the foodb database, plus any foodb table named table_<number>
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# Exclude all databases and tables, then include only the db1 database
--filter = 'exclude: *.*, include: db1.*'
# Exclude rows of db.tbl whose col value equals 'reject'
--filter = 'exclude: db.tbl.col = reject'
# Exclude any update that contains a col_a column
--filter = 'exclude: *.*.col_a = *'
# Blacklist the bad_db database entirely; to undo this you must drop the maxwell schema database
--filter = 'blacklist: bad_db.*'
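A quick end-to-end check of the MySQL → Maxwell → Kafka chain (my own smoke test; the table and column names are placeholders for whatever your filter matches): consume the maxwell topic in one terminal, touch a row in another, and a JSON event should appear within a second or two.

# Terminal 1: watch the maxwell topic
[root@Server1d220 kafka_2.12-2.6.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.220:9092 --topic maxwell
# Terminal 2: change any row in a watched table (update_time is a hypothetical column)
mysql -umaxwell -pMaxwell@2020 -h192.168.110.232 -e "update contradictionmediation.member_info set update_time = now() where id = 1;"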
Maxwell parameter list
Maxwell multi-instance
Multiple instances can be deployed, with each instance started separately, as sketched below.
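For example (a sketch; the client_id values are made up): Maxwell stores each instance's binlog position in the maxwell schema keyed by client_id, so give every instance a distinct one.

[root@Server1d220 maxwell-1.23.5]# nohup ./bin/maxwell --client_id=maxwell-dev-1 >logs/maxwell-1.log 2>&1 &
[root@Server1d220 maxwell-1.23.5]# nohup ./bin/maxwell --client_id=maxwell-dev-2 >logs/maxwell-2.log 2>&1 &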
Other Maxwell commands
# Fully initialize (bootstrap) the data of a table
[root@Server1d220 maxwell-1.23.5]# ./bin/maxwell-bootstrap --user maxwell --password Maxwell@2020 --host 192.168.110.232 --database contradictionmediation --table member_info --client_id maxwell-dev
# Clear Maxwell's saved data (schema snapshots, bootstrap state, and binlog positions)
# Execute in the maxwell database
delete from maxwell.`databases` where id is not null;
delete from maxwell.`tables` where id is not null;
delete from maxwell.`schemas` where id is not null;
delete from maxwell.bootstrap where id is not null;
delete from maxwell.columns where id is not null;
delete from maxwell.heartbeats where true;
delete from maxwell.positions where true;
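To watch the progress of a maxwell-bootstrap run, you can query the bootstrap table directly (a sketch; I believe these columns exist in Maxwell's schema, but verify against your version):

mysql -umaxwell -pMaxwell@2020 -h192.168.110.232 -e "select database_name, table_name, is_complete, inserted_rows from maxwell.bootstrap;"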
Conclusion
Overall it is not difficult; just take it one step at a time. I'll write later about consuming from Kafka and the implementation that synchronizes the data into ES.
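As a preview, here is a minimal shell sketch of that consuming side. Assumptions on my part: ES runs at 192.168.1.220:9200, jq is installed, the target index is named member_info, and every event's data carries an id column; a real implementation would batch writes through the _bulk API and handle errors and restarts.

#!/bin/sh
# Pipe Maxwell events from Kafka into Elasticsearch one document at a time
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.220:9092 --topic maxwell |
while read -r event; do
  type=$(echo "$event" | jq -r '.type')
  id=$(echo "$event" | jq -r '.data.id')
  if [ "$type" = "delete" ]; then
    # Remove the document when the row is deleted
    curl -s -XDELETE "http://192.168.1.220:9200/member_info/_doc/$id"
  else
    # insert/update: index the full row image from the event's data field
    echo "$event" | jq -c '.data' |
      curl -s -XPUT "http://192.168.1.220:9200/member_info/_doc/$id" \
           -H 'Content-Type: application/json' -d @-
  fi
done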
Reference documentation
- Kafka topic creation fails with "Replication factor larger than available brokers"
- Synchronizing MySQL data to an MRS cluster based on binlog
- Maxwell synchronizes MySQL binlogs in real time
- MySQL binlog parsing tool Maxwell explained