【 Background 】
A mobile operator site in China uses the Impala component to process telecom CDRs (call detail records), handling about 100 TB of CDRs every day, with the daily detail table exceeding 10 billion records. The following problems arose with Impala:
- Detail records are stored in Parquet format, the tables are partitioned by time + MSISDN, and Impala is used for querying. Queries that do not hit the partition columns perform poorly.
- Impala has many performance issues (for example, catalog metadata bloat makes metadata synchronization slow) and poor concurrent query performance.
- Impala is an MPP architecture and can only scale to on the order of 100 nodes. Generally, when the number of concurrent queries reaches about 20, the whole system is already at full load, and throughput does not improve even when nodes are added.
- Impala resources cannot be scheduled by YARN, so Impala, Spark, and Hive cannot share resources dynamically within one Hadoop cluster, and opening CDR query capabilities to third parties cannot achieve resource isolation.
【 Solution 】
In view of the above problems, the customer at this mobile operator site asked us to provide a solution. Our big data team analyzed the problems and carried out technology selection, taking several typical business scenarios of the site as input. We built prototypes and performed performance tuning for Spark+CarbonData, Impala 2.6, HAWQ, Greenplum, and SybaseIQ, optimized CarbonData's data loading and query performance for our business scenarios, and contributed the improvements back to the CarbonData open source community. In the end we chose Spark+CarbonData, which is a typical SQL on Hadoop solution and indirectly confirms the trend of traditional data warehouses migrating to SQL on Hadoop. Our validation tests matched the description on the community's official website: CarbonData is a high-performance data storage solution for the Hadoop big data ecosystem, especially for large data volumes. CarbonData is deeply integrated with Spark and compatible with the whole Spark ecosystem (SQL, ML, DataFrame, etc.). Spark+CarbonData lets a single copy of data serve multiple business scenarios. It provides the following capabilities:
- Storage: row and columnar file storage; the columnar storage is similar to Parquet and ORC, and the row storage is similar to Avro. Supports multiple index structures for data such as CDRs, logs, and streams.
- Computing: deep integration and optimization with the Spark compute engine; supports interconnection with the Presto, Flink, and Hive engines.
- Interface:
- API: compatible with DataFrame, MLlib, PySpark, and other native API interfaces;
- SQL: compatible with Spark SQL syntax, while also supporting CarbonData SQL syntax extensions (update and delete, indexes, pre-aggregated tables, etc.).
- Data management:
- Supports incremental data loading and batch (segment) management for data aging
- Supports data update and deletion
- Supports interconnection with Kafka for quasi-real-time ingestion
For a detailed introduction to the key technologies and their usage, see the documentation at https://carbondata.apache.org/; a minimal usage sketch of the data-management capabilities is shown below.
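A minimal sketch of the data-management capabilities listed above, driven entirely through Spark SQL; the table, paths, and column names are placeholders, and the exact statements may differ between CarbonData versions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("carbondata-data-management").getOrCreate()

// Incremental loading: each LOAD creates a new segment (batch) that can be aged out later.
spark.sql("LOAD DATA INPATH 'hdfs:///staging/cdr/2018-09-30' INTO TABLE cdr_detail")

// Row-level update and delete on the detail table.
spark.sql("UPDATE cdr_detail SET (cell_id) = ('unknown') WHERE cell_id IS NULL")
spark.sql("DELETE FROM cdr_detail WHERE start_time < '2018-01-01'")

// Batch management: inspect segments and drop old ones for data aging.
spark.sql("SHOW SEGMENTS FOR TABLE cdr_detail").show()
spark.sql("DELETE FROM TABLE cdr_detail WHERE SEGMENT.STARTTIME BEFORE '2018-01-01 00:00:00'")
```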
【 Technical selection introduction 】
Here is a supplementary introduction to why SQL on Hadoop technology was chosen as the final solution.
Anyone who works with big data knows its 5V characteristics. From traditional Internet data to mobile Internet data, and now to the hot Internet of Things, each step forward in the industry has increased data volumes by two or three orders of magnitude, and data growth is still accelerating. Hence the five characteristics proposed for Internet, mobile Internet, and IoT big data: Volume, Velocity, Variety, Value, and Veracity. As data volumes grow, traditional data warehouses face more and more challenges.
Challenges faced by traditional data warehouses:
At the same time, data systems themselves are constantly evolving:
• Evolution of storage: offline, nearline -> all online
• Evolution of storage architecture: centralized storage -> distributed storage
• Evolution of storage models: fixed structure -> flexible structure
• Evolution of data processing patterns: fixed model and fixed algorithm -> flexible model and flexible algorithm
• Evolution of data processing types: structured, centralized, single-source computing -> multi-structured, distributed, multi-source computing
• Evolution of data processing architectures: static database processing -> real-time/streaming/massive data processing
Kimball, the father of data warehousing, has made the following core point:
Hadoop changes the data processing mechanism of traditional data warehouses. What is a single processing unit in a traditional database is decoupled into three layers in Hadoop:
• Storage layer: HDFS
• Metadata layer: HCatalog
• Query layer: Hive, Impala, and Spark SQL
Schema on Read gives users more choices:
• Data is imported into the storage tier in its original format
• Target data structures are managed through the metadata layer
• The query layer decides when and how to extract the data
• After long-term exploration and growing familiarity with the data, users can switch to Schema on Write, solidifying intermediate tables to improve query performance
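A minimal sketch of the Schema on Read / Schema on Write distinction in Spark; paths and column names are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("schema-on-read-demo").getOrCreate()

// Schema on Read: raw CDR files stay in their original CSV form on HDFS,
// and a schema is applied only when the query layer reads them.
val rawCdr = spark.read
  .option("inferSchema", "true")
  .csv("hdfs:///raw/cdr/2018-09/*")
  .toDF("msisdn", "start_time", "duration", "cell_id")

rawCdr.createOrReplaceTempView("cdr_raw")
spark.sql("SELECT msisdn, COUNT(*) AS calls FROM cdr_raw GROUP BY msisdn").show()

// Schema on Write: once access patterns are well understood, solidify an
// intermediate table in a columnar format to speed up repeated queries.
rawCdr.write.mode("overwrite").saveAsTable("cdr_detail_solidified")
```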
| No. | RDBMS-based data processing | Hadoop-based data processing |
|---|---|---|
| 1 | Strong consistency | Eventual consistency; processing efficiency takes priority over data accuracy |
| 2 | Data must be transformed, otherwise processing cannot continue | Data can be stored in its original format for a long time without conversion |
| 3 | Data must be cleansed and normalized | Cleansing and normalizing the data is not recommended |
| 4 | Data is mostly stored in physical tables; file access is inefficient | Most data is stored in files; a physical table is equivalent to a structured file |
| 5 | Metadata is limited to dictionary tables | Metadata is extended to the HCatalog service |
| 6 | There is only one data processing engine: SQL | Open data processing engines: SQL, NoSQL, Java API |
| 7 | The data processing process is fully controlled by IT staff | Data engineers, data scientists, and data analysts can all participate in data processing |
SQL on Hadoop data warehouse technology:
• Data processing and analysis: SQL on Hadoop engines such as Kudu+Impala, Spark, HAWQ, Presto, Hive, etc.
• Data modeling and storage: Schema on Read; file formats such as Avro, ORC, Parquet, and CarbonData
• Stream processing: Flume, Kafka, Spark Streaming
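As a rough illustration of the stream-processing path above, the following sketch uses Spark Structured Streaming to land CDR events from Kafka in quasi-real time; the broker address, topic, and paths are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cdr-quasi-realtime-ingest").getOrCreate()

// Read raw CDR events from a Kafka topic.
val cdrStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:9092")
  .option("subscribe", "cdr-events")
  .load()
  .selectExpr("CAST(value AS STRING) AS raw_record")

// Append each micro-batch to the warehouse storage layer.
cdrStream.writeStream
  .format("parquet") // or the warehouse's columnar format, depending on the build
  .option("path", "hdfs:///warehouse/cdr_stream")
  .option("checkpointLocation", "hdfs:///checkpoints/cdr_stream")
  .start()
  .awaitTermination()
```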
The development and maturity of SQL on Hadoop technology is driving this change.
After the above technical analysis, we finally chose SQL on Hadoop as the direction in which our platform's data warehouse will evolve. Some people will surely ask why we did not choose MPPDB technology, so here is a comparison of SQL on Hadoop with MPPDB:
| Comparison item | SQL on Hadoop | MPPDB |
|---|---|---|
| Fault tolerance | Supports fine-grained fault tolerance: a failed task is automatically retried without resubmitting the entire query | Coarse-grained fault tolerance and cannot handle straggler nodes: if a task fails, the entire query fails and must be resubmitted to obtain a result |
| Scalability | The cluster can scale to hundreds or even thousands of nodes | Difficult to scale beyond 100 nodes, usually around 50 (for example, we earlier verified with Greenplum that performance degrades beyond 32 machines) |
| Concurrency | As the cluster grows and resources increase, the number of concurrent queries grows nearly linearly | MPPDB devotes maximum resources to each query to improve its performance, so the supported concurrency is low; generally the whole system is at full load at about 20 concurrent queries |
| Query latency | 1. With data under 1 PB and single tables at the billion-record level, a single query usually returns in about 10 s. 2. With data over 1 PB, query performance is maintained by adding cluster resources | 1. With data under 1 PB and single tables at the billion-record level, a single query usually returns within seconds or even milliseconds. 2. With data over 1 PB, query performance may drop sharply due to architectural limitations |
| Data sharing | Storage is separated from compute; a common storage format can serve different analysis engines, including data mining | The proprietary MPPDB storage format cannot be used directly by other analysis engines |
【 Effect of Solution Implementation 】
Spark+CarbonData went live at this site at the end of September 2018, replacing Impala, and has been running ever since, processing more than 100 TB of CDRs every day. During peak business hours, data loading performance increased from about 60 MB/s per node under Impala to about 100 MB/s per node. In the site's typical business scenarios, at 20 concurrent queries, Spark+CarbonData's query performance was more than twice that of Impala+Parquet.
At the same time, the following problems are solved:
- Hadoop cluster resource sharing. Impala resources cannot be managed by YARN's unified resource scheduling, whereas Spark+CarbonData can, and can therefore share cluster resources dynamically with other components such as Spark and Hive (see the sketch after this list).
- Hadoop cluster scalability. Impala could only use on the order of a hundred machines, while Spark+CarbonData now scales to thousand-node clusters.
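A minimal sketch of how a Spark+CarbonData service can be submitted under YARN so it shares cluster resources with the other engines; the queue name is a placeholder for a queue defined in the YARN scheduler:

```scala
import org.apache.spark.sql.SparkSession

// Run on YARN in a dedicated queue so Spark+CarbonData, Hive, and other workloads
// are governed by the same scheduler, and let the executor count scale with load.
val spark = SparkSession.builder()
  .appName("cdr-query-service")
  .master("yarn")
  .config("spark.yarn.queue", "carbon_query")       // placeholder queue name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")  // required for dynamic allocation
  .getOrCreate()
```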
Notes in the implementation process:
- Data is loaded using CarbonData's local sort. To avoid producing too many small files in a large cluster, only a subset of machines is assigned to load data, and table-level compaction is configured to merge the small files generated during loading.
- Based on the query characteristics of the business, set the fields that are frequently used as filters to be the sort columns of the table (for example, the subscriber number frequently queried in telecom scenarios). Order the sort columns by how often each field is queried, from most to least; if the fields are queried about equally often, order them by the number of distinct values, from high to low, to improve query performance (see the sketch after this list).
- The blocksize of a single table's data files can be defined in TBLPROPERTIES; the unit is MB and the default is 1024 MB. Based on the actual data volume loaded into each table and our practical experience, we recommend setting the blocksize to 256 MB for tables with small data volumes and to 512 MB for tables with large data volumes.
- In addition, DataMaps such as BloomFilter can be created, based on the characteristics of the business queries, to further improve query performance.
- For Spark-related parameter settings for data loading and queries, first analyze the performance bottleneck with the Spark UI, then adjust the relevant parameters in a targeted way; they are not introduced one by one here. Remember that performance tuning is a matter of technical accumulation: adjust with a clear target, change only one or a few related parameters at a time, check the effect, revert them if there is no effect, and never adjust too many parameters at once.
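A minimal sketch that ties the notes above together, creating a table with sort columns and a blocksize, adding a BloomFilter DataMap, and running a compaction to merge small files; the column names and property values are placeholders, and the exact DDL keywords may differ between CarbonData versions (older releases use STORED BY 'carbondata'):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cdr-table-setup").getOrCreate()

// Sort columns ordered by query frequency; a larger blocksize for a large table.
spark.sql("""
  CREATE TABLE IF NOT EXISTS cdr_detail (
    msisdn     STRING,
    start_time TIMESTAMP,
    duration   INT,
    cell_id    STRING
  )
  STORED AS carbondata
  TBLPROPERTIES ('SORT_COLUMNS'='msisdn,start_time', 'TABLE_BLOCKSIZE'='512')
""")

// BloomFilter DataMap on the most selective, most frequently filtered column.
spark.sql("""
  CREATE DATAMAP dm_msisdn ON TABLE cdr_detail
  USING 'bloomfilter'
  DMPROPERTIES ('INDEX_COLUMNS'='msisdn')
""")

// Merge the small files produced by local-sort loading.
spark.sql("ALTER TABLE cdr_detail COMPACT 'MINOR'")
```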