instructions

There are two main purposes for building a cluster:

  • Efficiency, improved by parallelism and concurrency;

  • Stability:

    • Data backup Solves the problem that services are unavailable due to the failure of a single data node.

    • The high availability of the master node solves the single point problem.

First of all, according to the official documentation I have reviewed so far and most clickHouse cluster blogs on the Web, there is no HA solution for ClickHouse clusters, or there is HA (the cluster is still available when a node fails), but there is no automatic failover.

Clickhouse does not have HDFS with namespaces to automatically failover primary nodes based on failures, nor redis with additional sentinel services to failover primary nodes. The entire ClickHouse service is not available to the client (if it is connected to a specific service node) (although the service is still available if the client manually switches to the remaining ClickHouse nodes). Even the ClickHouse cluster does not have a master-slave concept, more on that later.

Clickhouse clustering relies on two table engines for implementation:

  1. ReplicatedMergeTree:

    1. In the websiteData ReplicationMergeTree Replicated correlational table engine to back up data:
      • ReplicatedMergeTree
      • ReplicatedSummingMergeTree
      • ReplicatedReplacingMergeTree
      • ReplicatedAggregatingMergeTree
      • ReplicatedCollapsingMergeTree
      • ReplicatedVersionedCollapsingMergeTree
      • ReplicatedGraphiteMergeTree
    2. These data backup is the effect of engine on specific tables, through the zookeeper store a copy of the information, each node is the entire table the full amount of the data backup, and because each node can separate foreign services, the client requests by nginx tools such as distributed to different nodes are indeed can improve the ability of foreign service;
  2. Distributed:

    1. The Distributed table engine is more like a view. It does not actually store data. The local table stores part of the Distributed table data (controlled by the number of shards and the weight of nodes).

      Each shard can have a weight defined in the config file. By default, the weight is equal to one. Data is distributed across shards in the amount proportional to the shard weight. For example, if there are two shards and the first has a weight of 9 while the second has a weight of 10, the first will be sent 9 / 19 parts of the rows, and the second will be sent 10 / 19.

    2. By default, data is asynchronously and non-blocking synchronized to the related backup nodes of the cluster, while the cluster nodes are completely configured through the configuration file.

    3. The load of cluster nodes can be controlled by configuring weights and algorithms. The default weight of each node is 1, and the default algorithm is Random.

    4. In addition to each node being a shard of the table, you can also configure backup for the shard.

    5. Clickhouse clustering is theoretically possible with Distributed + local tables (any table engine), but data synchronization with the Distributed table engine is somewhat problematic, so it is safer to use the Replicated related table engine as the local table engine.

Clickhouse single-node deployment

The cluster is set up by configuration, need to set up several nodes according to the single node mode to the corresponding deployment, and then do the configuration, single installation refer to my other article clickHouse single installation.

ReplicatedMergeTree

ReplicatedMergeTree is an example of Replicated table engines that can rely on ZooKeeper for data backup.

The ZooKeeper cluster is created

You can use an existing cluster to create ZooKeeper.

  1. Download ZooKeeper: Apache ZooKeeper™ Releases

  2. Upload the compressed package to the server and decompress it. The following uses Zookeeper-3.4.14 as an example:

    Tar -zxf ~/softs/zookeeper-3.4.14.tar.gz -c /opt/Copy the code
  3. Copy configuration files:

    CD /opt/zookeeper-3.4.14/conf cp zoo_sample.cfg zoo.cfgCopy the code
  4. Add the following contents to zoo. CFG:

    dataDir=/data/zookeeper
    # Maximum number of connections allowed per client
    #maxClientCnxns=60
    Create a myid file under the dataDir of each node and fill in the corresponding id: 1,2,3
    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888
    Copy the code
  5. Create a myID file in the dataDir directory and fill it with the serverID of node, such as node1:

    mkdir -p /data/zookeeper
    echo 1 > /data/zookeeper/myid
    Copy the code
  6. To configure the environment variable, add the following contents to /etc/profile:

    # ZOOKEEPER_HOME
    export ZOOKEEPER_HOME = / opt/zookeeper - 3.4.14
    export PATH=$PATH:$ZOOKEEPER_HOME/bin
    Copy the code
  7. Source /etc/profile makes the configuration take effect.

  8. Start and view services:

    #Execute on all three nodes
    zkServer.sh start
    zkServer.sh status
    
    #The following information is displayedZooKeeper JMX enabled by default Using config: /opt/zookeeper-3.4.14/bin/.. /conf/zoo.cfg Mode: followerCopy the code

Configure the ZK cluster in ClickHouse

The following configuration is displayed in /etc/clickhouse-server/config.xml:

<zookeeper incl="zookeeper-servers" optional="true" />
Copy the code

Incl indicates that information about a node with the name can be imported through an external file. Zookeeper can be configured either in /etc/clickhouse-server-config. XML or in an external configuration file. Import the external configuration file in /etc/clickhouse-server/config. XML. Note that the external configuration file in /etc/metrika.xml is imported by default if the external configuration file is not configured.

  1. Add the following configuration to /etc/clickhouse-server/config.xml:

    <include_from>/etc/clickhouse-server/metrika.xml</include_from>
    Copy the code
  2. /etc/clickhouse-server/metrika. XML. The ZooKeeper node is named the same as incl.

    <yandex>
    	<zookeeper-servers>
            <node index="1">
                <host>node2</host>
                <port>2181</port>
            </node>
            <node index="2">
                <host>node3</host>
                <port>2181</port>
            </node>
            <node index="3">
                <host>node4</host>
                <port>2181</port>
            </node>
        </zookeeper-servers>
    </yandex>
    Copy the code
  3. Distribute the configuration file to all nodes:

    scp ./config.xml ./metrika.xml root@node2:$PWD
    Copy the code
  4. Restart the ClickHouse server:

    service clickhouse-server restart
    Copy the code

ReplicatedMergeTree table test

  1. Create tables and import data on a node according to the official OnTime document:

    1. Download data:

      for s in `seq 1987 2018`
      do
      for m in `seq 1 12`
      do
      wget https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_${s}_${m}.zip
      done
      done
      Copy the code
    2. Create a table:

      CREATE TABLE `ontime` (
        `Year` UInt16,
        `Quarter` UInt8,
        `Month` UInt8,
        `DayofMonth` UInt8,
        `DayOfWeek` UInt8,
        `FlightDate` Date,
        `UniqueCarrier` FixedString(7),
        `AirlineID` Int32,
        `Carrier` FixedString(2),
        `TailNum` String,
        `FlightNum` String,
        `OriginAirportID` Int32,
        `OriginAirportSeqID` Int32,
        `OriginCityMarketID` Int32,
        `Origin` FixedString(5),
        `OriginCityName` String,
        `OriginState` FixedString(2),
        `OriginStateFips` String,
        `OriginStateName` String,
        `OriginWac` Int32,
        `DestAirportID` Int32,
        `DestAirportSeqID` Int32,
        `DestCityMarketID` Int32,
        `Dest` FixedString(5),
        `DestCityName` String,
        `DestState` FixedString(2),
        `DestStateFips` String,
        `DestStateName` String,
        `DestWac` Int32,
        `CRSDepTime` Int32,
        `DepTime` Int32,
        `DepDelay` Int32,
        `DepDelayMinutes` Int32,
        `DepDel15` Int32,
        `DepartureDelayGroups` String,
        `DepTimeBlk` String,
        `TaxiOut` Int32,
        `WheelsOff` Int32,
        `WheelsOn` Int32,
        `TaxiIn` Int32,
        `CRSArrTime` Int32,
        `ArrTime` Int32,
        `ArrDelay` Int32,
        `ArrDelayMinutes` Int32,
        `ArrDel15` Int32,
        `ArrivalDelayGroups` Int32,
        `ArrTimeBlk` String,
        `Cancelled` UInt8,
        `CancellationCode` FixedString(1),
        `Diverted` UInt8,
        `CRSElapsedTime` Int32,
        `ActualElapsedTime` Int32,
        `AirTime` Int32,
        `Flights` Int32,
        `Distance` Int32,
        `DistanceGroup` UInt8,
        `CarrierDelay` Int32,
        `WeatherDelay` Int32,
        `NASDelay` Int32,
        `SecurityDelay` Int32,
        `LateAircraftDelay` Int32,
        `FirstDepTime` String,
        `TotalAddGTime` String,
        `LongestAddGTime` String,
        `DivAirportLandings` String,
        `DivReachedDest` String,
        `DivActualElapsedTime` String,
        `DivArrDelay` String,
        `DivDistance` String,
        `Div1Airport` String,
        `Div1AirportID` Int32,
        `Div1AirportSeqID` Int32,
        `Div1WheelsOn` String,
        `Div1TotalGTime` String,
        `Div1LongestGTime` String,
        `Div1WheelsOff` String,
        `Div1TailNum` String,
        `Div2Airport` String,
        `Div2AirportID` Int32,
        `Div2AirportSeqID` Int32,
        `Div2WheelsOn` String,
        `Div2TotalGTime` String,
        `Div2LongestGTime` String,
        `Div2WheelsOff` String,
        `Div2TailNum` String,
        `Div3Airport` String,
        `Div3AirportID` Int32,
        `Div3AirportSeqID` Int32,
        `Div3WheelsOn` String,
        `Div3TotalGTime` String,
        `Div3LongestGTime` String,
        `Div3WheelsOff` String,
        `Div3TailNum` String,
        `Div4Airport` String,
        `Div4AirportID` Int32,
        `Div4AirportSeqID` Int32,
        `Div4WheelsOn` String,
        `Div4TotalGTime` String,
        `Div4LongestGTime` String,
        `Div4WheelsOff` String,
        `Div4TailNum` String,
        `Div5Airport` String,
        `Div5AirportID` Int32,
        `Div5AirportSeqID` Int32,
        `Div5WheelsOn` String,
        `Div5TotalGTime` String,
        `Div5LongestGTime` String,
        `Div5WheelsOff` String,
        `Div5TailNum` String
      ) ENGINE = MergeTree
      PARTITION BY Year
      ORDER BY (Carrier, FlightDate)
      SETTINGS index_granularity = 8192;
      Copy the code
    3. Import data:

      for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT  CSVWithNames"; doneCopy the code
    4. Check the data import data, because the download is slow, I only download one year’s data here:

      select count(1) from ontime;
      
      SELECT count(1)
      FROMOntime ┌ ─count(1) ─ ┐ │3900561│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.019 sec. 
      Copy the code
  2. Create table ReplacingMergeTree;

    1. Create onTIME_Zk table ontime_Zk table on two nodes

      CREATE TABLE ontime_zk (
        `Year` UInt16,
        `Quarter` UInt8,
        `Month` UInt8,
        `DayofMonth` UInt8,
        `DayOfWeek` UInt8,
        `FlightDate` Date,
        `UniqueCarrier` FixedString(7),
        `AirlineID` Int32,
        `Carrier` FixedString(2),
        `TailNum` String,
        `FlightNum` String,
        `OriginAirportID` Int32,
        `OriginAirportSeqID` Int32,
        `OriginCityMarketID` Int32,
        `Origin` FixedString(5),
        `OriginCityName` String,
        `OriginState` FixedString(2),
        `OriginStateFips` String,
        `OriginStateName` String,
        `OriginWac` Int32,
        `DestAirportID` Int32,
        `DestAirportSeqID` Int32,
        `DestCityMarketID` Int32,
        `Dest` FixedString(5),
        `DestCityName` String,
        `DestState` FixedString(2),
        `DestStateFips` String,
        `DestStateName` String,
        `DestWac` Int32,
        `CRSDepTime` Int32,
        `DepTime` Int32,
        `DepDelay` Int32,
        `DepDelayMinutes` Int32,
        `DepDel15` Int32,
        `DepartureDelayGroups` String,
        `DepTimeBlk` String,
        `TaxiOut` Int32,
        `WheelsOff` Int32,
        `WheelsOn` Int32,
        `TaxiIn` Int32,
        `CRSArrTime` Int32,
        `ArrTime` Int32,
        `ArrDelay` Int32,
        `ArrDelayMinutes` Int32,
        `ArrDel15` Int32,
        `ArrivalDelayGroups` Int32,
        `ArrTimeBlk` String,
        `Cancelled` UInt8,
        `CancellationCode` FixedString(1),
        `Diverted` UInt8,
        `CRSElapsedTime` Int32,
        `ActualElapsedTime` Int32,
        `AirTime` Int32,
        `Flights` Int32,
        `Distance` Int32,
        `DistanceGroup` UInt8,
        `CarrierDelay` Int32,
        `WeatherDelay` Int32,
        `NASDelay` Int32,
        `SecurityDelay` Int32,
        `LateAircraftDelay` Int32,
        `FirstDepTime` String,
        `TotalAddGTime` String,
        `LongestAddGTime` String,
        `DivAirportLandings` String,
        `DivReachedDest` String,
        `DivActualElapsedTime` String,
        `DivArrDelay` String,
        `DivDistance` String,
        `Div1Airport` String,
        `Div1AirportID` Int32,
        `Div1AirportSeqID` Int32,
        `Div1WheelsOn` String,
        `Div1TotalGTime` String,
        `Div1LongestGTime` String,
        `Div1WheelsOff` String,
        `Div1TailNum` String,
        `Div2Airport` String,
        `Div2AirportID` Int32,
        `Div2AirportSeqID` Int32,
        `Div2WheelsOn` String,
        `Div2TotalGTime` String,
        `Div2LongestGTime` String,
        `Div2WheelsOff` String,
        `Div2TailNum` String,
        `Div3Airport` String,
        `Div3AirportID` Int32,
        `Div3AirportSeqID` Int32,
        `Div3WheelsOn` String,
        `Div3TotalGTime` String,
        `Div3LongestGTime` String,
        `Div3WheelsOff` String,
        `Div3TailNum` String,
        `Div4Airport` String,
        `Div4AirportID` Int32,
        `Div4AirportSeqID` Int32,
        `Div4WheelsOn` String,
        `Div4TotalGTime` String,
        `Div4LongestGTime` String,
        `Div4WheelsOff` String,
        `Div4TailNum` String,
        `Div5Airport` String,
        `Div5AirportID` Int32,
        `Div5AirportSeqID` Int32,
        `Div5WheelsOn` String,
        `Div5TotalGTime` String,
        `Div5LongestGTime` String,
        `Div5WheelsOff` String,
        `Div5TailNum` String
      ) 
      ENGINE = ReplicatedMergeTree('/clickhouse/tables/ontime_zk'.'replica1', FlightDate, (Year, FlightDate), 8192);
      
      -- change replica1 to replica2 on node2
      Copy the code
    2. After the two nodes are created, you can view the information about them in ZooKeeper as follows:

      ls /clickhouse/tables/ontime_zk/replicas
      [replica1, replica2]
      Copy the code
    3. Insert data to ontime_zk on node1:

      Insert data on node1
      INSERT into ontime_zk select * from ontime limit 100000;
      
      -- Data is automatically synchronized from node1 to node2, and the results are consistent. The data synchronization is bidirectional, that is, data is inserted from Node2 to node1
      SELECT COUNT(1) from ontime_zk oz ;
      
      SELECT COUNT(1)
      FROM ontime_zk ASOz ┌ ─COUNT(1) ─ ┐ │100000│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.002 sec. 
      Copy the code

Distributed

The Distributed table engine determines the table fragment and copy information through configuration. Here, I use four nodes node1 to node4 to test the Distributed table with two fragments and two copies:

shard1: node1, node2
shard2: node3, node4
Copy the code

In /etc/clickhouse-server/config. XML there is cluster configuration information, which can be deleted:

<remote_servers incl="clickhouse_remote_servers" >.</remote_servers>
Copy the code

The same can be done in /etc/clickhouse-server/config. XML or in an external configuration file. Add the following to /etc/clickhouse-server/metrika.xml as an external configuration:

<yandex>
    <! -- Cluster Configuration -->
    <clickhouse_remote_servers>
        <! -- 2 fragment 2 backup -->
        <ck_cluster>
            <! -- Data fragment 1 -->
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                    <! Authentication failed: password is incorrect or there is no user with such name -->
                    <user>default</user>
                    <password>default</password>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default</password>
                </replica>
            </shard>
            <! -- Data fragment 2 -->
            <shard>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default</password>
                </replica>
                <replica>
                    <host>node4</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default</password>
                </replica>
            </shard>
        </ck_cluster>
    </clickhouse_remote_servers>
</yandex>
Copy the code

Metrika.xml is then distributed to all nodes and cluster information can be queried in the database:

select * from system.clusters;

SELECT *
FROMSystem. Clusters ┌ ─ cluster ─ ─ ─ ─ ┬ ─ shard_num ─ ┬ ─ shard_weight ─ ┬ ─ replica_num ─ ┬ ─ host_name ─ ┬ ─ host_address ─ ┬ ─ port ─ ┬ ─ is_local ─ ┬ ─user─ ─ ─ ─ ┬ ─ default_database ─ ┬ ─ errors_count ─ ┬ ─ estimated_recovery_time ─ ┐ │ ck_cluster │111 │ node1     │ 192.168. 085.90001default │                  │            00 │
│ ck_cluster │         112│ 2 │192.168. 086.90000default │                  │            00 │
│ ck_cluster │         211 │ node3     │ 192.168. 087.90000default │                  │            00 │
│ ck_cluster │         212 │ node4     │ 192.168. 088.90000default │                  │            00│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘4 rows in set. Elapsed: 0.003 sec. 
Copy the code

Let’s start testing the distributed table:

  1. Create local tables on all 4 nodes with table engine MergeTree:

    CREATE TABLE ontime_local (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = MergeTree
    PARTITION BY Year
    ORDER BY (Carrier, FlightDate)
    SETTINGS index_granularity = 8192;
    Copy the code
  2. Then create a distributed table on one of the nodes:

    The ontime_cluster table will be created ON the node where the ontime_cluster table is executed
    CREATE TABLE ontime_cluster ON CLUSTER ck_cluster
    (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    )
    ENGINE = Distributed(ck_cluster, test, ontime_local, rand());
    Test is the name of the database, ontime_local is the name of the local table, and rand() is the data distribution method, or sharding key
    /** There is an explanation for the rand() parameter: The sharding expression can be any expression from constants and table columns that returns an integer. For example, you can use the expression rand() for random distribution of data, Or UserID for distribution by the remainder from dividing the user's ID */
    Copy the code
  3. Write data to ontime_cluster:

    Start by looking at how much data there is in OnTime
    select count(1) from ontime;
    
    SELECT count(1)
    FROMOntime ┌ ─count(1) ─ ┐ │3900561│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.001 sec. 
    
    Write data to ontime_cluster
    INSERT into ontime_cluster select * from ontime ;
    
    -- Check how much data ontime_cluster has, and ontime
    select count(1) from ontime_cluster;
    
    SELECT count(1)
    FROMOntime_cluster ┌ ─count(1) ─ ┐ │3900561│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.010 sec. 
    
    -- Check how much data ontime_local has. There are two shards, and the data is about half
    select count(1) from ontime_local;
    
    SELECT count(1)
    FROMOntime_local ┌ ─count(1) ─ ┐ │1950884│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.004 sec. 
    Copy the code
  4. When a distributed table is queried, the query is automatically distributed to each shard, and the same shard searches for one copy. That is, as long as one copy is available for all shards, the whole cluster is available. However, the reason why I say it does not have automatic failover is that it does not provide a unified external connection portal. When the database is connected, it is connected to a specific node such as Node1. If node1 fails, the cluster is still available, but it is not available to the client. It should be possible to use nginx to broker connections at production time.

  5. There is no master-slave concept in the cluster. Each node can provide the complete function of adding, deleting, and modifying data. Here we test inserting data into node2:

    #With the same table structure, I import the downloaded data directly into ontime_cluster
    for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host node2 --password default --query="INSERT INTO test.ontime_cluster FORMAT CSVWithNames"; done
    Copy the code
    Then go to node1 and query ontime_cluster doubled
    select count(1) from ontime_cluster;
    
    SELECT count(1)
    FROMOntime_cluster ┌ ─count(1) ─ ┐ │7801122│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.023 sec. 
    
    select count(1) from ontime_local;
    
    There are almost twice as many here
    SELECT count(1)
    FROMOntime_local ┌ ─count(1) ─ ┐ │3901207│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.004 sec. 
    Copy the code
  6. truncate table ontime_cluster; It doesn’t clear the table because distributed tables don’t really store data. Currently I see that the only way to delete data is through the local table. I wonder if there is a better way.

HA solution

MergeTree + Distributed

In this scenario, the local surface engine is MergeTree (no problem with any table engine, mainly to reflect the difference between Distributed data synchronization and relevant table engine in Replicated), and then create a Distributed table. This solution makes use of the data synchronization function of the cluster (the details have not yet been detailed), but there will be some problems according to the website:

  1. If Node1 is temporarily down, incremental data can be completed from the start of the outage to recovery, depending on the push mechanism on Node2, there will be temporary directories.
  2. However, if node1 is completely lost and the hard disk fails and cannot be recovered, it has to be redone to replace node1 with node5.

Writing data to a cluster:

First, you can define which servers to write which data to and perform the write directly on each shard. In other words, perform INSERT in the tables that the distributed table “looks at”. This is the most flexible solution as you can use any sharding scheme, which could be non-trivial due to the requirements of the subject area. This is also the most optimal solution since data can be written to different shards completely independently.

Second, you can perform INSERT in a Distributed table. In this case, the table will distribute the inserted data across the servers itself. In order to write to a Distributed table, it must have a sharding key set (the last parameter). In addition, if there is only one shard, the write operation works without specifying the sharding key, since it doesn’t mean anything in this case.

In short, there are two ways:

  1. Different services write data to the local table of different shards.
  2. When data is written to a distributed table, the data is distributed to different partitions according to the distribution method used to build the table.

First, recall the configuration information about the cluster. The cluster has four nodes, two fragments and two copies:

shard1: node1, node2
shard2: node3, node4
Copy the code

My testing process is as follows:

  1. SQL > select * from node1; SQL > select * from node1;

    SELECT COUNT(1) FROM ontime_cluster oc ;
    
    SELECT COUNT(1)
    FROM ontime_cluster ASOc ┌ ─COUNT(1) ─ ┐ │7801232│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.010 sec. 
    
    SELECT COUNT(1) FROM ontime_local ol ; 
    
    SELECT COUNT(1)
    FROM ontime_local ASOl ┌ ─COUNT(1) ─ ┐ │3901256│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.006 sec.
    Copy the code
  2. Insert 200 records into node1 local table:

    INSERT into table ontime_local select * from ontime  limit 200;
    Copy the code
  3. SQL > insert data into node1; SQL > insert data into node1;

    SELECT COUNT(1) FROM ontime_cluster oc ;
    
    SELECT COUNT(1)
    FROM ontime_cluster ASOc ┌ ─COUNT(1) ─ ┐ │7801432│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.015 sec. 
    
    SELECT COUNT(1) FROM ontime_local ol ; 
    
    SELECT COUNT(1)
    FROM ontime_local ASOl ┌ ─COUNT(1) ─ ┐ │3901456│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.019 sec. 
    
    Copy the code
  4. Node2 is the same shard as node1, and node1 is the same shard as node1, and node1 is the same shard as node1.

    SELECT COUNT(1) FROM ontime_cluster oc ;
    
    SELECT COUNT(1)
    FROM ontime_cluster ASOc ┌ ─COUNT(1) ─ ┐ │7801232│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.013 sec. 
    
    SELECT COUNT(1) FROM ontime_local ol ;
    
    SELECT COUNT(1)
    FROM ontime_local ASOl ┌ ─COUNT(1) ─ ┐ │3901256│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.005 sec. 
    Copy the code
  5. SQL > select * from node3 where node3 and node1 are sharded; SQL > select * from node3 where node3 and node1 are sharded; SQL > select * from node3 where node3 is sharded;

    SELECT COUNT(1) FROM ontime_cluster oc ;
    
    SELECT COUNT(1)
    FROM ontime_cluster ASOc ┌ ─COUNT(1) ─ ┐ │7801432│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.010 sec. 
    
    SELECT COUNT(1) FROM ontime_cluster oc ;
    
    SELECT COUNT(1)
    FROM ontime_cluster ASOc ┌ ─COUNT(1) ─ ┐ │7801232│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.005 sec. 
    Copy the code
  6. The problem appears to be that data written directly to the local table is not synchronized to the backup node, but the exact cause and solution will take time to investigate. However, writing directly to a distributed table does not present this problem.

ReplicatedMergeTree+ Distributed

In this scheme, the local table engine is ReplicatedMergeTree, and then a Distributed table is created, which is to change the local table from MergeTree to ReplicatedMergeTree, and the data synchronization is changed from database synchronization to ZooKeeper synchronization.

  1. Delete local table and distribution table:

    -- Execute for each node
    DROP table ontime_local;
    -- One of the nodes will execute
    DROP table ontime_cluster on cluster ck_cluster;
    Copy the code
  2. Add the following configuration to /etc/clickhouse-server/metrika. XML:

    <! -- node1 -->
    <macros>
            <shard>01</shard>
            <replica>01</replica>
    </macros>
    
    <! -- node2 -->
    <macros>
            <shard>01</shard>
            <replica>02</replica>
    </macros>
    
    <! -- node3 -->
    <macros>
            <shard>02</shard>
            <replica>01</replica>
    </macros>
    
    <! -- node4 -->
    <macros>
            <shard>02</shard>
            <replica>02</replica>
    </macros>
    Copy the code
  3. Each node creates a local table for ReplicatedMergeTree:

    CREATE TABLE ontime_zk_local (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) 
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ontime_zk_local', '{replica}', FlightDate, (Year, FlightDate), 8192);
    Copy the code
  4. Create a distributed table that asks the local table ontime_zk_local:

    CREATE TABLE ontime_cluster ON CLUSTER ck_cluster
    (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    )
    ENGINE = Distributed(ck_cluster, test, ontime_zk_local, rand());
    Copy the code
  5. Write data to the cluster, then view the distributed table and local table data:

    INSERT into ontime_cluster select * from ontime ;
    
    SELECT COUNT(1) FROM ontime_cluster;
    
    SELECT COUNT(1)
    FROMOntime_cluster ┌ ─COUNT(1) ─ ┐ │3900561│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.013 sec. 
    
    -- You can see that the amount of data on the local surface is about half
    SELECT COUNT(1) FROM ontime_zk_local ozl ; 
    
    SELECT COUNT(1)
    FROM ontime_zk_local ASEquity raisings ┌ ─COUNT(1) ─ ┐ │1951166│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.006 sec. 
    Copy the code
  6. Insert 200 records into local table ontime_zk_local on node1

    INSERT into ontime_zk_local select * from ontime limit 200;
    
    -- Distributed table data volume +200
    SELECT COUNT(1) FROM ontime_cluster;
    
    SELECT COUNT(1)
    FROMOntime_cluster ┌ ─COUNT(1) ─ ┐ │3900761│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.013 sec. 
    
    -- Local surface data volume +200
    SELECT COUNT(1) FROM ontime_zk_local ozl ;
    
    SELECT COUNT(1)
    FROM ontime_zk_local ASEquity raisings ┌ ─COUNT(1) ─ ┐ │1951366│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.005 sec.
    Copy the code
  7. Node2: Local table data is synchronized to node2: local table data is synchronized to node2:

    -- Distributed table data volume +200
    node2 :) SELECT COUNT(1) FROM ontime_cluster;
    
    SELECT COUNT(1)
    FROMOntime_cluster ┌ ─COUNT(1) ─ ┐ │3900761│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.013 sec. 
    
    -- Local surface data volume +200
    node2 :) SELECT COUNT(1) FROM ontime_zk_local ozl ;
    
    SELECT COUNT(1)
    FROM ontime_zk_local ASEquity raisings ┌ ─COUNT(1) ─ ┐ │1951366│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.005 sec. 
    
    Copy the code
  8. SQL > select * from node3; SQL > select * from node3;

    -- Distributed table data volume +200
    node3 :) SELECT COUNT(1) FROM ontime_cluster;
    
    SELECT COUNT(1)
    FROMOntime_cluster ┌ ─COUNT(1) ─ ┐ │3900761│ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘1 rows in set. Elapsed: 0.006 sec. 
    Copy the code
  9. In summary, ReplicatedMergeTree+ Distributed HA scheme is more reliable.

Reference documentation

  1. Distributed Table Engine
  2. Data Replication
  3. ReplacingMergeTree
  4. Clickhouse clusters applications, sharding, and replication
  5. ClickHouse clusters are built from 0 to 1