Maxwell, a MySQL binlog parsing tool, explained

Maxwell overview

Maxwell is an application that reads the MySQL binary log (binlog) in real time and generates JSON messages, which it sends as a producer to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, and other platforms. Common application scenarios include ETL, cache maintenance, table-level DML metrics collection, incremental updates to search engines, data partition migration, and binlog-based rollback during database splitting. Website: http://maxwells-daemon.io, GitHub: https://github.com/zendesk/maxwell

Maxwell provides the following functions:

  • Supports full data initialization with SELECT * FROM table
  • Supports automatic recovery of the binlog position (GTID) after a failover of the primary database
  • Data sent to Kafka can be partitioned by database, table, column, and other keys to prevent data skew
  • Works by posing as a replica (slave): it receives binlog events and assembles them against the captured schema; DDL, XID, row, and other event types are accepted

In addition to Maxwell, the most commonly used MySQL binlog parsing tools are Alibaba's Canal and mysql_streamer. The three tools are briefly compared below.

Canal, developed in Java, is split into a server and a client and has numerous derivative applications, stable performance, and powerful features. However, you must write your own client to consume the data that Canal parses.

Maxwell's advantage over Canal is ease of use: it outputs data changes directly as JSON strings, so there is no client to write.

Quick start

First, enable the binlog for the MySQL database.

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row

Create a maxwell user and grant it permissions on the maxwell database as well as the replication privileges:

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

Before using Maxwell, you need to start Kafka

# Download and unpack Kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

Edit config/server.properties and add advertised.host.name at the end of the file, set to the IP address of the host where Kafka runs:

advertised.host.name=10.100.97.246

Otherwise, an exception like the following is thrown when Maxwell is started with Docker (hadoop2 is my host name):

17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
	at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
	at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
	... 6 more

You can then start Kafka

bin/kafka-server-start.sh config/server.properties

Test kafka

# Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# list all topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Start a producer and send a couple of messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

# Launch the consumer on another terminal and watch the consumed message
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

Quickly install and run Maxwell with Docker (you need to install Docker yourself first)

# Pull the image
docker pull zendesk/maxwell

# Start Maxwell and print the parsed binlog to the console
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout

To test Maxwell, first create a simple table, then insert, update, and delete some data:

CREATE TABLE `test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into test values(1, 22, 'Little whirl');
update test set name='whirly' where id=1;
delete from test where id=1;

Observe the output in the Docker console. In the log you can see the JSON strings that Maxwell produced from the binlog:

{"database":"test"."table":"test"."type":"insert"."ts": 1552153502,"xid": 832,"commit":true."data": {"id": 1,"age": 22."name":"Little whirl"}}
{"database":"test"."table":"test"."type":"update"."ts": 1552153502,"xid": 833,"commit":true."data": {"id": 1,"age": 22."name":"whirly"},"old": {"name":"Little whirl"}}
{"database":"test"."table":"test"."type":"delete"."ts": 1552153502,"xid": 834,"commit":true."data": {"id": 1,"age": 22."name":"whirly"}}
Copy the code

To output to Kafka instead, stop the container and start it again with new parameters:

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug

Then start a consumer for the maxwell topic and watch its output. Running the insert/update/delete SQL again produces the same messages as before:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell

Fields of the output JSON string:

  • data: the latest data, i.e. the values after the change
  • old: the old values before the change
  • type: the operation type, e.g. insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop, bootstrap-insert, int (unknown type)
  • xid: the transaction id
  • commit: messages with the same xid belong to the same transaction; the last statement of the transaction carries commit, which can be used to reassemble the transaction
  • server_id
  • thread_id
  • DDL statements are only captured when the program is started with the output_ddl option
  • datetime columns are output as "YYYY-MM-DD hh:mm:ss"; a value of "0000-00-00 00:00:00" is output unchanged
  • Maxwell supports reading multiple encodings but outputs only UTF-8
  • Maxwell always treats TIMESTAMP as UTC; if you want your own time zone, you have to handle that logic in the consumer or back end

The configuration related to the output format is as follows

| option | value | description | default |
| --- | --- | --- | --- |
| output_binlog_position | BOOLEAN | whether to include the binlog position | false |
| output_gtid_position | BOOLEAN | whether to include the GTID position | false |
| output_commit_info | BOOLEAN | whether to include commit and xid | true |
| output_xoffset | BOOLEAN | whether to include the virtual tx-row offset | false |
| output_nulls | BOOLEAN | whether to include fields whose value is NULL | true |
| output_server_id | BOOLEAN | whether to include server_id | false |
| output_thread_id | BOOLEAN | whether to include thread_id | false |
| output_schema_id | BOOLEAN | whether to include schema_id | false |
| output_row_query | BOOLEAN | whether to include the INSERT/UPDATE/DELETE statement; requires binlog_rows_query_log_events to be enabled in MySQL | false |
| output_ddl | BOOLEAN | whether to include DDL events (table-alter, table-create, etc.) | false |
| output_null_zerodates | BOOLEAN | convert '0000-00-00' to NULL? | false |
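
For instance, a config.properties fragment that turns on a few of these options might look like the following sketch (the combination chosen here is only an example):

# include binlog position and the originating server/thread ids in every message
output_binlog_position=true
output_server_id=true
output_thread_id=true
# keep NULL fields in the JSON (this is already the default)
output_nulls=true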

Advanced usage

Basic Configuration

| option | value | description | default |
| --- | --- | --- | --- |
| config | | path to the config.properties configuration file | |
| log_level | [debug / info / warn / error] | log level | info |
| daemon | | run the Maxwell instance in the background as a daemon | |
| env_config_prefix | STRING | environment variables matching this prefix are treated as configuration values | |

You can write Maxwell's startup parameters into a configuration file, config.properties, and point to it with the config option: bin/maxwell --config config.properties

user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell

Mysql configuration options

Maxwell divides MySQL into three roles based on its purpose:

  • host: the server on which Maxwell creates the maxwell database and its tables to store the captured schema and other state

    • There are six main tables: bootstrap is used for data initialization, schemas records the binlog file information, positions records the position up to which the binlog has been read, and heartbeats records heartbeat information
  • replication_host: the replication host; Maxwell listens for events and reads the binlog from this server

    • Separating host and replication_host prevents replication_user from writing data to the production library
  • schema_host: the host from which the table structure (schema) is captured

    • Because the binlog does not contain field information, Maxwell needs to retrieve the schema from the database and save it.
    • schema_host is rarely used, but it is handy in a binlog-proxy scenario. For example, to replay an offline binlog through Maxwell and generate a JSON stream, you can set up a MySQL server that contains no tables and only serves the binlog; the table structure is then obtained from schema_host.

Generally all three hosts are the same; schema_host is only needed when replication_host is in use.
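
A sketch of a split-role setup in config.properties (the host names here are made up for illustration):

# state / schema storage: the maxwell database lives on this server
host=maxwell-store.example.com
user=maxwell
password=123456
# read the binlog from the production master
replication_host=prod-master.example.com
replication_user=repl
replication_password=repl_secret
# capture the table structure from a replica instead of the master
schema_host=prod-replica.example.com
schema_user=maxwell
schema_password=123456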

The following configurations are associated with MySQL

| option | value | description | default |
| --- | --- | --- | --- |
| host | STRING | MySQL address | localhost |
| user | STRING | MySQL user name | |
| password | STRING | MySQL password | (no password) |
| port | INT | MySQL port | 3306 |
| jdbc_options | STRING | MySQL JDBC connection options | DEFAULT_JDBC_OPTS |
| ssl | SSL_OPT | SSL behavior for the MySQL connection | DISABLED |
| schema_database | STRING | database used by Maxwell to maintain schema and position | maxwell |
| client_id | STRING | unique string identifying a Maxwell instance | maxwell |
| replica_server_id | LONG | unique number identifying a Maxwell instance | 6379 (see notes) |
| master_recovery | BOOLEAN | enable experimental master recovery code | false |
| gtid_mode | BOOLEAN | whether to enable GTID-based replication | false |
| recapture_schema | BOOLEAN | recapture the latest table structure (schema); not configurable in config.properties | false |
| replication_host | STRING | server to replicate from (see split server roles) | schema-store host |
| replication_password | STRING | password for the replication server | (none) |
| replication_port | INT | port of the replication server | 3306 |
| replication_user | STRING | user on the replication server | |
| replication_ssl | SSL_OPT | SSL behavior for the replication connection | DISABLED |
| schema_host | STRING | server to capture schema from (see split server roles) | schema-store host |
| schema_password | STRING | password for the schema-capture server | (none) |
| schema_port | INT | port of the schema-capture server | 3306 |
| schema_user | STRING | user on the schema-capture server | |
| schema_ssl | SSL_OPT | SSL behavior for the schema-capture connection | DISABLED |

Producer configuration

Only Kafka is described here; the configuration of other producers is covered in the official documentation.

Kafka is one of the best-supported producers in Maxwell. Several versions of the Kafka client are bundled (0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1, 0.11.0.1 and 1.0.0), with kafka_version=1.0.0 as the default (at the current Maxwell version 1.20.0).

Maxwell publishes messages to the Kafka topic specified by the kafka_topic option. The default value is maxwell, and the topic can be static or dynamic, for example namespace_%{database}_%{table}; %{database} and %{table} are replaced by the database and table of each message.
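
In config.properties a dynamic topic would look like this (the namespace_ prefix is only an example):

kafka_topic=namespace_%{database}_%{table}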

When Maxwell reads its configuration, any option that starts with kafka. is passed through as a connection parameter of the Kafka producer client, for example

kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5

The following are the Maxwell generic producer and Kafka producer configuration parameters

| option | value | description | default |
| --- | --- | --- | --- |
| producer | PRODUCER_TYPE | producer type | stdout |
| custom_producer.factory | CLASS_NAME | custom producer factory class | |
| producer_ack_timeout | PRODUCER_ACK_TIMEOUT | time (ms) after which an asynchronously produced message is considered lost | |
| producer_partition_by | PARTITION_BY | function used to partition data sent to Kafka/Kinesis | database |
| producer_partition_columns | STRING | comma-separated column names when partitioning by column | |
| producer_partition_by_fallback | PARTITION_BY_FALLBACK | required when producer_partition_by=column; used when the column does not exist | |
| ignore_producer_error | BOOLEAN | if false, exit when Kafka/Kinesis reports an error; if true, only log it. See also dead_letter_topic | true |
| kafka.bootstrap.servers | STRING | Kafka broker list, HOST:PORT[,HOST:PORT] | |
| kafka_topic | STRING | Kafka topic | maxwell |
| dead_letter_topic | STRING | see the official documentation | |
| kafka_version | KAFKA_VERSION | Kafka producer client version used by Maxwell; not configurable in config.properties | 0.11.0.1 |
| kafka_partition_hash | [default / murmur3] | hash method used to choose the Kafka partition | default |
| kafka_key_format | [array / hash] | how Maxwell outputs Kafka keys, either a hash or an array of hashes | hash |
| ddl_kafka_topic | STRING | topic that DDL messages are delivered to when output_ddl is true | kafka_topic |
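
As an example of the partitioning options working together, the following config.properties sketch partitions by a column and falls back to partitioning by database for rows that do not have it (the column name user_id is made up):

producer_partition_by=column
producer_partition_columns=user_id
producer_partition_by_fallback=database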

Filter configuration

Maxwell can specify filtering rules through the --filter option: exclude or include specific databases, tables, and columns, which may also be described with regular expressions; complex rules can even be defined in JavaScript. A few examples from the official site:

# Exclude everything in foodb, except foodb.tbl and foodb tables whose names match table_\d+
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# Exclude all databases and tables, include only database db1
--filter = 'exclude: *.*, include: db1.*'
# Exclude any update to db.tbl in which column col equals "reject"
--filter = 'exclude: db.tbl.col = reject'
# Exclude any update that contains a col_a column
--filter = 'exclude: *.*.col_a = *'
# Blacklist the bad_db database completely; to undo this, the maxwell database must be deleted
--filter = 'blacklist: bad_db.*'

Data initialization

When Maxwell starts, it reads the position where it last stopped from the maxwell library and continues parsing the binlog from that breakpoint. But if the binlog has been purged, how can Maxwell replicate the entire table? In other words, how do you initialize the data?

Touch the entire table so that binlog is generated artificially? For instance, find a field that does not affect the business, such as update_time, add one second to it and then subtract one second:

update test set update_time = DATE_ADD(update_time, interval 1 second);
update test set update_time = DATE_ADD(update_time, interval -1 second);

There are obviously several big problems:

  • What if there isn’t an unimportant field? Each field is too important to be arbitrarily modified!
  • If the entire table is large, the modification process takes a long time and affects services!
  • A lot of non-business binlogs will be generated!

For data initialization, Maxwell provides the command-line tool maxwell-bootstrap, which performs a full initialization with SELECT * FROM table and produces no extra binlog!

The tool has the following parameters:

| parameter | description |
| --- | --- |
| --log_level LOG_LEVEL | log level (DEBUG, INFO, WARN or ERROR) |
| --user USER | MySQL user name |
| --password PASSWORD | MySQL password |
| --host HOST | MySQL address |
| --port PORT | MySQL port |
| --database DATABASE | the database containing the table to bootstrap |
| --table TABLE | the table to bootstrap |
| --where WHERE_CLAUSE | filter condition |
| --client_id CLIENT_ID | the Maxwell instance that should perform the bootstrap |

Next, bootstrap the test table in the test database. First prepare some test data:

INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');

Then run reset master; to clear the binlog, and delete the tables in the maxwell library. Next, start Kafka, Maxwell, and a Kafka consumer with the commands from Quick start, and finally run maxwell-bootstrap:

docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell \
    --password 123456 --host 10.100.97.246 --database test --table test --client_id maxwell

Note: --bootstrapper=sync blocks normal binlog parsing while the bootstrap is being processed; --bootstrapper=async does not block.
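
For example, the flag can simply be appended to the Maxwell start command used earlier (a sketch):

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' \
    --host='10.100.97.246' --producer=kafka --kafka.bootstrap.servers='10.100.97.246:9092' \
    --kafka_topic=maxwell --bootstrapper=async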

You can also trigger a bootstrap manually by inserting a record into the maxwell.bootstrap table:

insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');

You can now see the data bootstrapped from the Kafka consumer side

{"database":"maxwell"."table":"bootstrap"."type":"insert"."ts": 1552199115,"xid": 36738,"commit":true."data": {"id": 3."database_name":"test"."table_name":"test"."where_clause":null,"is_complete": 0."inserted_rows": 0."total_rows": 0."created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position": 0."client_id":"maxwell"}}
{"database":"test"."table":"test"."type":"bootstrap-start"."ts": 1552199115,"data": {}} {"database":"test"."table":"test"."type":"bootstrap-insert"."ts": 1552199115,"data": {"id": 1,"age": 1,"name":"1"}}
{"database":"test"."table":"test"."type":"bootstrap-insert"."ts": 1552199115,"data": {"id": 2."age": 2."name":"2"}}
{"database":"test"."table":"test"."type":"bootstrap-insert"."ts": 1552199115,"data": {"id": 3."age": 3."name":"3"}}
{"database":"test"."table":"test"."type":"bootstrap-insert"."ts": 1552199115,"data": {"id": 4."age": 4."name":"4"}}
{"database":"maxwell"."table":"bootstrap"."type":"update"."ts": 1552199115,"xid": 36756,"commit":true."data": {"id": 3."database_name":"test"."table_name":"test"."where_clause":null,"is_complete": 1,"inserted_rows": 4."total_rows": 0."created_at":null,"started_at":"The 2019-03-10 14:25:15"."completed_at":"The 2019-03-10 14:25:15"."binlog_file":"mysql-bin.000001"."binlog_position": 64446,"client_id":"maxwell"},"old": {"is_complete": 0."inserted_rows": 1,"completed_at":null}}
{"database":"test"."table":"test"."type":"bootstrap-complete"."ts": 1552199115,"data": {}}Copy the code

Note that the type of the bootstrapped data is bootstrap-insert.

Then check the binlog again with show binlog events; the maxwell-bootstrap command did not generate any redundant binlog. This benefit is more obvious when the amount of data is large.

The Bootstrap procedure is bootstrap-start -> bootstrap-insert -> bootstrap-complete, where the data fields of start and complete are empty and do not carry data.

If Maxwell crashes during a bootstrap, the bootstrap restarts from the very beginning when Maxwell comes back up, regardless of how far it had progressed. If that is not what you want, you can go into the database and either set the row's is_complete field to 1 (marking it complete) or delete the row.
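
A sketch of those two manual remedies (the id value is whatever appears in your maxwell.bootstrap row):

-- mark the interrupted bootstrap as finished so it is not restarted
update maxwell.bootstrap set is_complete = 1 where id = 3;
-- or drop the request entirely
delete from maxwell.bootstrap where id = 3;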

Monitoring Maxwell

Maxwell exposes monitoring data in four ways: the base logging mechanism (slf4j), JMX, HTTP, or pushing to Datadog. The monitoring-related configuration options are as follows:

| option | value | description | default |
| --- | --- | --- | --- |
| metrics_prefix | STRING | prefix for metric names | MaxwellMetrics |
| metrics_type | [slf4j / jmx / http / datadog] | how metrics are published | |
| metrics_jvm | BOOLEAN | whether to collect JVM information | false |
| metrics_slf4j_interval | SECONDS | how often metrics are written to the log; metrics_type must be slf4j | 60 |
| http_port | INT | port on which metrics are published when metrics_type is http | 8080 |
| http_path_prefix | STRING | HTTP path prefix | / |
| http_bind_address | STRING | HTTP address to bind the metrics endpoint to | all addresses |
| http_diagnostic | BOOLEAN | whether to enable the HTTP diagnostic endpoint | false |
| http_diagnostic_timeout | MILLISECONDS | HTTP diagnostic response timeout | 10000 |
| metrics_datadog_type | [udp / http] | how metrics are published to Datadog when metrics_type is datadog | udp |
| metrics_datadog_tags | STRING | tags supplied to Datadog, e.g. tag1:value1,tag2:value2 | |
| metrics_datadog_interval | INT | how often metrics are pushed to Datadog, in seconds | 60 |
| metrics_datadog_apikey | STRING | Datadog API key when metrics_datadog_type=http | |
| metrics_datadog_host | STRING | target host when metrics_datadog_type=udp | localhost |
| metrics_datadog_port | INT | target port when metrics_datadog_type=udp | 8125 |

Which metrics are available? Note that every metric name is prefixed with metrics_prefix.

| metric | type | description |
| --- | --- | --- |
| messages.succeeded | Counter | number of messages successfully sent to Kafka |
| messages.failed | Counter | number of messages that failed to send |
| row.count | Counter | number of binlog rows processed; note that not all binlog rows end up being sent to Kafka |
| messages.succeeded.meter | Meter | rate at which messages are successfully sent to Kafka |
| messages.failed.meter | Meter | rate at which messages fail to be sent to Kafka |
| row.meter | Meter | rate at which rows arrive at Maxwell from the binlog connector |
| replication.lag | Gauge | time in milliseconds between a database transaction being committed and Maxwell processing it |
| inflightmessages.count | Gauge | number of messages currently in flight (waiting for acknowledgement from the destination, or ahead of one that is) |
| message.publish.time | Timer | time in milliseconds taken to send a record to Kafka |
| message.publish.age | Timer | time in milliseconds between the event being produced by the database and being sent to Kafka; accurate to +/- 500 ms |
| replication.queue.time | Timer | time in milliseconds taken to enqueue a binlog event for processing |

Some of the metrics above are specific to Kafka and are not supported by all producers.

As an experiment, let's fetch the monitoring metrics over HTTP:

docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
    --metrics_type=http  --metrics_jvm=true --http_port=8080

The extra options --metrics_type=http --metrics_jvm=true --http_port=8080 publish the metrics over HTTP, enable collection of JVM information, and bind port 8080; after startup, all metrics can be fetched from http://10.100.97.246:8080/metrics

The HTTP mode exposes four endpoints, each returning a different format

| endpoint | description |
| --- | --- |
| /metrics | all metrics, returned as JSON |
| /prometheus | all metrics in Prometheus format (Prometheus is an open-source monitoring, alerting, and time-series database suite) |
| /healthcheck | returns whether Maxwell has been healthy for the past 15 minutes |
| /ping | simple liveness test, returns pong |
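
For example, against the instance started above:

curl http://10.100.97.246:8080/metrics
curl http://10.100.97.246:8080/healthcheck
curl http://10.100.97.246:8080/ping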

If you collect Maxwell metrics through JMX instead, you can configure JMX access with the JAVA_OPTS environment variable

export JAVA_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9010 \
  -Dcom.sun.management.jmxremote.local.only=false \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=10.100.97.246"

Multiple Maxwell instances

Maxwell can run multiple instances with different configurations against the same master. This is useful when the producers need different settings, for example publishing events from different groups of tables to different topics. Each Maxwell instance must be configured with a unique client_id so that binlog positions can be tracked separately.
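
A sketch of two instances on the same master, each with its own client_id (and, to be safe, its own replica_server_id), filter, and topic; the database, table, and id values are made up for illustration:

# instance 1: shop.orders -> topic "orders"
bin/maxwell --user=maxwell --password=123456 --host=10.100.97.246 \
    --client_id=maxwell_orders --replica_server_id=6380 \
    --filter='exclude: *.*, include: shop.orders' \
    --producer=kafka --kafka.bootstrap.servers=10.100.97.246:9092 --kafka_topic=orders

# instance 2: shop.users -> topic "users"
bin/maxwell --user=maxwell --password=123456 --host=10.100.97.246 \
    --client_id=maxwell_users --replica_server_id=6381 \
    --filter='exclude: *.*, include: shop.users' \
    --producer=kafka --kafka.bootstrap.servers=10.100.97.246:9092 --kafka_topic=users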

GTID support

Maxwell has supported GTID-based replication since version 1.8.0. In GTID mode, Maxwell transparently picks the new replication position after the master changes.
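
A sketch of turning it on, assuming the MySQL server itself is already running with GTIDs enabled (gtid_mode=ON and enforce_gtid_consistency=ON in my.cnf):

# config.properties on the Maxwell side
gtid_mode=true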

What is GTID Replication?

A global transaction identifier (GTID) is the number of a committed transaction, and it is globally unique.

GTID-based replication has been available since MySQL 5.6.5. GTID ensures that every transaction committed on the master library has an ID that is unique across the cluster. This approach strengthens database consistency, fault recovery, and fault tolerance.

In the original binary log-based replication, the slave needed to tell the master which offset to perform incremental synchronization from, and if specified incorrectly, the data would be missed, resulting in inconsistencies. With the help of GTID, in the case of a master/slave switchover, other MySQL slave libraries can automatically find the correct replication location on the new master database, which greatly simplifies cluster maintenance in a complex replication topology and reduces the risk of manual replication location misoperation. In addition, GTID-based replication can ignore transactions that have already been executed, reducing the risk of data inconsistencies.

Things to note

timestamp column

To keep data consistent, Maxwell treats all time types (datetime, timestamp, date) as strings (for example, 0000-00-00 00:00:00 is an invalid timestamp, yet MySQL accepts it; parsed into a Java or Python type it would become null/None).

If a MySQL table contains a timestamp column, the binlog carries the UTC time while users see local time. For example, an f_create_time timestamp written at Beijing time 2018-01-05 21:01:01 is actually stored by MySQL as 2018-01-05 13:01:01, and the binlog contains the same string. If the consumer does not convert the time zone, the value ends up 8 hours off.

Rather than every client having to worry about this, I think it would make more sense for Maxwell to accept a time-zone parameter and handle the conversion automatically; otherwise each client either has to know which columns are of type TIMESTAMP or has to connect to the source database and cache those column types.

binary column

Maxwell handles binary columns such as blob and varbinary by base64-encoding them and writing them into the JSON as strings. Consumers cannot use the column value directly; they have to base64-decode it first.
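
A minimal consumer-side sketch of that decode step in Java (the encoded string here is a stand-in for the value a consumer would pull out of a Maxwell message's data field):

import java.util.Base64;

public class DecodeBinaryColumn {
    public static void main(String[] args) {
        // hypothetical base64 value of a blob/varbinary column taken from a Maxwell JSON message
        String encoded = "aGVsbG8gbWF4d2VsbA==";
        byte[] raw = Base64.getDecoder().decode(encoded);
        System.out.println(new String(raw)); // prints "hello maxwell"
    }
}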

Table structures are out of sync

If you feed an old binlog to a new MySQL server and parse it with Maxwell, the table structure may already have changed, e.g. the binlog may contain more columns than schema_host knows about. A hidden column such as __#alibaba_rds_rowid#__ does not show up in show create table or in the schema, but does appear in the binlog and gets synced to replica libraries.

In addition, we keep the table structure versions in git, so a scenario like this can also be handled that way.

Binlog from large transactions

When an operation generates a huge amount of binlog, such as a daily table migration, Maxwell automatically spills the events to the file system to control memory usage:

Using kafka version: 0.11.0.1
21:16:07,109 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO  SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO  Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714], lastHeartbeat=0]
BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO  BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:6379, cid:9182598)
21:16:08,325 INFO  BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events

But then another problem appeared: after the overflow, an EventDataDeserializationException was thrown ("Failed to deserialize data of EventHeaderV4"). When I pointed another Maxwell at the earlier binlog position and parsed it again, no exception was thrown, and the subsequent data showed that nothing had been lost.

The root cause is still unknown. Judging from the Caused by: java.net.SocketException: Connection reset, it feels as if the connection was closed while reading the binlog stream before a complete event had been read. The problem occurs fairly persistently, and no clear solution to similar issues has appeared on GitHub yet. (The job in question was done with insert into ... select.)

03:18:20,586 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN  BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
	at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataDeserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
	... 5 more
03:19:31,514 INFO  BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN  BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO  BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)

tableMapCache

If you only want binlog changes for certain tables, you filter with include_tables. But suppose a table t1 that appears in yesterday's binlog has since been deleted: when Maxwell starts it cannot pull t1's table structure, and because there is no structure to assemble the JSON with, parsing yesterday's changes to t1 throws an exception.

Manually inserting records into maxwell.tables/columns does work. The root of the problem is that Maxwell applies the filter only when handling row_events, but its tableMapCache requires every table that appears in the binlog to be present.

To make tableMapCache cache only the tables specified by include_dbs/tables, the source was modified: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb

Improve consumption performance

With RabbitMQ and a routing_key of %db%.%table%, some tables generate a very large binlog increment, which makes the queue sizes very uneven. Because playback is parallelized at the level of transaction xid or thread_id, the smallest queue granularity is a table: try to give the big tables their own queues and group the other, smaller ones together.

binlog

Maxwell keeps the binlog position it has read in the maxwell library. If for some reason (such as a reset master;) the records in the maxwell library no longer match the actual binlog, an exception is reported. You can manually correct the stored binlog position, or simply truncate or drop the maxwell library and let it be rebuilt

com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

As well as

com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)
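
A sketch of the two remedies mentioned above, assuming the default maxwell schema database (the binlog file name and position must of course be real values, e.g. taken from show master status on the source):

-- point Maxwell at a binlog position that actually exists
update maxwell.positions set binlog_file = 'mysql-bin.000002', binlog_position = 4 where client_id = 'maxwell';

-- or throw the stored state away and let Maxwell rebuild it
drop database maxwell;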

Reference documentation

  • Maxwell’s Daemon Doc
  • Maxwell 1.17.1 MySQL Binlog parsing tool installation and use
  • Sean. Self-built Binlog subscription service — Maxwell
  • MySQL 5.7 master/slave replication practices based on GTID