Introduction of the author: Hu Mengyu, development engineer of Zhihu core architecture platform, majoring in big data infrastructure, is mainly responsible for the secondary development of zhihu’s internal big data components and data platform construction.
preface
A year ago, The big data architecture of Zhihu met TiDB for the first time. At that time, we migrated the meta database of Hive MetaStore to TiDB, and got a performance improvement of one magnitude over the stand-alone database. After seeing the power of the distributed NewSQL database TiDB, we placed high hopes on it and applied it to other scenarios of big data architecture, such as: Hive large query alarm, NameNode RPC acceleration.
Hive query alarms are reported
background
In Zhihu, Hive is mainly applied in two scenarios: 1. ETL core link task 2. Ad-hoc query. In the ETL scenario, Hive SQL tasks are fixed and stable, but in the Adhoc scenario, Hive SQL submitted by users is random and variable. Under the condition of the user to optimize SQL is not done, start the graphs of the task will scan data too much, not only makes the task to run slower, will also cause great pressure to the HDFS, influence the stability of the cluster, this kind of circumstance appears very frequently, by the end of the end of the quarter or some users can scan in the first quarter and year data, Once such queries occur, cluster resources will be strained, which in turn affects ETL tasks and results in delayed report output.
SQL query real-time alarm system introduction
In view of the above pain points, we developed a real-time alarm system for big SQL query, which will do the following things when users submit SQL:
-
Parse the SQL execution plan into the table path and partition path to be scanned;
-
Summarize the size of all partition paths and calculate the total amount of scanned data.
-
Check whether the total number of scanned partitions exceeds the threshold. If so, notify users on the enterprise wechat.
The detailed implementation of each step is described below.
Obtain the HDFS scanned by Hive from the execution plan
In this step, we use Hive Server Hook mechanism to output an audit log to Kafka after each SQL is parsed. The audit log format is as follows:
{" operation ", "QUERY", "user" : "HDFS", "time", "the 2021-07-12 15:43:16. 022", "IP" : "127.0.0.1", "hiveServerIp" : "127.0.0.1", "inputPartitionSize" : 2, "SQL" : "select count(*) from test_table where pdate in ('2021-07-01','2021-07-02')", "hookType": "PRE_EXEC_HOOK", "currentDatabase": "default", "sessionId": "5e18ff6e-421d-4868-a522-fc3d342c3551", "queryId": "hive_20210712154316_fb366800-2cc9-4ba3-83a7-815c97431063", "inputTableList": [ "test_table" ], "outputTableList": [], "inputPaths": [ "/user/hdfs/tables/default.db/test_table/2021-07-01", "/user/hdfs/tables/default.db/test_table/2021-07-02" ], "app.owner": "humengyu" }Copy the code
Here we focus on the following fields:
field | meaning |
---|---|
operation | SQL types, such as QUERY, DROP, and so on |
user | Users who submit SQL are group accounts in Zhihu |
sql | The SQL content submitted |
inputPaths | Scanned HDFS path |
app.owner | Submit the personal account of SQL |
Summarize the size of partitions
To summarize the partition size, you need to know the directory size of each HDFS path in the inputPaths field. There are several solutions:
plan | advantages | disadvantages |
---|---|---|
Call HDFS API to obtain data in real time | Results are accurate | The getContentSummary method needs to be called, which consumes NameNode performance and takes a long time to wait. |
Use partition statistics from Hive MetaStore | fast | The results may be inaccurate. Some tables are directly written into the HDFS directory through other computing engines, such as Flink and Spark, and the statistical information is not updated in time. |
Use HDFS fsimage to parse out all Hive directory sizes and store them in TiDB | fast | The result has a T+1 delay, and the partition size cannot be counted on that day. |
Considering the usage scenario, most large SQL queries scan data for several months or even years, and it is acceptable to ignore partition information for one or two days, we choose the third solution: resolve fsimage of HDFS every day, calculate the size of each Hive directory, and store the results in TiDB. Since we use fsimage information in other scenarios, we store not only Hive directories, but the entire HDFS directory, which contains nearly ten billion pieces of data. Obviously, TiDB is a good choice for such a large amount of data, as well as data indexing.
Real-time alarm
We send audit logs to Kafka in real time, then use Flink to consume audit logs in Kafka in real time, using KafkaTableSource and Json Format to use Kafka as flow table. Using JdbcLookupTableSource and TiDB as dimension table, it can easily calculate the data amount of each SQL scan and then judge the alarm.
The final results are as follows:
The NameNode PRC acceleration
background
If the getInputSummary method is called, there is a global lock. If the getInputSummary method is called, there is a global lock. If the getInputSummary method is called, there is a global lock. Calling this method takes a long time, causing other query threads to wait for the lock to be released. After reading the source code, it is found that the getInputSummary method can be executed concurrently. It is actually calling the getContentSummary method of the HDFS client. We remove the lock and no longer use the function of global lock, but adopt a similar way to thread pool. Allow it to be executed with a high degree of concurrency. However, this may cause some problems. The getContentSummary method on the HDFS client is similar to the DU operation on the file system. If the concurrency is too high, NameNode performance will be significantly affected. Not only Hive, but other computing engines also call the getContentSummary method, so optimizing this method is necessary.
Caches ContentSummary information
In 2019, HDFS has split Federation, adopting the Router Base Federation scheme, and introducing the Router, the proxy component of NameNode. We only need to cache the HDFS ContentSummary in the Router layer. When the client initiates a call, if the cache hits, it reads from the cache; if the cache misses, it requests from the NameNode. After internal discussion, there are the following cache schemes:
plan | advantages | disadvantages |
---|---|---|
When the client requests the Router for the first time, it returns from the NameNode and updates the cache. On the second request, the cache is first retrieved, and the modification time of the directory is determined. If no changes have been made, the cache is returned. If any changes have been made, the cache is updated by returning from NameNode. | For infrequently modified directories, you only need to request the NameNode once. | You still need to access the NameNode for the first request; Only directories that do not have subdirectories can be cached because changes to subdirectories are not perceived by upper directories. |
Use fsimage to generate a full list of ContentSummary information cached to TiDB every day, and follow the logic of the first solution when requested by the client. | Most directories do not go through the NameNode on their first request. | You can still cache only directories that do not have subdirectories, because subdirectory changes are not sensed by the upper directory. |
We chose the second option because the ContentSummary information was already generated when we did Hive SQL query alerts, so it was very easy to access. After TiDB is connected to the cache and the request path is indexed, the latency for getContentSummary requests in normal cases can be less than 10ms, while for NameNode without TiDB cache, this time may take several minutes or even dozens of minutes.
Looking forward to
This time, we cache HDFS meta information by using TiDB’s super-large storage and index functions, which satisfies some internal scenarios of Zhihu. We will continue to improve and expand this scenario in the future: For example, HDFS file information can be cached in real time. Edit log can be used to subscribe file changes, and then it can be merged with the stock fsimage in TiDB to produce low-latency NameNode snapshots for some online analysis.