Alibaba High-performance Time Series Database (HiTSDB) is a high-performance, low-cost, stable, and reliable online time series database service. It provides efficient reads and writes, high-compression-ratio storage, time series interpolation and aggregate computing, and multidimensional timeline analysis, and is aimed mainly at monitoring systems and the IoT domain. It is already widely used by a number of businesses inside Alibaba Group and provided stable service on Singles' Day in both 2016 and 2017.

Background

While serving customers inside Alibaba Group, the HiTSDB time series database engine accumulated many optimizations targeted at the Group's specific business characteristics. However, while polishing HiTSDB as a cloud product, we gradually found that many of these targeted optimizations are hard to apply to individual users on the public cloud.

At the same time, as public cloud customers used HiTSDB, more and more problems caused by aggregation queries surfaced, such as stack overflow errors when too many data points are returned, OOM when too many points are aggregated, aggregations that never complete, and instances becoming completely stuck. These problems are largely due to flaws in the original aggregation engine architecture.

Therefore, after evaluation the HiTSDB development team decided to upgrade the HiTSDB engine around a new aggregation engine architecture, covering storage model transformation, index upgrade, new streaming aggregation, data migration, and performance evaluation. This article covers these five aspects, with an emphasis on the new streaming aggregation engine.

1. Time series data storage model:

1.1 Time series data storage format

Typical time series data has two dimensions. One is the time axis: data is appended as time flows on. The other is the timeline, which consists of a metric and a data source; a data source is a unique collection point identified by a set of tags. For example, cpu.usage data may come from collection points identified by data center, application, and instance. This lets us logically abstract an ID + {timestamp, value} time series data model. How should this data model be stored? There are two typical storage approaches:

  • Data blocks are split by time window: continuous data in the same natural time window is placed in adjacent positions, e.g. {1:00, 2:00} -> (ID1, ID2, ID3, ..., IDn). Typical time series databases taking this approach include InfluxDB (with its TSM storage engine), Prometheus, and so on. OpenTSDB is a bit special: it uses a single-value model and the metric is mandatory in every query, so it can partition first by metric and then by time window; in essence the data of the same time window is still kept contiguous. The advantage of splitting by time window is that writes land naturally in the current window, and high-dimensional tag queries are mostly continuous scans. The drawback is that out-of-order data is hard to handle in this mode: Prometheus simply discards points that arrive after their time window has closed, while InfluxDB's performance degrades in this case.

  • Data is split by timeline: (id1) -> (1:00, 2:00, 3:00, ..., 23:00). HiTSDB splits data by timeline. Data persisted to disk is currently stored in HBase, where the underlying row key is a combination of the metric, the tags, and the natural time window (a sketch of such a row key follows this list). Because the row keys of one timeline sort adjacently, its data points are contiguous, which makes low-dimensional queries very efficient; and based on the IoT businesses we currently serve, low-dimensional access dominates. Medium-dimensional queries use streaming scans, and queries over very high-dimensional tags use HiTSDB's pre-aggregation service (beyond the scope of this article).
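To make the model concrete, here is a minimal, hypothetical Java sketch of the ID + {timestamp, value} timeline model and of a timeline-oriented row key of the kind described above (field order and encoding are illustrative, not HiTSDB's actual on-disk format):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class TimelineSketch {
        // Logical model: a timeline (metric + tags) with time-ordered points appended to it.
        final String metric;                                  // e.g. "cpu.usage"
        final SortedMap<String, String> tags;                 // e.g. {app=hitsdb, host=ipA}
        final TreeMap<Long, Double> points = new TreeMap<>(); // timestamp -> value

        TimelineSketch(String metric, SortedMap<String, String> tags) {
            this.metric = metric;
            this.tags = tags;
        }

        void append(long timestamp, double value) {
            points.put(timestamp, value);
        }

        // rowkey = metric + sorted tag pairs + natural window start, so all windows of one
        // timeline sort next to each other and a timeline can be read with a short scan.
        byte[] rowKey(long windowStartSec) {
            StringBuilder sb = new StringBuilder(metric);
            for (Map.Entry<String, String> e : tags.entrySet()) {
                sb.append(',').append(e.getKey()).append('=').append(e.getValue());
            }
            byte[] prefix = sb.toString().getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(prefix.length + Long.BYTES);
            buf.put(prefix).putLong(windowStartSec);          // points inside the window become column qualifiers
            return buf.array();
        }
    }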

1.2 Hotspot issues in the time series model

In production environments, customers collect all kinds of metrics with varying collection periods. For example, cpu.usage changes rapidly and is watched closely by the business side, so its collection period is usually very short (1 second, 5 seconds, 10 seconds, etc.), while disk.usage changes smoothly and is usually collected every 1, 5, or 10 minutes. Without special handling, storing data of the same metric together can create hotspots. Suppose storage resources are sharded by metric type, and there are 20 services, each with 10 clusters of 500 hosts, all collected every second: about 100,000 cpu.usage data points land on the same storage instance every second, while disk.usage is collected once per minute, so only about 1,666 data points per second land on another storage instance. The data skew is very serious.

1.2.1 Bucketing

The classic solution to this kind of problem is bucketing. For example, in addition to the metric name, the service name and host name are used as identifying tags, and cpu.usage timelines are written to different buckets according to the hash of the timeline. OpenTSDB also uses buckets (salts) to handle hotspots, but it requires broadcast reads, fundamentally because its query model needs a global scan within a time window. The number of buckets in OpenTSDB therefore has to be balanced: too few and hotspots remain localized; too many and the overhead of broadcast reads at query time becomes very high. A minimal bucketing sketch follows.
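A minimal sketch of bucketing by timeline hash (the bucket count and hash function are illustrative; HiTSDB's actual salting scheme may differ):

    import java.nio.charset.StandardCharsets;

    class Bucketing {
        static final int BUCKET_NUMBER = 20;   // e.g. the 20 salts mentioned in the migration section

        // The bucket id becomes a one-byte "salt" prefix of the row key, so writes of a hot
        // metric spread across BUCKET_NUMBER regions instead of a single one.
        static int bucketOf(String timelineKey) {
            byte[] bytes = timelineKey.getBytes(StandardCharsets.UTF_8);
            int h = 0;
            for (byte b : bytes) {
                h = 31 * h + (b & 0xFF);       // simple deterministic hash over the timeline key
            }
            return Math.floorMod(h, BUCKET_NUMBER);
        }
    }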

In contrast, HiTSDB avoids broadcast reads and improves query efficiency. Before issuing scans to the underlying storage, HiTSDB first resolves the exact timelines hit by the query statement. Given a concrete timeline, the bucket it belongs to is determined, and data is fetched only from the corresponding block, so there is no broadcast read. As for how HiTSDB resolves the hit timelines at query time, this should become clear after reading the inverted index section.

1.2.2 Region Pre-Split

When a table is newly created, HBase assigns only one region to it by default, so all reads and writes go to the same region on the same RegionServer while the other RegionServers in the cluster stay idle, and load balancing fails. To avoid this, HiTSDB pre-splits the table at creation time and generates one region per bucket: with byte[][] splitKeys = new byte[bucketNumber - 1][], each split key is the one-byte salt of a bucket boundary, i.e. splitKeys[bucketIndex - 1] = new byte[]{(byte) (bucketIndex & 0xFF)}. A fuller, hedged example follows.
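A fuller sketch of such a pre-split, assuming the HBase 2.x Admin API (table and column family names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;

    class PreSplit {
        public static void main(String[] args) throws Exception {
            int bucketNumber = 20;                              // matches the salt/bucket count
            byte[][] splitKeys = new byte[bucketNumber - 1][];
            for (int bucketIndex = 1; bucketIndex < bucketNumber; bucketIndex++) {
                // each split key is the one-byte salt prefix of the corresponding bucket
                splitKeys[bucketIndex - 1] = new byte[]{(byte) (bucketIndex & 0xFF)};
            }
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                TableDescriptor desc = TableDescriptorBuilder
                        .newBuilder(TableName.valueOf("tsdb"))
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("t"))
                        .build();
                admin.createTable(desc, splitKeys);             // table starts with bucketNumber regions
            }
        }
    }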

2. Inverted index:

2.1 Multidimensional timelines in time series data

Multidimensional support is extremely important for any new-generation time series database. Time series data comes in many types and from very complex sources; besides time-ordered values in a single dimension, there are large numbers of multidimensional timeline combinations. As a simple example, CPU load can be described along three dimensions: CPU core, host, and application, e.g. sys.cpu.load cpu=1 host=ipA app=hitsdb, and each dimension can have hundreds or even tens of thousands of tag values. The combinations across dimensions easily push the number of timelines into the millions. How to manage these timelines, build indexes over them, and serve efficient queries is an important problem for a time series database. The inverted index is currently the mainstream approach in the time series field.

2.2 Basic structure of the inverted index

The basic idea of indexing timelines with an inverted index is as follows.

Original timeline input:

id time series
1 sys.cpu.load cpu=1 host=ipA app=hitsdb
2 sys.cpu.load cpu=2 host=ipA app=hitsdb
3 sys.cpu.load cpu=3 host=ipA app=hitsdb
4 sys.cpu.load cpu=4 host=ipA app=hitsdb
5 sys.cpu.load cpu=1 host=ipB app=hitsdb
6 sys.cpu.load cpu=2 host=ipB app=hitsdb
7 sys.cpu.load cpu=3 host=ipB app=hitsdb
8 sys.cpu.load cpu=4 host=ipB app=hitsdb

After building the inverted index:

term          posting list
cpu=1         1, 5
cpu=2         2, 6
cpu=3         3, 7
cpu=4         4, 8
host=ipA      1, 2, 3, 4
host=ipB      5, 6, 7, 8
app=hitsdb    1, 2, 3, 4, 5, 6, 7, 8

Querying cpu=3 and host=ipB hits the following posting lists:

term          posting list
cpu=3         3, 7
host=ipB      5, 6, 7, 8

Intersecting the two posting lists yields id 7, so the query result is:

id time series
7 sys.cpu.load cpu=3 host=ipB app=hitsdb
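The intersection itself is a merge of sorted posting lists. A minimal sketch, using the cpu=3 and host=ipB lists from the example above (the real index likely works on compressed lists; this shows the idea only):

    import java.util.Arrays;

    class PostingIntersect {
        static long[] intersect(long[] a, long[] b) {      // both lists sorted ascending
            long[] out = new long[Math.min(a.length, b.length)];
            int i = 0, j = 0, k = 0;
            while (i < a.length && j < b.length) {
                if (a[i] == b[j]) { out[k++] = a[i]; i++; j++; }
                else if (a[i] < b[j]) i++;
                else j++;
            }
            return Arrays.copyOf(out, k);
        }

        public static void main(String[] args) {
            long[] cpu3  = {3, 7};                         // posting list for cpu=3
            long[] hostB = {5, 6, 7, 8};                   // posting list for host=ipB
            System.out.println(Arrays.toString(intersect(cpu3, hostB)));  // [7]
        }
    }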

2.3 Problems and optimizations of the inverted index

The main problem the inverted index faces is memory bloat:

  • Posting lists are too long. For a high-cardinality tag such as datacenter=hangzhou, there can be thousands or even tens of thousands of machines in Hangzhou, which means the posting list has to store thousands of 64-bit ids. The solution is to compress the posting list: sort the ids in the array and then apply delta encoding (see the sketch after this list).

  • If tag key-value pairs are used directly as terms, the memory footprint depends on the string sizes; dictionary-encoding the strings also significantly reduces memory overhead.
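A minimal sketch of the delta encoding mentioned above for a sorted posting list (real implementations would typically combine the deltas with variable-length or bit-packed encoding):

    class DeltaEncode {
        static long[] encode(long[] sortedIds) {
            long[] deltas = new long[sortedIds.length];
            long prev = 0;
            for (int i = 0; i < sortedIds.length; i++) {
                deltas[i] = sortedIds[i] - prev;   // small gaps instead of full 64-bit ids
                prev = sortedIds[i];
            }
            return deltas;
        }

        static long[] decode(long[] deltas) {
            long[] ids = new long[deltas.length];
            long prev = 0;
            for (int i = 0; i < deltas.length; i++) {
                prev += deltas[i];
                ids[i] = prev;
            }
            return ids;
        }
    }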

3. Streaming aggregation engine

3.1 Technical pain points of HiTSDB aggregation engine

During the public beta of HiTSDB on the public cloud and its operation inside the Group, the following problems were found in HiTSDB's existing aggregation engine:

3.1.1 The heap is prone to bursting due to the materialization execution mode

The following figure shows the architecture of the original query engine. HiTSDB uses HBase as its storage and reads time series data from HBase through the asynchronous HBase client. Reading from HBase is time-consuming, so the asynchronous client API is commonly used to improve parallelism. However, the original aggregation engine adopts a typical materialization execution mode: 1) multiple asynchronous HBase API calls are issued to start reading; 2) aggregation begins only after all the time series data involved in the query has been read into memory. Materializing the HBase scan results in memory before aggregating makes HiTSDB prone to blowing up the heap. In particular, when a user queries a large time range or the query hits a very large number of timelines, the amount of time series data involved causes a heap OOM in HiTSDB and the query fails.

3.1.2 Aggregate queries can overwhelm the underlying HBase

There are two reasons why the underlying HBase is prone to being overwhelmed when HiTSDB processes aggregate queries.

  • HBase may read redundant timeline data. HiTSDB timelines are stored in HBase with row keys encoded as metric, time window, and tags. A typical query specifies a metric, a time range, and matching conditions on some tags in the spatial dimension. Because the queried tags are not necessarily prefixes of the encoded tags, the inverted index cannot translate the spatial query conditions into precise HBase query conditions; instead, the data is read first and then filtered. This means HBase may read a lot of redundant data, increasing HBase load.

  • HiTSDB may send too many HBase read requests in a short time. On the one hand, HiTSDB stores data in HBase in shards and starts at least one read request per shard; on the other hand, because of the materialization mode described above, all HBase read requests involved in a query are submitted asynchronously at the same time. A single large query can therefore send a burst of read requests to HBase and overwhelm it.

When this happens, an even worse consequence is that HiTSDB can no longer handle time series write requests, and newly arriving data is lost.

3.1.3 Execution architecture is highly coupled, making it difficult to modify or add functionality

The aggregation engine was mainly targeted at performance monitoring scenarios with fixed query patterns, so it adopted a monolithic architecture in which query, filtering, value filling/interpolation, and aggregation logic are tightly coupled. This is not much of a problem for the fixed queries of monitoring applications, but HiTSDB aims to go beyond simple monitoring queries to more complex queries in more application scenarios.

We found that with the original engine architecture it was very difficult to add features or modify the existing implementation. The root cause is that the original aggregation engine does not use the operator-based execution framework common in traditional databases, where the execution layer consists of composable operators and query semantics are expressed by combining different operators. This problem was not particularly noticeable at the beginning of product development, but became a significant obstacle as HiTSDB's application scenarios expanded and new features were added.

3.1.4 The efficiency of aggregation operation needs to be improved

The original engine uses an iterative execution model similar to that of traditional databases, but each iteration returns a single time point, i.e. one record at a time. This is common in OLTP scenarios, where a query touches only a few records; however, HiTSDB queries may need to access a large amount of timeline data, so this model is inefficient.

The per-record function calls involved in this iterative process cannot be parallelized with the SIMD instructions supported by modern hardware, nor can the function code benefit from the inlining and hotspot optimizations commonly applied by JVMs. For large data volumes, the popular practice is to introduce vectorized processing, i.e. each iteration returns a batch of rows rather than a single record. For example, Google Spanner uses batch-at-a-time instead of row-at-a-time, and Spark SQL also adopts vectorized execution in its execution layer.

3.2 Design idea of streaming aggregation engine

To address the problems above and support the commercial operation of HiTSDB, we decided to re-architect the HiTSDB aggregation engine. The following figure shows the basic architecture of the new aggregation query engine.

3.2.1 Pipeline execution mode

Borrowing from the execution model of traditional databases, we introduced the pipeline execution mode (a.k.a. the Volcano/Iterator model). A query is parsed by the physical plan generator into a DAG, or operator tree, composed of different operators. The root operator of the DAG drives the query execution and returns the result to the caller; at the execution level, a top-down, demand-driven approach drives the operators from the root. This engine architecture has the following advantages:

  • This architectural approach has been adopted by many database systems and proved to be effective.

  • The interface is clearly defined, and different execution operators can be optimized independently without affecting other operators;

  • Easy to extend: new functionality can be added by introducing new operators. For example, the current query protocol only defines conditions on tags; to support conditions on metric values (e.g. cpu.usage >= 70% and cpu.usage <= 90%), one can simply add a new FieldFilterOp.

Each operator implements the following interface (a minimal sketch in Java follows the list):

  • Open: initializes and sets resources

  • Next: Call Next () of the input operator to obtain a batch of time series, process the input, and output the batch of time series

  • Close: Closes and releases resources
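A minimal sketch of this Open/Next/Close interface, assuming a hypothetical TimeSeriesBatch type for the batches flowing between operators (these are illustrative, not HiTSDB's actual classes):

    import java.util.List;

    interface Operator {
        void open();                 // initialize and acquire resources
        TimeSeriesBatch next();      // pull a batch of timelines from the input operator, process it,
                                     // and return the processed batch; null when the input is exhausted
        void close();                // release resources
    }

    // A batch of timelines flowing between operators; each timeline carries its points for the query range.
    class TimeSeriesBatch {
        List<Long> timelineIds;      // which timelines this batch contains
        List<long[]> timestamps;     // one array of timestamps per timeline in the batch
        List<double[]> values;       // the corresponding values
    }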

We implemented the following operators in HiTSDB:

  • ScanOp: used to read time line data from HBase asynchronously

  • DsAggOp: used for down-sampling calculation and value filling

  • AggOp: used for group aggregation operation, divided into PipeAggOp, MTAggOp

  • RateOp: Used to calculate the rate of change of time line values

3.2.2 A batch of timelines as the unit of computation between operators

Computation between operators is performed on a batch of timelines at a time, borrowing the vectorization idea from OLAP systems to improve engine performance. When an operator processes a batch containing multiple timelines, each with multiple time points, the cost of function calls is amortized and loops execute more efficiently.

Each operator pulls timeline batches from its input in a streaming fashion and outputs processed batches; it does not retain the input batches, which keeps memory requirements low. Input is materialized only when the operator's semantics require it (see the different implementations of the aggregation operators below).

3.2.3 Distinguishing query scenarios and optimizing each with a different aggregation operator

The original HiTSDB aggregation engine adopted the materialization mode largely because of interpolation on time series data. A typical characteristic of time series data is that timelines are not aligned: different timelines have data at different timestamps. HiTSDB is protocol-compatible with OpenTSDB and therefore supports its notion of interpolation: during aggregation, values are interpolated at the misaligned timestamps using the specified interpolation method, converting misaligned timelines into aligned ones. Interpolation compares all timelines within the same group to determine at which timestamps interpolation is required (see the OpenTSDB documentation for details).

To optimize aggregate query performance, we introduced different aggregation operators, so that queries with different semantics can be optimized differently. Some aggregate queries need interpolation and others do not; and even when interpolation is needed, it suffices to read only the timelines of the same aggregation group into memory.

  • PipeAggOp: used when the aggregate query meets both of the following criteria:

    1) No interpolation is required: the query uses downsampling and the downsample fill policy is not NULL or NaN. In such queries, the timeline data is aligned and complete after downsampling, so the interpolation used by the aggregation function is no longer needed.

    2) The aggregation function supports incremental aggregation, e.g. sum, count, avg, min, max, zimsum, mimmin, mimmax, so we can aggregate incrementally without reading all input data into memory. This operator works in a pipelined way: each time it obtains a batch of timelines from the input operator, it computes the grouping, updates the partial values of the aggregation functions, and then discards the input timelines, retaining only each group's aggregate state (see the sketch after this list).

  • MTAggOp: interpolation is required and the input operator does not pre-group the timeline ids; this falls back to the execution mode of the original aggregation engine.

Compared with MTAggOp, grouped aggregation can be introduced as an optimization:

  • GroupedAggOp: interpolation is required, but the input operator guarantees that timeline ids are already sorted and grouped by tags. During pipelined processing, at most one group needs to be materialized at a time, so this operator needs far less memory than holding all groups' timelines in memory, and it supports parallel aggregation across groups.
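A hedged sketch of PipeAggOp-style incremental aggregation as described in criterion 2): per group, only partial aggregates per aligned (downsampled) timestamp are kept, and each input timeline is discarded once accumulated, so memory grows with the number of groups rather than the number of timelines (class and method names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    class IncrementalGroupAgg {
        // per group: one partial sum and count per aligned (downsampled) timestamp
        static class Partial { double[] sums; long[] counts; }

        private final Map<String, Partial> groups = new HashMap<>();

        // called once per timeline in each incoming batch; the timeline can be dropped afterwards
        void accumulate(String groupKey, double[] alignedValues) {
            Partial p = groups.computeIfAbsent(groupKey, k -> {
                Partial np = new Partial();
                np.sums = new double[alignedValues.length];
                np.counts = new long[alignedValues.length];
                return np;
            });
            for (int i = 0; i < alignedValues.length; i++) {
                p.sums[i] += alignedValues[i];
                p.counts[i]++;
            }
        }

        // emit the final avg series of a group once all input batches have been consumed
        double[] finishAvg(String groupKey) {
            Partial p = groups.get(groupKey);
            double[] out = new double[p.sums.length];
            for (int i = 0; i < out.length; i++) {
                out[i] = p.counts[i] == 0 ? Double.NaN : p.sums[i] / p.counts[i];
            }
            return out;
        }
    }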

3.2.4 Query optimizer and executor

With the introduction of operators and the pipeline execution pattern, HiTSDB can be split into two modules: a query optimizer and an executor. The optimizer generates different execution plans according to the query semantics and the characteristics of the execution operators. For example, HiTSDB can choose among the three aggregation operators discussed above in different scenarios to reduce memory overhead and improve execution efficiency. This is considerably more flexible than the single execution mode of the original aggregation engine.

4. Data migration

The new HiTSDB aggregation engine uses an underlying storage format that is incompatible with previous versions, so data on old-version instances from the public cloud beta must be migrated to the new aggregation engine. At the same time, to support hot upgrades the migration must also be reversible: data points in the new format can be converted back to the old data structure to roll back to the old version. The overall impact on users is that, during the upgrade, no data is written and historical data cannot be read.

4.1 Data migration Architecture

  • Concurrent conversion and migration of data: existing HiTSDB data points are already sharded at write time, into 20 salts by default. The data migration tool processes the data points of each salt concurrently; each salt has one Producer and one Consumer. The Producer opens an HBase Scanner to fetch data points: each Scanner asynchronously scans HBase, obtains HBASE_MAX_SCAN_SIZE rows of data points at a time, and converts the HBase row keys to the new structure.

    The converted rows are then placed on a queue for the Consumer to consume. The Consumer processes HBASE_PUT_BATCHSIZE or HBASE_PUT_MIN_DATAPOINTS worth of data at a time; every time it writes a batch successfully, the processing position for that salt is recorded in the UID table, so that after a failure and restart the Producer can resume conversion from the last successfully processed position. The migration tool reads and writes HBase asynchronously; when a scan or write fails, it retries a limited number of times. If the retry limit is exceeded, migration of that salt is terminated without affecting the other salts, and the next time the tool is automatically restarted it continues migrating the problematic salt until all data has been successfully converted.

  • Flow control: in most cases the Producer scans HBase data faster than the Consumer writes it back. To prevent a data backlog in the queue from building memory pressure, and to reduce the pressure the Producer's scans put on HBase, we added flow control: when the queue size reaches HBASE_MAX_REQUEST_QUEUE_SIZE, the Producer stops scanning HBase until the Consumer has drained the queue to HBASE_RESUME_SCANNING_REQUEST_QUEUE_SIZE, at which point the Producer resumes (a sketch follows this list).

  • Exit of the Producer and Consumer processes

    • Normal exit: when everything goes well, the Producer puts an End of Scan (EOS) marker on the queue after completing the data scan and then exits. When the Consumer encounters the EOS, it knows that this batch is the last one and exits automatically after processing it successfully.

    • Exit on failure: if the Consumer fails to write to HBase, it sets a flag and exits its thread. The Producer checks this flag before preparing each HBASE_MAX_SCAN_SIZE scan; if the flag is set, it knows the corresponding Consumer thread has failed, stops scanning, and exits as well. If the Producer fails while scanning data, the handling is similar to a successful completion: it notifies the Consumer by sending an EOS to the queue. On the next restart, the Producer rescans the data from the last recorded processing position.
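The high/low watermark flow control described above can be sketched as follows (the threshold values and the class are hypothetical; only the constant names come from the text):

    import java.util.concurrent.atomic.AtomicInteger;

    class MigrationFlowControl {
        // Thresholds are illustrative assumptions.
        static final int HBASE_MAX_REQUEST_QUEUE_SIZE = 10_000;          // pause scanning at this depth
        static final int HBASE_RESUME_SCANNING_REQUEST_QUEUE_SIZE = 2_000; // resume below this depth

        private final AtomicInteger queued = new AtomicInteger();        // rows currently waiting in the queue

        void onProduced(int rows) { queued.addAndGet(rows); }            // Producer enqueues a scanned batch
        void onConsumed(int rows) { queued.addAndGet(-rows); }           // Consumer finishes writing a batch

        // Called by the Producer before scanning the next HBASE_MAX_SCAN_SIZE rows.
        void maybeThrottle() throws InterruptedException {
            if (queued.get() < HBASE_MAX_REQUEST_QUEUE_SIZE) {
                return;                                                   // below the high watermark, keep scanning
            }
            while (queued.get() > HBASE_RESUME_SCANNING_REQUEST_QUEUE_SIZE) {
                Thread.sleep(50);                                         // wait for the Consumer to drain the queue
            }
        }
    }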

4.2 Data Migration Consistency

Because HiTSDB on the cloud has two nodes and each node restarts automatically after its upgrade completes, the startup script would automatically run the data migration tool on both. Without precautions, both HiTSDB nodes would migrate the data at the same time. No data would be lost or corrupted, but the duplicated work would put heavy read and write pressure on HBase and seriously affect the user's write and query performance.

To prevent this, we implemented a file-lock-like lock on HBase's ZooKeeper, called DataLock, to ensure that only one node starts the data migration process. When the migration process starts, it tries to create an ephemeral node under a specific ZooKeeper path, in the manner of a non-blocking tryLock(). If the node is created successfully, the DataLock is acquired. If the node already exists, i.e. it was created by the other HiTSDB node, a KeeperException is received, which means the lock was not acquired and the attempt fails immediately. A node that fails to acquire the DataLock exits its migration process automatically; the node that holds the DataLock begins the data migration (a minimal sketch follows).
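A minimal sketch of such a DataLock using the ZooKeeper client API (the lock path and surrounding error handling are illustrative):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    class DataLock {
        // Returns true if this node won the lock and should run the migration.
        static boolean tryLock(ZooKeeper zk, String lockPath) throws KeeperException, InterruptedException {
            try {
                // Ephemeral: the node disappears automatically if this HiTSDB process dies.
                zk.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return true;
            } catch (KeeperException.NodeExistsException e) {
                return false;                       // another node already holds the DataLock
            }
        }
    }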

4.3 Running the data migration only once

After the data points of every salt have been migrated successfully, a new row, data_conversion_completed, is inserted into the old HBase table; this row marks the successful completion of the data migration. Meanwhile, an automatic script starts the data migration tool every 12 hours in case a previous migration did not complete. On each startup, the tool first checks the data_conversion_completed flag and exits immediately if it exists. This check is a single HBase query and costs less than a normal health check, so periodically starting the migration tool has no impact on HiTSDB or HBase.
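As an illustration of how cheap this startup check is, the following hedged sketch looks up the completion marker with a single HBase Get (the table name, row key encoding, and implied column family are assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    class MigrationCompletedCheck {
        // Returns true if the data_conversion_completed marker row already exists in the old table.
        static boolean alreadyMigrated() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table oldTable = conn.getTable(TableName.valueOf("tsdb"))) {
                return oldTable.exists(new Get(Bytes.toBytes("data_conversion_completed")));
            }
        }
    }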

4.4 Evaluation of data migration

Test machine: 4 cores, 8 GB memory, SSD

Collection interval    Data points     Timelines     Test result
1 second               2.88 billion    800,000       Migration TPS 200,000; 10 GB storage/hour
10 seconds             360 million     1,000,000     Migration TPS 190,000; 9 GB storage/hour
1 hour                 10 million      10,000,000    Migration TPS 130,000; 6 GB storage/hour

Result: data migration and hot upgrade of 100+ instances completed without failure.

5. Query performance evaluation

Test environment configuration:
192.168.12.3    version 2.1.5
192.168.12.4    version 2.2.0 (pipelined engine)

Test data: 10,000 queries, varying the collection frequency, the time window, and the number of timelines hit by each query.

Case 1: collection frequency 5 s, 1,000 timelines hit by the query, time window 3,600 s

Version    Test result
2.1.5      max rt = 628 ms, min rt = 180 ms, avg rt = 191 ms
2.2.2      max rt = 136 ms, min rt = 10 ms, avg rt = 13 ms

Case 2: collection frequency 1 s, one timeline hit by the query, time window 36,000 s

Version    Test result
2.1.5      max rt = 1803 ms, min rt = 1803 ms, avg rt = 1803 ms
2.2.2      max rt = 182 ms, min rt = 182 ms, avg rt = 182 ms

Bottom line: The new query aggregation engine increases query speed by more than 10 times.

Conclusion

This article introduced the optimization and upgrade of the high-performance time series database HiTSDB engine before its commercial launch, aimed at improving the engine's stability, write and query performance, and extensibility for new features. HiTSDB is now officially available on Alibaba Cloud, and we will keep improving the engine based on user feedback to better serve HiTSDB customers.