Copyright belongs to the author. For any form of reprint, please contact the author for authorization and cite the source.

Introduction to Canal

History of Canal

In the early days, Alibaba deployed database instances in data centers in Hangzhou and the United States, which created a business need to synchronize data across data centers. Canal was born to meet that need, and at first it captured incremental changes mainly with triggers. Since 2010, Alibaba has gradually moved to parsing database logs to obtain incremental change data for synchronization, which gave rise to the incremental subscription and consumption business.

Canal currently supports the following MySQL versions on the data source side: 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

Application scenarios of Canal

Common services built on log-based incremental subscription and consumption include:

  • Incremental data subscription and consumption based on parsing the database's incremental logs
  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)
  • Business cache refresh
  • Incremental data processing with business logic

How Canal works

Before introducing Canal's working principle, let's look at how MySQL master-slave replication works.

MySQL master-slave replication principle

  • The MySQL master writes data changes to its binary log. The recorded entries are called binary log events and can be viewed with the show binlog events command
  • The MySQL slave copies the master's binary log events to its relay log
  • The MySQL slave replays the events in the relay log, applying the data changes to its own database tables
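
The three steps above can be modelled in a few lines of Java. This is only a toy sketch: ReplicationSketch, BinlogEvent, Master, and Slave are hypothetical names invented for illustration, a change is a plain key/value write, and real MySQL replication uses a network protocol and on-disk log files.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of master-slave replication; every name here is hypothetical.
final class ReplicationSketch {

    /** One recorded change: set key to value, or delete key when value is null. */
    static final class BinlogEvent {
        final String key;
        final String value;

        BinlogEvent(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class Master {
        final Map<String, String> table = new HashMap<>();
        final List<BinlogEvent> binaryLog = new ArrayList<>();

        // Step 1: apply the change locally and append it to the binary log.
        void write(String key, String value) {
            if (value == null) {
                table.remove(key);
            } else {
                table.put(key, value);
            }
            binaryLog.add(new BinlogEvent(key, value));
        }
    }

    static final class Slave {
        final Map<String, String> table = new HashMap<>();
        final List<BinlogEvent> relayLog = new ArrayList<>();
        private int fetched = 0; // master events copied so far
        private int applied = 0; // relay-log events replayed so far

        // Step 2: copy new events from the master's binary log into the relay log.
        void fetchFrom(Master master) {
            while (fetched < master.binaryLog.size()) {
                relayLog.add(master.binaryLog.get(fetched++));
            }
        }

        // Step 3: replay the relay log against the slave's own table.
        void replay() {
            while (applied < relayLog.size()) {
                BinlogEvent e = relayLog.get(applied++);
                if (e.value == null) {
                    table.remove(e.key);
                } else {
                    table.put(e.key, e.value);
                }
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write("1", "Alice");
        master.write("2", "Bob");
        master.write("1", null); // delete row 1
        slave.fetchFrom(master);
        slave.replay();
        System.out.println(slave.table.equals(master.table)); // prints "true"
    }
}
```

In this model, a tool that only performs step 2 and then hands the copied events to its own consumers, instead of replaying them into a table, plays the role of a log subscriber rather than a database replica.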

Given how MySQL works, we can roughly guess that Canal uses similar logic to implement incremental data subscriptions. So let’s see how Canal actually works.

Working principle of Canal

  • Canal emulates the MySQL slave's interaction protocol, disguises itself as a MySQL slave, and sends a dump request to the MySQL master
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to Canal)
  • Canal parses the binary log objects (the raw data is a byte stream)

Based on this principle, Canal can acquire and parse the database's incremental logs, provide incremental data subscription and consumption, and deliver MySQL's incremental data in real time.

Since Canal is such a framework and written in pure Java, let’s start learning how to use it and apply it to our actual work.

Canal’s Docker environment preparation

Because containerization is so popular at present, this article uses Docker to build the development environment quickly. Once you understand how the Docker setup works, you can also build the same environment the traditional way. Since this article is mainly about Canal, it does not cover Docker in depth and only introduces Docker's basic concepts and commands.

What is Docker

Most people have probably used the VMware virtual machine. To build an environment with VMware, you only need to provide an ordinary system image and install it. The rest of the software environment and application configuration is then set up inside the virtual machine just as on a physical machine. However, VMware takes up a lot of the host's resources, which can easily make the host stall, and the system image itself takes up a lot of disk space.

To make Docker quick to understand, it is introduced here by comparison with VMware. Docker provides a platform to package, start, and run apps, isolating them from the underlying infrastructure. The two main concepts in Docker are the image (similar to a VMware system image) and the container (similar to a system installed in VMware).

What is an Image?

  • Collection of files and Meta Data (root filesystem)
  • Layered: each layer can add, change, or delete files to form a new image
  • Different images can share the same layer
  • The Image itself is read-only

What is a Container?

  • Created (copied) from an image
  • Adds a container layer (read-write) on top of the image layers
  • By analogy with object orientation: the image is the class and the container is the instance
  • The image stores and distributes the app, and the container runs the app

Docker network introduction

There are three types of Docker networks:

Bridge: bridge network

By default, every Docker container uses the bridge network, which is created when Docker is installed. Each time a container restarts, it obtains the next available IP address in sequence, so a container's IP address can change across restarts.

None: no network

With --network=none, the Docker container is not assigned a LAN IP address.

Host: host network

With --network=host, the container shares the host's network, so the two are directly interconnected. For example, if you run a web service in the container and listen on port 8080, the service is reachable on port 8080 of the host without any explicit port mapping.

Creating a custom network (to set a fixed IP address):

docker network create --subnet=172.18.0.0/16 mynetwork

Check the existing networks: docker network ls

Build canal Environment

Download Docker ==> Docker Download

Download the canal image: docker pull canal/canal-server

Download the MySQL image: docker pull mysql

View the downloaded images: docker images

## Create the mysql container
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
## Create the canal-server container
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## Option notes
# --net mynetwork   use the custom network
# --ip              assign a fixed IP address

View the containers running in Docker: docker ps

MySQL configuration changes

So far this is only a rudimentary setup. How does Canal impersonate a slave and correctly obtain the binary log from MySQL?

Enable the binlog and set binlog-format to ROW mode by modifying the MySQL configuration file. Use find / -name my.cnf to locate my.cnf, then edit it as follows:

[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # select ROW mode
server_id=1 # required for MySQL replication; must not be the same as Canal's slaveId

Enter the MySQL container with docker exec -it mysql bash, then create the canal account and grant it the privileges of a MySQL slave:

mysql -uroot -proot
# create account
CREATE USER canal IDENTIFIED BY 'canal';
# grant permission
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# refresh and apply
FLUSH PRIVILEGES;

After restarting the database, run a quick test to check that the my.cnf configuration has taken effect:

show variables like 'log_bin';
show variables like 'binlog_format';
show master status;

Modify the canal-server configuration

Enter the canal-server container with docker exec -it canal-server bash, then edit the canal-server configuration with vi canal-server/conf/example/instance.properties

Canal configuration description

Restart the canal-server container: docker restart canal-server

Enter the container and view the startup log:

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log

Pull data and synchronize it to ElasticSearch

The ElasticSearch environment in this article is also built on Docker, so you can run the following commands:

# Download the images
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# Create the containers and run them
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

Now that the environment is ready, it is time for the actual coding: getting the application to parse the binlog data delivered by Canal. First, we build a Canal demo application based on Spring Boot. Its structure is shown in the figure below.

Student.java

package com.example.canal.study.pojo;

import lombok.Data;
import java.io.Serializable;

/**
 * Plain entity class holding one row of student data
 */
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}

CanalConfig.java

package com.example.canal.study.common;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value reads the values configured in application.properties
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;

    /**
     * Get a simple (single-node) canal-server connection
     */
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }

    /**
     * Get an HA (cluster) canal-server connection through ZooKeeper
     */
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }

    /**
     * ElasticSearch 7.x client
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
        );
        return client;
    }
}

CanalDataParser.java: because this class contains a lot of code, only the important parts are shown in this article. The rest of the code can be obtained from GitHub.

/**
 * Tuple class holding an event type and the column map of one row
 *
 * @param <A> type of the event type
 * @param <B> type of the column map
 */
public static class TwoTuple<A, B> {
    public final A eventType;
    public final B columnMap;

    public TwoTuple(A a, B b) {
        eventType = a;
        columnMap = b;
    }
}

/**
 * Parse the contents of the message object in Canal
 *
 * @param entrys entries of one message
 * @return one tuple (event type, column map) per changed row
 */
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
    List<TwoTuple<EventType, Map>> rows = new ArrayList<>();

    for (Entry entry : entrys) {
        // Time at which the binlog event was executed
        long executeTime = entry.getHeader().getExecuteTime();
        // Delay between the execution time and now
        long delayTime = System.currentTimeMillis() - executeTime;
        Date date = new Date(entry.getHeader().getExecuteTime());
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // The current entry (binary log event) is a transaction begin or end
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                TransactionBegin begin = null;
                try {
                    begin = TransactionBegin.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Print the transaction header and the ID of the executing thread
                logger.info(transaction_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
                logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                printXAInfo(begin.getPropsList());
            } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                TransactionEnd end = null;
                try {
                    end = TransactionEnd.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Print the transaction commit information and the transaction ID
                logger.info("----------------\n");
                logger.info(" END ----> transaction id: {}", end.getTransactionId());
                printXAInfo(end.getPropsList());
                logger.info(transaction_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
            }

            continue;
        }

        // The current entry (binary log event) carries row data
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
            }
            // Get the event type
            EventType eventType = rowChage.getEventType();
            logger.info(row_format,
                    new Object[]{entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType,
                            String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                            entry.getHeader().getGtid(), String.valueOf(delayTime)});
            // The event is a query or a data definition language (DDL) statement
            if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                logger.info(" sql ----> " + rowChage.getSql() + SEP);
                continue;
            }
            printXAInfo(rowChage.getPropsList());
            // Loop over the rows of the current entry
            for (RowData rowData : rowChage.getRowDatasList()) {
                List<CanalEntry.Column> columns;
                // For a delete, use the columns before the deletion; otherwise use the columns after the change
                if (eventType == CanalEntry.EventType.DELETE) {
                    columns = rowData.getBeforeColumnsList();
                } else {
                    columns = rowData.getAfterColumnsList();
                }
                HashMap<String, Object> map = new HashMap<>(16);
                // Put each column name and value into the map
                for (Column column : columns) {
                    map.put(column.getName(), column.getValue());
                }
                rows.add(new TwoTuple<>(eventType, map));
            }
        }
    }
    return rows;
}
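
The key decision in printEntry is which column list to read for each event type. The sketch below isolates that rule using plain Java collections. ColumnSelectionSketch, EventKind, Row, and currentColumns are hypothetical stand-ins for Canal's EventType, RowData, and the row loop of the parser, and are not part of the Canal API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: EventKind and Row are hypothetical stand-ins, not Canal classes.
final class ColumnSelectionSketch {

    enum EventKind { INSERT, UPDATE, DELETE }

    /** A changed row with its column values before and after the change. */
    static final class Row {
        final Map<String, String> before;
        final Map<String, String> after;

        Row(Map<String, String> before, Map<String, String> after) {
            this.before = before;
            this.after = after;
        }
    }

    /**
     * A deleted row has no "after" image, so its values (including the id needed
     * to remove the document downstream) come from the "before" image. Inserts
     * and updates use the "after" image, that is, the row's new state.
     */
    static Map<String, String> currentColumns(EventKind kind, Row row) {
        Map<String, String> source = (kind == EventKind.DELETE) ? row.before : row.after;
        return new LinkedHashMap<>(source);
    }

    public static void main(String[] args) {
        Map<String, String> old = Collections.singletonMap("id", "7");
        Row deleted = new Row(old, Collections.<String, String>emptyMap());
        System.out.println(currentColumns(EventKind.DELETE, deleted).get("id")); // prints "7"
    }
}
```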

ElasticUtils.java

package com.example.canal.study.common;

import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Map;

/**
 * ES CRUD utility class
 *
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Add a document
     *
     * @param student
     * @param index   index name
     */
    public void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("Saved data to ElasticSearch successfully: {}", response.getId());
        } catch (Exception e) {
            // Pass the exception as the last argument so that the stack trace is logged
            log.error("Failed to save data to ElasticSearch", e);
        }
    }

    /**
     * Query a document by id
     *
     * @param index index name
     * @param id    _id
     */
    public void getEs(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = null;
        try {
            response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            Map<String, Object> fields = response.getSource();
            for (Map.Entry<String, Object> entry : fields.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } catch (Exception e) {
            log.error("Failed to get data from ElasticSearch", e);
        }
    }

    /**
     * Update a document
     *
     * @param student
     * @param index   index name
     */
    public void updateEs(Student student, String index) {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
        UpdateResponse response = null;
        try {
            response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("Updated data in ElasticSearch successfully: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to update data in ElasticSearch", e);
        }
    }

    /**
     * Delete a document by id
     *
     * @param index index name
     * @param id    _id
     */
    public void DeleteEs(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = null;
        try {
            response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            log.info("Deleted data from ElasticSearch successfully: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to delete data from ElasticSearch", e);
        }
    }
}

BinLogElasticSearch.java

package com.example.canal.study.action;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    // @Qualifier("canalHaConnector") injects the bean named canalHaConnector
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;

    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalSimpleConnector);
        // Poll for data
        Integer batchSize = 5 * 1024;
        while (true) {
//            Message message = canalHaConnector.getWithoutAck(batchSize);
            Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("Number of binlog messages currently monitored: {}", size);
            if (id == -1 || size == 0) {
                try {
                    // Wait 4 seconds
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 1. Parse the message object
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
                // 2. Write each parsed row to ElasticSearch
                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        elasticUtils.saveEs(student, "student_index");
                    }
                    if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                    }
                    if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                    }
                }
                // 3. Acknowledge the batch once, after all of its rows have been processed
                //    (the id belongs to the whole batch, not to a single row)
                canalSimpleConnector.ack(id);
                // canalHaConnector.ack(id);
            }
        }
    }

    /**
     * Encapsulate the data in a Student object
     *
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }

    /**
     * Open the canal connection
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        // Connect to CanalServer
        canalConnector.connect();
        // Subscribe to the destination
        canalConnector.subscribe();
    }

    /**
     * Close the canal connection
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        // Close the connection to CanalServer
        canalConnector.disconnect();
        // Cancel the subscription to the destination
        canalConnector.unsubscribe();
    }
}
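
One point worth illustrating about the polling loop is when to acknowledge a batch. The sketch below acknowledges once per batch, after every row has been handled, and rolls the batch back if handling fails so that it can be fetched again. AckSketch, Batch, BatchSource, InMemorySource, and consumeOnce are hypothetical names for this illustration. The three BatchSource methods only mirror the names of the CanalConnector calls getWithoutAck, ack, and rollback, and the in-memory source is an assumed stand-in for a real canal-server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: these types are hypothetical stand-ins, not the Canal client API.
final class AckSketch {

    static final class Batch {
        final long id;           // -1 means no data was available
        final List<String> rows;

        Batch(long id, List<String> rows) {
            this.id = id;
            this.rows = rows;
        }
    }

    /** Method names mirror the connector calls used in this article. */
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Handles one batch; returns true only if it was fully handled and acked. */
    static boolean consumeOnce(BatchSource source, int batchSize, Consumer<String> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.rows.isEmpty()) {
            return false; // nothing to handle; the caller can sleep and poll again
        }
        try {
            for (String row : batch.rows) {
                handler.accept(row);
            }
            source.ack(batch.id); // acknowledge once, after every row succeeded
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // unacknowledged rows will be delivered again
            return false;
        }
    }

    /** In-memory source used only to exercise consumeOnce. */
    static final class InMemorySource implements BatchSource {
        private final List<String> rows;
        private int acked = 0;    // rows confirmed so far
        private int inFlight = 0; // rows handed out but not yet confirmed
        private long nextId = 1;

        InMemorySource(List<String> rows) {
            this.rows = rows;
        }

        @Override
        public Batch getWithoutAck(int batchSize) {
            int end = Math.min(rows.size(), acked + batchSize);
            if (end == acked) {
                return new Batch(-1, new ArrayList<String>());
            }
            inFlight = end - acked;
            return new Batch(nextId++, new ArrayList<>(rows.subList(acked, end)));
        }

        @Override
        public void ack(long batchId) {
            acked += inFlight;
            inFlight = 0;
        }

        @Override
        public void rollback(long batchId) {
            inFlight = 0;
        }

        int ackedCount() {
            return acked;
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        consumeOnce(source, 2, seen::add); // handles a and b, then acks
        consumeOnce(source, 2, row -> { throw new IllegalStateException("sink down"); });
        consumeOnce(source, 2, seen::add); // c is delivered again after the rollback
        System.out.println(seen + " acked=" + source.ackedCount()); // prints "[a, b, c] acked=3"
    }
}
```

Because a rolled-back batch is delivered again, the handler in such a design should be safe to repeat for the same row, for example by writing to ElasticSearch with the row's id as the document id.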

CanalDemoApplication.java (Spring Boot startup class)

package com.example.canal.study;

import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;

    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }

    // Start consuming the binlog once the application has started
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}

application.properties

server.port=8081
spring.application.name = canal-demo

canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example

zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false

elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200

Setting up a Canal cluster

The sections above covered using Canal in standalone, direct-connection mode. In today's Internet era, single-instance deployments are gradually being replaced by highly available clusters, so how do we build Canal in multi-instance cluster mode?

Obtain the Canal instance based on ZooKeeper

Prepare the ZooKeeper Docker image and container

docker pull zookeeper

docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

The final result is shown as follows:

  1. Prepare the machines
    • The IP addresses of the canal containers are 172.18.0.4 and 172.18.0.8
    • The address of the ZooKeeper container is 172.18.0.3:2181
    • The address of the MySQL container is 172.18.0.6:3306
  2. Complete the deployment and configuration on each machine. The instance name in this demonstration is example
  3. Modify canal.properties: add the ZooKeeper configuration and change the Canal port
    canal.port=11113
    canal.zkServers=172.18.0.3:2181
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  4. Create the example directory and modify instance.properties
    canal.instance.mysql.slaveId = 1235
    # the slaveId of the previous canal server was 1234
    canal.instance.master.address = 172.18.0.6:3306

Note: the instance directory must have the same name on both machines, because HA mode is managed by instance name. Both machines must also use the default-instance.xml configuration.

Start Canal in the two containers. After startup, you can run tail -100f logs/example/example.log to check the startup logs.

For example, in my case 172.18.0.4 started successfully:

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

Client connection and data consumption

By specifying the ZooKeeper address and the Canal instance name, the Canal client automatically obtains the current working node from the running node in ZooKeeper and then establishes a connection with it:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

The corresponding client code can take the following form. The canalHaConnector bean in CanalConfig.java above is such an HA connection:

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");

After the connection succeeds, Canal Server records the Canal client that is currently working, including information such as the client IP and the port of the connection:

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}

After data is consumed successfully, Canal Server records the binlog position of the last successful consumption in ZooKeeper (the next time the client restarts, consumption continues from this position):

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}

Stop the working 172.18.0.4 Canal Server

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

172.18.0.8 immediately starts the example instance and takes over the data service:

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}

When the Canal Server switches, the client obtains the latest address from ZooKeeper, establishes a connection with the new Canal Server, and continues to consume data. The whole process is completed automatically.
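
The takeover described in this section can be pictured with a small in-memory model. This is only a sketch of the idea that one server at a time holds the running node and that clients read the active address from it. FailoverSketch and RunningNode are hypothetical names, and nothing here talks to ZooKeeper or Canal.

```java
import java.util.Optional;

// Sketch only: RunningNode is a hypothetical in-memory stand-in for the
// "running" node that the Canal servers contend for in ZooKeeper.
final class FailoverSketch {

    static final class RunningNode {
        private String activeAddress; // null while no server holds the node

        /** A server claims the node; only one claim can succeed at a time. */
        synchronized boolean tryAcquire(String address) {
            if (activeAddress != null) {
                return false;
            }
            activeAddress = address;
            return true;
        }

        /** Called when the active server stops; frees the node for a standby. */
        synchronized void release(String address) {
            if (address.equals(activeAddress)) {
                activeAddress = null;
            }
        }

        /** What a client reads to find the server it should connect to. */
        synchronized Optional<String> active() {
            return Optional.ofNullable(activeAddress);
        }
    }

    public static void main(String[] args) {
        RunningNode node = new RunningNode();
        node.tryAcquire("172.18.0.4:11111"); // the first server becomes active
        node.tryAcquire("172.18.0.8:11111"); // the second server stays on standby
        System.out.println(node.active().get()); // prints "172.18.0.4:11111"

        node.release("172.18.0.4:11111");    // the active server is stopped
        node.tryAcquire("172.18.0.8:11111"); // the standby takes over
        System.out.println(node.active().get()); // prints "172.18.0.8:11111"
    }
}
```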

Exceptions and conclusions

elasticsearch-head cannot access ElasticSearch

ES and es-head are two independent processes, so when es-head accesses the ES service there is a cross-origin problem. To solve it, modify the ES configuration file and add the following configuration items:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml
# Add the following configuration to the end of the file
http.cors.enabled: true
http.cors.allow-origin: "*"

After modifying the configuration file, restart the ES service

elasticsearch-head queries return 406 Not Acceptable

Solution:
1. Go to the head installation directory.
2. cd _site/
3. Edit vendor.js in two places:
     Line 6886: change contentType: "application/x-www-form-urlencoded" to contentType: "application/json;charset=UTF-8"
     Second place: change var inspectData = s.contentType === "application/x-www-form-urlencoded" && to var inspectData = s.contentType === "application/json;charset=UTF-8" &&

Using elasticsearch-rest-high-level-client reports an error at org.elasticsearch.action.index.IndexRequest.ifSeqNo

<!-- Add the dependency to the pom -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.1.1</version>
</dependency>
<!-- The following dependency also needs to be added -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>

See the GitHub issues

Why does ElasticSearch no longer use type in 7.x?

Reference: Why did ElasticSearch remove type in 7.x?

Using spring-data-elasticsearch.jar reports org.elasticsearch.client.transport.NoNodeAvailableException

This article uses ElasticSearch 7.x, while spring-data-elasticsearch uses the ES TransportClient underneath, which ES no longer recommends. The utility class in this article therefore makes its requests with RestHighLevelClient, which is officially recommended by ES. See the RestHighLevelClient API.

Set a Docker container to start automatically

docker update --restart=always [containerID]

Docker for Mac Network Host mode does not take effect

Host mode exists for performance, but it breaks Docker's isolation and reduces security. In performance-sensitive scenarios you can enable it with --network host. Note, however, that if you start the container locally on Windows or Mac, host mode does not work, because host mode is only supported on Linux hosts.

See the official document: docs.docker.com/network/hos…

Client authenticate using SASL(unknown error)

  • The zookeeper.jar version is inconsistent with the ZooKeeper version in Docker
  • The zookeeper.jar version is earlier than 3.4.6

This error means that ZooKeeper, as an external application, needs to request resources from the system and must pass authentication when doing so. SASL is one such authentication method, so we bypass SASL authentication to avoid the wait and improve efficiency.

Add System.setProperty("zookeeper.sasl.client", "false"); to the project code. For Spring Boot projects, add zookeeper.sasl.client=false to application.properties
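
As a minimal sketch of the first option, the property can be set at the very start of main. The class and method names are hypothetical, and setting the property before any connector is created is an assumption about when the ZooKeeper client library reads it.

```java
// Sketch only: DisableZkSasl and disableSasl are hypothetical names.
final class DisableZkSasl {

    /** Sets the JVM-wide system property named in this article. */
    static void disableSasl() {
        System.setProperty("zookeeper.sasl.client", "false");
    }

    public static void main(String[] args) {
        // Assumed ordering: call this before creating any ZooKeeper-backed connector.
        disableSasl();
        System.out.println(System.getProperty("zookeeper.sasl.client")); // prints "false"
    }
}
```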

Reference: Increased CPU usage by unnecessary SASL checks

How to replace the version of zookeeper.jar that canal.client.jar depends on

Download Canal's official source code locally with git clone https://github.com/alibaba/canal.git, modify the ZooKeeper entry in the pom.xml file of the client module, and then run mvn install again:

mvn install

ZooKeeper returns the container's IP address, but the host IP and the container IP are on different network segments, so the container address cannot be reached from the host

The hosts file can only map domain names to IP addresses (domain name redirection), and iptables can only be used for port redirection, whereas this problem needs IP-to-IP redirection. So we modified Canal's official source code to get the behavior we wanted. The change is in the connect() method of ClusterCanalConnector.java.
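
The modified connect() method is not reproduced in the text, so the following is only a sketch of the general idea of IP-to-IP redirection: translate the container address read from ZooKeeper into an address that is reachable from the host before connecting. AddressRemapSketch, remap, and the sample mapping are hypothetical. They are not the author's actual change and not part of the Canal API.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Sketch only: a hypothetical helper, not code from the Canal project.
final class AddressRemapSketch {

    /**
     * Returns the address to dial for a server address registered in ZooKeeper.
     * Hosts without an entry in ipMap keep their original host; the port is kept as-is.
     */
    static InetSocketAddress remap(InetSocketAddress registered, Map<String, String> ipMap) {
        String host = registered.getHostString();
        String target = ipMap.getOrDefault(host, host);
        // createUnresolved keeps this sketch free of DNS lookups; a real client
        // would build a resolved address before connecting.
        return InetSocketAddress.createUnresolved(target, registered.getPort());
    }

    public static void main(String[] args) {
        Map<String, String> ipMap = new HashMap<>();
        // Assumed mapping: container IP -> address reachable from the host
        ipMap.put("172.18.0.4", "127.0.0.1");
        InetSocketAddress fromZk = InetSocketAddress.createUnresolved("172.18.0.4", 11111);
        InetSocketAddress dial = remap(fromZk, ipMap);
        System.out.println(dial.getHostString() + ":" + dial.getPort()); // prints "127.0.0.1:11111"
    }
}
```

With the port mappings used in this article (-p 11111:11111), mapping the container IP to the host's loopback address would be one plausible choice for a client running on the host.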

The following is a comparison diagram of the modified content

On the trade-offs in technology selection

Source code of this article's sample project ==> canal-ElasticSearch-sync