This article was originally published in: Walker AI

Read this article to learn about ClickHouse and how to build a distributed ClickHouse cluster. If you are interested in ClickHouse or have your own ideas about data analysis, please check us out.

1. What is ClickHouse?

ClickHouse is an open-source, column-oriented database for real-time data analysis, developed by Yandex (Russia’s largest search engine). It processes data 100-1000 times faster than traditional approaches and outperforms comparable column-oriented DBMSes on the market today, processing hundreds of millions to billions of rows and tens of gigabytes of data per second per server.

Some of ClickHouse’s features

  • Fast: ClickHouse takes full advantage of all available hardware to process each query as quickly as possible. Peak processing performance for a single query exceeds 2 TB per second (after decompression, counting only the columns used). In a distributed setup, reads are automatically balanced across healthy replicas to avoid added latency.
  • Fault tolerant: ClickHouse supports asynchronous replication across multiple hosts and can be deployed across multiple data centers. All nodes are equal, which avoids a single point of failure. The downtime of a single node or even an entire data center does not affect the system’s read and write availability.
  • Scalable: ClickHouse scales well both vertically and horizontally. It is easily tuned to run on clusters with hundreds or thousands of nodes, on a single server, or even on a small virtual machine. Today, single-node installations hold more than a trillion rows or hundreds of terabytes of data.
  • Easy to use: ClickHouse works out of the box and simplifies data processing: all structured data is ingested into the system and is immediately available for building reports. SQL lets you express the desired result without resorting to the custom, non-standard APIs found in some DBMSes.
  • Makes the most of the hardware: ClickHouse handles typical analytical queries two to three orders of magnitude faster than a traditional row-oriented system with the same available I/O throughput and CPU capacity. The columnar storage format allows more hot data to be held in RAM, which reduces response time.
  • Improved CPU efficiency: Vectorized query execution uses SIMD processor instructions and runtime code generation. Processing data in columns also increases the CPU cache-line hit ratio.
  • Optimized disk access: ClickHouse minimizes the number of seeks for range queries, which increases efficiency on rotating disk drives because data is stored contiguously.
  • Minimized data transfer: ClickHouse lets companies manage their data without needing a dedicated network for high-performance computing.

2. Set up a cluster

2.1 Preparing machines

Environment: CentOS 6 or CentOS 7 (ZooKeeper must also be deployed for high availability)

Host                     IP             Installed software                                         Note
bgdata.operate.ck-0001   172.20.1.39    clickhouse-server, clickhouse-client, JDK 1.8, ZooKeeper
bgdata.operate.ck-0002   172.20.1.246   clickhouse-server, clickhouse-client, JDK 1.8, ZooKeeper
bgdata.operate.ck-0003   172.20.1.173   JDK 1.8, ZooKeeper                                         Due to limited machine resources, this node only provides ZooKeeper high availability and is not used as a CK node

When ClickHouse replicates table data, it looks up the hosts recorded in ZooKeeper, and ZooKeeper stores host names rather than IP addresses. If those host names cannot be resolved, table definitions will still be synchronized, but table data will not. Add the following entries to /etc/hosts on every node:

172.20.1.246    bgdata.operate.ck-0002
172.20.1.39     bgdata.operate.ck-0001
172.20.1.173    bgdata.operate.ck-0003

2.2 Setting up a three-node ZooKeeper cluster

yum install -y java-1.8.0-openjdk-devel
mkdir /data
cd /data
wget https://downloads.apache.org/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2.tar.gz && tar xf apache-zookeeper-3.6.2.tar.gz
ln -s apache-zookeeper-3.6.2 zookeeper
cd zookeeper
# Create the configuration file
cp conf/zoo_sample.cfg conf/zoo.cfg
# Create the ZooKeeper data directory
mkdir data

Modify the configuration file conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
clientPort=2181
server.1=172.20.1.39:2888:3888
server.2=172.20.1.246:2888:3888
server.3=172.20.1.173:2888:3888

Create a myid file in the ZooKeeper data directory on each of the three servers; its value must match the server.N number configured above.

echo 1 > /data/zookeeper/data/myid   # on the other two machines use 2 and 3 respectively

Start and check

./bin/zkServer.sh start
./bin/zkServer.sh status

2.3 Building a sharded and replicated ClickHouse cluster

Run the following commands on bgdata.operate.ck-0001 and bgdata.operate.ck-0002.

Install ClickHouse

yum install yum-utils
rpm --import https://repo.yandex.ru/clickhouse/CLICKHOUSE-KEY.GPG
yum-config-manager --add-repo https://repo.yandex.ru/clickhouse/rpm/stable/x86_64
yum install clickhouse-server clickhouse-client
mkdir /data/clickhouse
chown -R clickhouse.clickhouse /data/clickhouse/

vim /etc/clickhouse-server/config.xml

Modify listen_host

<!-- Listen specified host. use :: (wildcard IPv6 address), if you want to accept connections both with IPv4 and IPv6 from everywhere. -->
<!-- <listen_host>::</listen_host> -->
<!-- Same for hosts with disabled ipv6: -->
<!-- <listen_host>0.0.0.0</listen_host> -->
<!-- Default values - try listen localhost on ipv4 and ipv6: -->
<!--
<listen_host>::1</listen_host>
<listen_host>127.0.0.1</listen_host>
-->
<listen_host>0.0.0.0</listen_host> <!-- listen on all accessible addresses -->
<!-- Don't exit if ipv6 or ipv4 unavailable, but listen_host with this protocol specified -->
<!-- <listen_try>0</listen_try> -->
<!-- Allow listen on same address:port -->
<!-- <listen_reuse_port>0</listen_reuse_port> -->
<!-- <listen_backlog>64</listen_backlog> -->

Modify the storage path

<!-- Path to data directory, with trailing slash. -->
<path>/data/clickhouse/</path> <!-- change the storage path -->
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/data/clickhouse/tmp/</tmp_path>

Add the cluster configuration

<remote_servers>
    <bigdata> <!-- cluster name, custom -->
        <shard> <!-- define a shard -->
            <!-- Optional. Shard weight when writing data. Default: 1. -->
            <weight>1</weight>
            <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
            <internal_replication>false</internal_replication>
            <replica> <!-- the machines on which replicas of this shard live -->
                <host>172.20.1.39</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>172.20.1.246</host>
                <port>9000</port>
            </replica>
        </shard>
        <!--
        <shard>
            <weight>2</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>172.20.1.39</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>172.20.1.246</host>
                <port>9000</port>
            </replica>
        </shard>
        -->
    </bigdata>
</remote_servers>

Add the ZooKeeper configuration

<!-- ZooKeeper is used to store metadata about replicas, when using Replicated tables.
     Optional. If you don't use replicated tables, you could omit that.
     See https://clickhouse.yandex/docs/en/table_engines/replication/
-->
<zookeeper incl="zookeeper-servers" optional="true" />
<zookeeper>
    <node index="1">
        <host>172.20.1.39</host>
        <port>2181</port>
    </node>
    <node index="2">
        <host>172.20.1.246</host>
        <port>2181</port>
    </node>
    <node index="3">
        <host>172.20.1.173</host>
        <port>2181</port>
    </node>
</zookeeper>

Configure the shard and replica macros

<macros incl="macros" optional="true" />
<macros>
    <shard>1</shard> <!-- shard number -->
    <replica>172.20.1.39</replica> <!-- each machine fills in its own IP here -->
</macros>

Start the service

systemctl start clickhouse-server.service
systemctl enable clickhouse-server.service

Verify

[root@bgdata zookeeper]# clickhouse-client -h 172.20.1.246 -m
ClickHouse client version 20.3.2.1 (official build).
Connecting to 172.20.1.246:9000 as user default.
Connected to ClickHouse server version 20.3.2 revision 54433.

bgdata.operate.ck-0002 :) select * from system.clusters;

SELECT * FROM system.clusters

┌─cluster─┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name────┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
│ bigdata │         1 │            1 │           1 │ 172.20.1.39  │ 172.20.1.39  │ 9000 │        0 │ default │                  │            0 │                       0 │
│ bigdata │         1 │            1 │           2 │ 172.20.1.246 │ 172.20.1.246 │ 9000 │        1 │ default │                  │            0 │                       0 │
└─────────┴───────────┴──────────────┴─────────────┴──────────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────────────┘

2 rows in set. Elapsed: 0.001 sec.

Test

Create the database on each CK node (databases are not replicated):

CREATE DATABASE test1;

Create a table and insert some data

CREATE TABLE t1 ON CLUSTER bigdata
(
    `ts` DateTime,
    `uid` String,
    `biz` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/test1/tables/{shard}/t1', '{replica}')
PARTITION BY toYYYYMMDD(ts)
ORDER BY ts
SETTINGS index_granularity = 8192;
-- The engine must be ReplicatedMergeTree, not plain MergeTree, otherwise the data will not be replicated
-- '/clickhouse/test1/tables/{shard}/t1' is the path written into ZooKeeper and must be unique per table

INSERT INTO t1 VALUES ('2019-06-07 20:01:01', 'a', 'show');
INSERT INTO t1 VALUES ('2019-06-07 20:01:02', 'b', 'show');
INSERT INTO t1 VALUES ('2019-06-07 20:01:03', 'a', 'click');
INSERT INTO t1 VALUES ('2019-06-08 20:01:04', 'c', 'show');
INSERT INTO t1 VALUES ('2019-06-08 20:01:05', 'c', 'click');

Query the table on the second machine. If the data is present and matches the first node, replication is working; otherwise the configuration needs to be checked again.
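For example, a minimal consistency check with clickhouse_driver (a sketch, assuming the default user with no password and the t1 table created above):

from clickhouse_driver import Client

# Connect to each replica directly and compare the row count of t1; the numbers should match.
for host in ("172.20.1.39", "172.20.1.246"):
    client = Client(host, database="test1")
    rows = client.execute("SELECT count() FROM t1")
    print(host, rows[0][0])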

3. Summary

  • Replication is per table, not per database and not per ClickHouse instance, so you can use ReplicatedMergeTree for only some tables and leave others unreplicated; databases therefore have to be created on each node separately.
  • The shard/replica layout differs from Elasticsearch: each CK machine can hold only one replica of one shard, so a cluster with 2 shards and 2 replicas needs 2*2 = 4 machines; otherwise an error is raised.
  • When testing reads and writes, newly created tables were synchronized but their data was not. Checking the CK logs and the hosts recorded in ZK showed that ZK stores host names rather than IPs, so the node to write to could not be resolved and the hosts file had to be updated.
  • When testing the Python clickhouse_driver connection to the cluster, a recent version of clickhouse_driver is required; older versions do not have the alt_hosts parameter.
  • Databases are added and dropped manually on each node; tables are added and dropped with ON CLUSTER bigdata; data is replicated asynchronously in near real time (see the sketch below).
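A minimal sketch of that workflow with clickhouse_driver (the table name t2 is hypothetical; assumes the default user with no password):

from clickhouse_driver import Client

# Databases are not replicated, so create them on every CK node.
for host in ("172.20.1.39", "172.20.1.246"):
    Client(host).execute("CREATE DATABASE IF NOT EXISTS test1")

# Tables can be created once with ON CLUSTER; ClickHouse propagates the DDL to all nodes.
client = Client("172.20.1.39", database="test1")
client.execute(
    "CREATE TABLE IF NOT EXISTS t2 ON CLUSTER bigdata "
    "(ts DateTime, uid String) "
    "ENGINE = ReplicatedMergeTree('/clickhouse/test1/tables/{shard}/t2', '{replica}') "
    "ORDER BY ts"
)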

Example of connecting to the CK cluster from Python

from clickhouse_driver import Client
client = Client("172.20.1.39", database="test1", alt_hosts="172.20.1.246")  # alt_hosts lists the other replica hosts; see the clickhouse_driver source for details
print(client.execute("show tables"))

PS: For more technical content, follow the official account [xingzhe_ai] and come discuss with Walker AI!