Preface
ClickHouse is a ROLAP columnar database that lets you quickly get analytical answers in high-volume data analysis scenarios. This article walks through the overall flow of a ClickHouse data query from a personal perspective; it is mostly my own understanding and thinking, so if there are any mistakes, feel free to point them out and discuss.
1. ClickHouse data organization
1.1 ClickHouse clusters, shards, and replicas
A ClickHouse cluster is made up of shards. A ClickHouse shard can consist of one or more machines; when multiple machines form a shard, one node holds the primary replica and the other nodes hold data replicas, as in the figure above, where the number of replicas is 1. A ClickHouse cluster can consist of many machines, or the machines can be divided into multiple clusters based on business characteristics, each cluster containing a smaller number of machines.
You can think of a ClickHouse shard as a single ClickHouse database instance (replica nodes count as instances too); multiple such instances make up a ClickHouse cluster. Shards are servers that hold different parts of the data (all shards must be accessed to read all of the data). ClickHouse uses sharding to split a table's data horizontally across nodes, so as the business grows, the table can be scaled out horizontally to keep up with storage needs.
Replicas are servers that store copies of the same data (to read that data, any one replica can be accessed). ClickHouse uses replica nodes for redundant storage, trading space for data availability: when the primary replica is unavailable, another replica can be selected to serve the data.
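As a concrete way to see this topology, the system.clusters table lists every shard and replica the current node knows about. A minimal sketch, assuming a cluster named my_cluster is defined in the server's remote_servers configuration:

```sql
-- List the shards and replicas of a cluster as seen from the current node.
-- 'my_cluster' is a placeholder; substitute a cluster name from your own configuration.
SELECT cluster, shard_num, replica_num, host_name, is_local
FROM system.clusters
WHERE cluster = 'my_cluster';
```

Each (shard_num, replica_num) pair corresponds to one ClickHouse instance, and rows sharing a shard_num hold the same slice of the data.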
1.2 Data organization of MergeTree tables
From the perspective of a single shard, the MergeTree table engine organizes its data on disk as directories and files. ClickHouse has a data root directory; assume the root is:
```
/data/clickhouse/data
```
Suppose the user creates a database named lake; ClickHouse then creates a directory named lake under the data root:
```
/data/clickhouse/data/lake
```
When there are multiple databases, the underlying data directory structure is:
```
<ClickHouse data root>/<database name>
```
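As a quick check, the directory a database lives in can be read back from the system.databases table. A minimal sketch, assuming the lake database from the example above:

```sql
-- Create the example database and look up its on-disk data path.
CREATE DATABASE IF NOT EXISTS lake;

SELECT name, engine, data_path
FROM system.databases
WHERE name = 'lake';
```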
Now create a table named hello_lake under the lake database, with a partition key of the form YYYYMMDD. Once data is written, partition directories appear under the table directory, for example:
```
/data/clickhouse/data/lake/hello_lake/20210323_1_1_0
```
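A hedged sketch of what such a table definition might look like; the column names (dt, id, value) and the ORDER BY key are illustrative assumptions, not the actual hello_lake schema:

```sql
-- A table partitioned by day (YYYYMMDD); each inserted batch produces
-- partition directories such as 20210323_1_1_0 under the table directory.
CREATE TABLE lake.hello_lake
(
    dt    Date,
    id    UInt64,
    value String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(dt)
ORDER BY id;
```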
For each inserted batch, ClickHouse creates one or more partition directories underneath the table directory, depending on how many partitions the batch spans. If another batch is inserted into partition 20210323, an additional directory such as 20210323_2_2_0 appears: the same partition, but with different numbers after it. For details on how to interpret the numbers appended to the partition, see section 6.2.2 of ClickHouse Principles and Practices.
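The parts behind these directories can be inspected through the system.parts table; the block numbers and level it reports correspond to the numbers appended after the partition ID in the directory name. A minimal sketch, assuming the lake.hello_lake table sketched above:

```sql
-- Each row describes one part directory; 20210323_1_1_0 means partition 20210323,
-- min block number 1, max block number 1, merge level 0.
SELECT partition, name, min_block_number, max_block_number, level, active
FROM system.parts
WHERE database = 'lake' AND table = 'hello_lake';
```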
The partition directory holds the data that is actually stored; it contains files such as:
- primary.idx: the index file, sorted by the primary key specified when the table was created.
- [Column].bin: the compressed data of a column, where [Column] is the column name; there is one such file per column.
- [Column].mrk: the mark file that records the mapping between index marks and offsets in the corresponding [Column].bin file.
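The relationship between rows and index marks (one mark per index_granularity rows, 8192 by default) can also be observed from system.parts. A minimal sketch, again assuming the lake.hello_lake table:

```sql
-- marks is the number of entries in primary.idx / [Column].mrk for each active part.
SELECT name, rows, marks, round(rows / marks) AS approx_rows_per_mark
FROM system.parts
WHERE database = 'lake' AND table = 'hello_lake' AND active;
```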
2. ClickHouse data query process
2.1 How business tables are used
In a production environment, the business side typically uses two kinds of tables when working with ClickHouse:
- Local tables using the Replicated + MergeTree family of engines, with a partition key defined, typically partitioned by day.
- Distributed tables, used for business queries.
Data is written directly to the local table on each shard, which handles the actual data storage. Tables using the Distributed table engine are used for data queries. The Distributed engine does not store any data itself; it performs distributed queries across multiple servers (shards) and acts as a proxy for the actual tables. When a query arrives, the Distributed engine sends the request to the shards, which compute the query in parallel (using indexes where available); the results are then merged and returned to the client.
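A hedged sketch of this local + Distributed pairing, assuming a cluster named my_cluster and the {shard}/{replica} macros configured on every node; the table and column names are illustrative only:

```sql
-- Local table: stores the data, replicated within each shard.
CREATE TABLE lake.hello_lake_local ON CLUSTER my_cluster
(
    dt    Date,
    id    UInt64,
    value String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/hello_lake_local', '{replica}')
PARTITION BY toYYYYMMDD(dt)
ORDER BY id;

-- Distributed table: stores nothing, proxies queries to hello_lake_local on every shard.
CREATE TABLE lake.hello_lake_all ON CLUSTER my_cluster
AS lake.hello_lake_local
ENGINE = Distributed(my_cluster, lake, hello_lake_local, rand());
```

In this setup, writes usually target hello_lake_local on each shard directly, while business queries go through hello_lake_all.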
2.2 Distributed table data query process
The following is the query flow for a table using the Distributed engine. Overall, the query process is divided into six steps:
- The client sends the query statement to the ClickHouse proxy, usually as an HTTP request; the query statement usually specifies a partition range.
- Based on its load-balancing policy, the ClickHouse proxy selects a request node (one shard in the cluster) and forwards the query to it.
- The request node rewrites the statement, for example translating a Distributed table into the corresponding local table, and then sends the query to all shards in the ClickHouse cluster (a sketch of this rewrite follows the list).
- Each shard executes the query and returns its result to the request node.
- The request node merges the partial results according to the query logic into the final result and returns it to the ClickHouse proxy.
- Finally, the ClickHouse proxy returns the result to the client, and the business layer consumes the data.
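A hedged illustration of step 3, using the example tables sketched above: the first statement is what the client sends against the Distributed table, the second is roughly what each shard executes against its local table after the rewrite.

```sql
-- Sent by the client, against the Distributed table:
SELECT id, value
FROM lake.hello_lake_all
WHERE dt BETWEEN '2021-03-23' AND '2021-03-24';

-- Executed on each shard after the rewrite, against the local table:
SELECT id, value
FROM lake.hello_lake_local
WHERE dt BETWEEN '2021-03-23' AND '2021-03-24';
```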
When the query executes on a shard, partition-level filtering is performed first according to the partition range in the query statement. Next, the primary.idx file and the value range of the index key are used to narrow down the range of index marks. The [Column].mrk files then map those marks to offsets in the [Column].bin files, and finally the matching data is loaded into memory for analysis and computation.
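In recent ClickHouse versions, EXPLAIN with indexes = 1 shows how much of this pruning happens at the partition and primary key level; a minimal sketch against the assumed local table:

```sql
-- Reports which parts and granules survive the MinMax, partition and
-- primary key index checks for this query.
EXPLAIN indexes = 1
SELECT id, value
FROM lake.hello_lake_local
WHERE dt = '2021-03-23' AND id = 100;
```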
3. Other notes
When querying a Distributed-engine table, pay attention to how subqueries are written, for example the following statement:
```sql
select A, B, C from Y where H = 100 and Z in (select Z from Y where D in ('a', 'b'))
```
Here Y is a Distributed-engine table. Assuming the ClickHouse cluster has 10 machines, the statement above ends up issuing 100 queries (N squared, where N is the number of shards in the cluster), which can make the query take much longer. GLOBAL IN can be used to reduce the number of queries. Of course, GLOBAL IN distributes data over the network and stores it in memory as a temporary table, so the subquery result should not be too large; if the subquery result contains duplicate data, DISTINCT can also be used to deduplicate it, as sketched below.
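A hedged rewrite of the statement above using GLOBAL IN together with DISTINCT:

```sql
-- The subquery runs once; its deduplicated result is shipped to every shard
-- as a temporary table, so the query count no longer grows as N squared.
SELECT A, B, C
FROM Y
WHERE H = 100
  AND Z GLOBAL IN (SELECT DISTINCT Z FROM Y WHERE D IN ('a', 'b'));
```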