Copyright belongs to the author. For reprints of any kind, please contact the author for authorization and credit the source.
Introduction to Canal
History of Canal
In the early days, Alibaba deployed database instances in computer rooms in both Hangzhou and the United States. The business requirement of synchronizing data across these computer rooms gave birth to Canal, which initially obtained incremental changes mainly through triggers. Since 2010, Alibaba has gradually switched to parsing database logs to obtain incremental change data for synchronization, which gave rise to its incremental subscription and consumption business.
Canal currently supports the following MySQL versions on the data source side: 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
Application scenarios of Canal
Common log-based incremental subscription and consumption scenarios mainly include:
- Incremental data subscription and consumption based on parsing the database's incremental logs
- Database mirroring
- Real-time Database backup
- Index building and real-time maintenance (split heterogeneous index, inverted index, etc.)
- Service Cache Refresh
- Incremental data processing with business logic
How Canal works
Before introducing Canal's principle, let's first look at how MySQL master-slave replication works.
MySQL master-slave replication principle
- The MySQL master writes data changes to its binary log; the recorded entries are called binary log events and can be viewed with the `show binlog events` command (see the snippet after this list)
- The MySQL slave copies the master's binary log events into its own relay log
- The MySQL slave re-reads and replays the events in the relay log, applying the data changes to its own database tables
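For reference, you can peek at these binary log events directly from a MySQL client. A minimal sketch (the binlog file name below is illustrative and depends on your server):

```sql
-- Show the binlog file the master is currently writing to and its position
SHOW MASTER STATUS;
-- List the first events recorded in a given binlog file (file name is illustrative)
SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 10;
```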
Given how MySQL works, we can roughly guess that Canal uses similar logic to implement incremental data subscriptions. So let’s see how Canal actually works.
Working principle of Canal
- Canal emulates the interaction protocol of a MySQL slave, disguises itself as a MySQL slave, and sends the dump protocol to the MySQL master
- The MySQL master receives the dump request and starts pushing its binary log to the slave (i.e., Canal)
- Canal parses the binary log objects (the data arrives as a byte stream)
Based on this mechanism, Canal can acquire and parse the database's incremental logs, provide incremental data subscription and consumption, and thus deliver real-time incremental data transmission for MySQL.
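Before diving into the environment setup, here is a minimal sketch of what that subscription loop looks like with Canal's Java client. The class name, address, port, and destination below are illustrative; the full working version appears later in this article.

```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class CanalQuickStart {
    public static void main(String[] args) {
        // Minimal sketch of the Canal client protocol: connect, subscribe, pull, acknowledge
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe(".*\\..*");                  // subscribe to all schemas and tables
        Message message = connector.getWithoutAck(100);  // pull up to 100 binlog entries without auto-ack
        // ... parse message.getEntries() and apply business logic here ...
        connector.ack(message.getId());                  // acknowledge successful consumption
        connector.disconnect();
    }
}
```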
Canal is exactly such a framework, written in pure Java, so let's learn how to use it and apply it to real work.
Canal’s Docker environment preparation
Since containerization is so popular today, this article uses Docker to set up the development environment quickly; once we know how to build the environment with Docker containers, we can of course also build it the traditional way. Because this article focuses on Canal, it only introduces Docker's basic concepts and commands rather than going into depth.
What is Docker
Most people have probably used the virtual machine software VMware. To build an environment with VMware, you only need to provide a system image and install it; the rest of the software environment and application configuration is then done inside the virtual machine just as on a physical machine. However, VMware consumes a lot of the host machine's resources, can easily make the host sluggish, and the system image itself takes up a great deal of space.
To make Docker easier to grasp quickly, we introduce it by comparison with VMware. Docker provides a platform for starting, packaging, and running apps while isolating them from the underlying infrastructure. Its two main concepts are the image (similar to a VMware system image) and the container (similar to a system installed with VMware).
What is an Image?
- Collection of files and Meta Data (root filesystem)
- Layered; each layer can add, change, or delete files to form a new image
- Different images can share the same layer
- The Image itself is read-only
What is a Container?
- Create (copy) from Image
- Create a Container Layer on top of the Image Layer (read-write)
- An analogy from object orientation: classes and instances
- The Image stores and distributes the app, while the Container runs it (a short example follows)
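A quick, hedged illustration of the image/container relationship, using the canal-server image that this article pulls later (the container names here are only examples):

```
docker pull canal/canal-server                     # image: a read-only stack of layers
docker run -d --name canal-a canal/canal-server    # container A: the image layers plus its own writable layer
docker run -d --name canal-b canal/canal-server    # container B: shares the same image layers as A
docker images                                      # list local images (the "classes")
docker ps -a                                       # list containers (the "instances")
```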
Docker network introduction
There are three built-in types of Docker networks:

Bridge: the bridge network

By default, all Docker containers start on the bridge network created when Docker is installed. Every time a container is restarted it obtains the next IP address in sequence, which means a container's IP address can change across restarts.

None: no network is assigned

With --network=none, the Docker container is not assigned an IP address on the LAN.

Host: the host network

With --network=host, the Docker container's network is attached to the host and the two are interconnected. For example, if you run a web service in the container listening on port 8080, it is served directly on the host's port 8080.

To create a custom network (and assign fixed IP addresses):

```
docker network create --subnet=172.18.0.0/16 mynetwork
```

Check the existing networks with `docker network ls`.
Build the Canal environment
Install Docker first ==> Docker Download
```
## Download the canal image
docker pull canal/canal-server

## Download the mysql image
docker pull mysql

## Check the downloaded images
docker images

## Generate the mysql container
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql

## Generate the canal-server container
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server

## Option notes
--net mynetwork   # use the custom network created above
--ip              # assign a fixed IP address
```
View the running containers with `docker ps`.
MySQL configuration changes
The above is only a preliminary environment setup, but how does Canal impersonate a slave and correctly obtain the binary log from MySQL?
Set binlog-format to ROW mode and enable the binary log by modifying the MySQL configuration file. Use `find / -name my.cnf` to locate my.cnf and modify it as follows:
```
[mysqld]
log-bin=mysql-bin    # enable binlog
binlog-format=ROW    # select ROW mode
server_id=1          # required for MySQL replication; must not duplicate the slaveId configured for Canal
```
Enter the MySQL container with `docker exec -it mysql bash`, then create the canal account and grant it the privileges of a MySQL slave:
```
mysql -uroot -proot

# create the account
CREATE USER canal IDENTIFIED BY 'canal';

# grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';

# refresh and apply
FLUSH PRIVILEGES;
```
After restarting the database, run a quick check to verify that the my.cnf configuration has taken effect:
```
show variables like 'log_bin';
show variables like 'binlog_format';
show master status;
```
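If the configuration took effect, the first two statements should return output roughly like this (exact formatting depends on your client):

```
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
```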
Modify the canal-server configuration
Enter the canal-server container with `docker exec -it canal-server bash` and edit the instance configuration with `vi canal-server/conf/example/instance.properties`.
Canal Configuration description
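The concrete values to change depend on your environment. As a hedged sketch, for the containers created above the key entries in instance.properties would look roughly like this (see the Canal configuration description for the full list of options):

```
# address of the source MySQL (the mysql container created above)
canal.instance.master.address=172.18.0.6:3306
# the slave account created for canal
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# subscribe to all schemas and tables (adjust the regex to filter)
canal.instance.filter.regex=.*\\..*
```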
```
## Restart canal-server and watch the startup log
docker restart canal-server
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
```
Pull data and synchronize it to ElasticSearch
ElasticSearch in this article is also set up with Docker, so just run the following commands:
```
# Download the images
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine

# Create the containers and run them
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
```
Now that the environment is ready, it's time for the actual coding: how does an application obtain and parse the binlog data from Canal? We build a Canal demo application based on Spring Boot. Its structure is shown in the figure below, and its key Maven dependencies are sketched next.
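As a rough guide, the demo depends on Canal's Java client, the ES high-level REST client, Lombok, and fastjson on top of the usual Spring Boot starters. A hedged sketch of the relevant pom.xml entries; the version numbers are illustrative and should match your Canal and ES versions:

```xml
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>
```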
Student.java
```java
package com.example.canal.study.pojo;

import lombok.Data;
import java.io.Serializable;

@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}
```
CanalConfig.java
```java
package com.example.canal.study.common;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value reads the corresponding entry from application.properties
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;

    /**
     * Get a simple canal-server connection (single node, direct connection)
     */
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }

    /**
     * Get a canal-server connection in cluster (HA) mode via ZooKeeper
     */
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }

    /**
     * ElasticSearch 7.x client
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
        );
        return client;
    }
}
```
CanalDataParser.java — because this class contains a lot of code, only the important parts are shown here; the rest can be obtained from GitHub.
```java
/**
 * A tuple of two elements
 * @param <A>
 * @param <B>
 */
public static class TwoTuple<A, B> {
    public final A eventType;
    public final B columnMap;

    public TwoTuple(A a, B b) {
        eventType = a;
        columnMap = b;
    }
}

/**
 * Parse the contents of the Message object returned by Canal
 * @param entrys
 * @return
 */
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
    List<TwoTuple<EventType, Map>> rows = new ArrayList<>();

    for (Entry entry : entrys) {
        // Execution time of the binlog event
        long executeTime = entry.getHeader().getExecuteTime();
        // Delay between now and the event's execution time
        long delayTime = System.currentTimeMillis() - executeTime;
        Date date = new Date(entry.getHeader().getExecuteTime());
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // The current entry (binary log event) is a transaction
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                TransactionBegin begin = null;
                try {
                    begin = TransactionBegin.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Print the transaction header and the id of the executing thread
                logger.info(transaction_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
                logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                printXAInfo(begin.getPropsList());
            } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                TransactionEnd end = null;
                try {
                    end = TransactionEnd.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Print the transaction commit information and transaction id
                logger.info("----------------\n");
                logger.info(" END ----> transaction id: {}", end.getTransactionId());
                printXAInfo(end.getPropsList());
                logger.info(transaction_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
            }
            continue;
        }

        // The current entry (binary log event) is row data
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChage.getEventType();
            logger.info(row_format,
                    new Object[]{entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()),
                            entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType,
                            String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                            entry.getHeader().getGtid(), String.valueOf(delayTime)});

            // The event type is Query or DDL (data definition language)
            if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                logger.info(" sql ----> " + rowChage.getSql() + SEP);
                continue;
            }
            printXAInfo(rowChage.getPropsList());

            // Loop over the row data of the current entry
            for (RowData rowData : rowChage.getRowDatasList()) {
                List<CanalEntry.Column> columns;
                // For DELETE events return the columns before deletion, otherwise return the columns after the change
                if (eventType == CanalEntry.EventType.DELETE) {
                    columns = rowData.getBeforeColumnsList();
                } else {
                    columns = rowData.getAfterColumnsList();
                }
                HashMap<String, Object> map = new HashMap<>(16);
                // Put every column name and value into the map
                for (Column column : columns) {
                    map.put(column.getName(), column.getValue());
                }
                rows.add(new TwoTuple<>(eventType, map));
            }
        }
    }
    return rows;
}
```
ElasticUtils.java
```java
package com.example.canal.study.common;

import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * ElasticSearch CRUD utility class
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Insert
     * @param student
     * @param index the index name
     */
    public void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("Saved data to ElasticSearch successfully: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to save data to ElasticSearch: {}", e);
        }
    }

    /**
     * Query by id
     * @param index the index name
     * @param id    the document _id
     */
    public void getEs(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = null;
        try {
            response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            Map<String, Object> fields = response.getSource();
            for (Map.Entry<String, Object> entry : fields.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } catch (Exception e) {
            log.error("Failed to get data from ElasticSearch: {}", e);
        }
    }

    /**
     * Update
     * @param student
     * @param index the index name
     */
    public void updateEs(Student student, String index) {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
        UpdateResponse response = null;
        try {
            response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("Updated data in ElasticSearch successfully: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to update data in ElasticSearch: {}", e);
        }
    }

    /**
     * Delete by id
     * @param index the index name
     * @param id    the document _id
     */
    public void DeleteEs(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = null;
        try {
            response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            log.info("Deleted data from ElasticSearch successfully: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to delete data from ElasticSearch: {}", e);
        }
    }
}
```
BinLogElasticSearch.java
```java
package com.example.canal.study.action;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    // @Qualifier("canalHaConnector") injects the bean named canalHaConnector
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;

    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalSimpleConnector);
        // Poll and pull data
        Integer batchSize = 5 * 1024;
        while (true) {
            // Message message = canalHaConnector.getWithoutAck(batchSize);
            Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("Number of binLog messages currently fetched: {}", size);
            if (id == -1 || size == 0) {
                try {
                    // Wait 4 seconds before polling again
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 1. Parse the message object
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);

                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        // 2. Synchronize the data to ElasticSearch
                        elasticUtils.saveEs(student, "student_index");
                        // 3. Acknowledge that the message has been consumed
                        canalSimpleConnector.ack(id);
                        // canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                        // 3. Acknowledge that the message has been consumed
                        canalSimpleConnector.ack(id);
                        // canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                        canalSimpleConnector.ack(id);
                        // canalHaConnector.ack(id);
                    }
                }
            }
        }
    }

    /**
     * Wrap the parsed data into a Student object
     *
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }

    /**
     * Open the canal connection
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        // Connect to the CanalServer
        canalConnector.connect();
        // Subscribe to the destination
        canalConnector.subscribe();
    }

    /**
     * Close the canal connection
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        // Disconnect from the CanalServer
        canalConnector.disconnect();
        // Unsubscribe from the destination
        canalConnector.unsubscribe();
    }
}
```
CanalDemoApplication.java (Spring Boot startup class)
```java
package com.example.canal.study;

import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;

    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }

    // Start consuming binlog events once the application has started
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}
```
application.properties
```
server.port=8081
spring.application.name = canal-demo
canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200
```
Setting up a Canal cluster
Through the study above, we know how to use Canal in single-node, direct-connection mode. In today's Internet era, single-instance mode is gradually being replaced by highly available cluster mode, so let's look at how to build Canal's multi-instance cluster mode.
Obtain the Canal instance based on ZooKeeper
Prepare the ZooKeeper Docker image and container
```
docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
```
The final result is shown as follows:
- Machines to prepare:
- The IP addresses of the canal containers are 172.18.0.4, 172.18.0.8
- IP address of the ZooKeeper container: 172.18.0.3:2181
- The IP address of mysql container is 172.18.0.6:3306
- Following the deployment and configuration above, complete the configuration on each machine; the instance name used in this demonstration is example
- Modify canal.properties, add the ZooKeeper configuration, and change the Canal port:
```
canal.port = 11113
canal.zkServers = 172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
```
- Create the example directory and modify instance.properties
```
# the previous canal-server's slaveId was 1234; the two must not be the same
canal.instance.mysql.slaveId = 1235
canal.instance.master.address = 172.18.0.6:3306
```
Note: the instance directory name must be the same on both machines. HA mode is managed by instance name, and both machines must use default-instance.xml.
Start Canal in the two different containers. After startup, you can check the startup log with `tail -100f logs/example/example.log`.
For example, 172.18.0.4 started successfully:
```
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
```
Client connection and data consumption
By specifying the ZooKeeper address and the Canal instance name, the Canal client automatically fetches the current working node from the running node in ZooKeeper and then establishes a connection with it:
```
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
```
The corresponding client code can take the following form; the canalHaConnector in CanalConfig.java above is such an HA connection:
```java
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
```
After the connection succeeds, the Canal Server records the currently working Canal client, including the client IP, the connection port, and so on:
```
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}
```
After data is consumed successfully, the Canal Server records in ZooKeeper the binlog position of the last successful consumption (the next time the client restarts, consumption continues from this position):
```
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
```
Stop the working 172.18.0.4 Canal Server
```
docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
```
172.18.0.8 will immediately start the example instance and take over serving data:
```
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}
```
At the same time, as the Canal Server switches over, the client obtains the new address from ZooKeeper, establishes a connection with the new Canal Server, and continues consuming data. The whole process happens automatically.
Exceptions and conclusions
elasticsearch-head cannot access ElasticSearch
ES and es-head are two independent processes, so when es-head accesses the ES service there is a cross-origin (CORS) problem. We therefore need to modify the ES configuration file and add a few settings to solve it, as follows:
```
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml

# Add the following configuration to the end of the file
http.cors.enabled: true
http.cors.allow-origin: "*"
```
After modifying the configuration file, restart the ES service
elasticsearch-head query returns 406 Not Acceptable
Solution:
1. Go to the head installation directory.
2. cd _site/
3. Edit vendor.js in two places:

```
# Around line 6886, change
contentType: "application/x-www-form-urlencoded"
# to
contentType: "application/json;charset=UTF-8"

# And change
var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
# to
var inspectData = s.contentType === "application/json;charset=UTF-8" &&
```
Using elasticsearch-rest-high-level-client reports an error about org.elasticsearch.action.index.IndexRequest.ifSeqNo
Add the dependencies to pom.xml:

```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.1.1</version>
</dependency>

<!-- You still need to add -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>
```
See the related GitHub issue.
Why does ElasticSearch not use Type in 7.x?
Reference: Why did ElasticSearch remove type in 7.x?
Using spring-data-elasticsearch.jar reports org.elasticsearch.client.transport.NoNodeAvailableException
The reason is that spring-data-elasticsearch talks to ES through the TransportClient, which ES has deprecated. This project therefore makes its calls with RestHighLevelClient, the client officially recommended by ES. See the RestHighLevelClient API.
Set a Docker container to start automatically
```
docker update --restart=always [containerID]
```
Docker for Mac Network Host mode does not take effect
Host mode exists for performance: it breaks Docker's isolation and reduces security, but in performance-sensitive scenarios you can enable it with --network=host. Note, however, that if you start containers locally on Windows or Mac, host mode does not take effect, because host mode is only supported on Linux hosts.
See the official document: docs.docker.com/network/hos…
Client authenticate using SASL(unknown error)
- The zookeeper.jar version is inconsistent with the ZooKeeper version running in Docker
- zookeeper.jar uses a version earlier than 3.4.6
This error means that ZooKeeper, as an external application, must request resources from the system, and the request has to pass authentication. SASL is one such authentication method, so here we simply bypass SASL authentication to avoid the wait and be more efficient.
Add `System.setProperty("zookeeper.sasl.client", "false");` to the project code; for a Spring Boot project, add `zookeeper.sasl.client=false` to application.properties.
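As a snippet, the property just needs to be set before the first ZooKeeper connection is opened, for example at the start of the Spring Boot main method shown earlier:

```java
public static void main(String[] args) {
    // Bypass SASL authentication for the ZooKeeper client
    System.setProperty("zookeeper.sasl.client", "false");
    SpringApplication.run(CanalDemoApplication.class, args);
}
```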
Reference: Increased CPU usage by unnecessary SASL checks
Replacing the zookeeper.jar version that canal.client.jar depends on
Clone Canal's official source code locally with `git clone https://github.com/alibaba/canal.git`, modify the ZooKeeper entry in the client module's pom.xml, and then run mvn install again:

```
mvn install
```
ZooKeeper returns the IP address inside the Docker container, but the host IP and the container IP are on different network segments and cannot reach each other
The hosts file only maps domain names to IPs (domain redirection), and iptables only does port forwarding; what would solve this problem is IP-to-IP redirection, which neither provides. So we modified Canal's official source code to do what we want: change the connect() method in ClusterCanalConnector.java.
The following is a comparison diagram of the modified content
On the choice of approach
The source code for this article's sample project ==> canal-ElasticSearch-sync