Preface

After years of development, Apache Hive has essentially become the de facto standard for building large-scale data warehouses and the industry's default data processing tool. Hive is not only a technical component but also a design philosophy: it provides a JDBC client, a HiveServer2 server that exposes a standard JDBC interface, a Hive Metastore service that manages metadata, and distributed MapReduce jobs that run on YARN.

Hive’s standard JDBC interface, standard SQL support, distributed task execution, and centralized metadata together cover all the capabilities needed to build an enterprise-level data warehouse, and Hive’s SQL server is the most widely used standard server of its kind.

Although Hive’s advantages are obvious and very few components can completely replace it, this does not mean that Hive fully meets enterprise business requirements at this stage. Much of the time, Hive is chosen not because it supports those requirements well, but simply because there is no alternative service that could.

Enterprise data warehouse construction requirements

The data warehouse architecture is usually the starting point of enterprise data analysis, with a data lake layer beneath it for heterogeneous data storage and cold data backup. In practice, however, many enterprises, especially those whose data is mainly structured, merge the data lake and the enterprise data warehouse and build both on a single data warehouse platform.

When building its own data warehouse system, an enterprise should refer to the existing industry technology stack and component options, but it should not be limited to the components themselves or go looking for a 100% out-of-the-box product. Focusing too narrowly on a perfectly fitting component inevitably ties the architecture to that component’s implementation and leaves huge constraints for future extension. An enterprise data warehouse architecture is not a single component: most companies base part of their warehouse architecture on existing implementations and develop and customize the rest around their own business, reaching a semi-self-developed steady state that can keep up with the pace of business change without becoming overly dependent on, or restricted by, the evolution of those components.

Generally speaking, the design and selection of an enterprise data warehouse architecture should be considered along the following dimensions:

  • Ease of development: does the chosen architecture have a good development ecosystem that provides different types of development interfaces beyond an SQL editor, such as code submission and third-party tool integration?
  • Ecosystem: does the chosen execution engine have a rich ecosystem of its own, and does it integrate well with other services, such as the data lake engines Delta Lake, Iceberg, and Hudi? Hive, for example, has been slow to integrate with these.
  • Degree of decoupling: distributed tasks require the coordination of multiple components, such as distributed storage, resource management, and scheduling. Hive relies heavily on YARN, and its computing engine is strongly bound to MapReduce, so it is weakly decoupled; if an enterprise wants to run its own computing engine on K8S, Hive’s limitations become even more obvious.
  • Performance: Whether the overall architecture has better performance.
  • Security: Whether different levels of user access and data security authentication systems are supported.

The most important aspect of enterprise warehouse architecture is how to design the architecture based on enterprise business processes, rather than extending the architecture based on one component.

The overall logic of an enterprise data warehouse is shown in the figure above. Building a data warehouse usually requires ETL processing and layered design: various ETL jobs run over the structured and unstructured data collected from business systems to form the DWD layer, with DWB/DWS layers holding intermediate, in-process data.
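
To make the layering concrete, here is a minimal sketch of one such step, rolling a detail-layer (DWD) table up into a summary-layer (DWS) table; the table, column, and partition names are illustrative rather than part of any specific design:

-- Aggregate the order detail layer into a per-user daily summary (illustrative names)
INSERT OVERWRITE TABLE dws_order_daily PARTITION (dt = '2022-01-01')
SELECT user_id,
       COUNT(1)    AS order_cnt,
       SUM(amount) AS gmv
FROM dwd_order_detail
WHERE dt = '2022-01-01'
GROUP BY user_id;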

In terms of technology selection, the path from data-source ETL to data-model construction usually involves long-running tasks, with total running times measured in hours or more. The DM layer, by contrast, mainly serves business requirements directly and has high timeliness requirements; tasks running on the DM layer usually complete within minutes.

From the layered architecture diagram above, it can be seen that although there are many components, such as Presto, Doris, ClickHouse, and Hive, they work in different scenarios; warehouse construction and interactive analysis, for example, are two typical ones.

Interactive analysis emphasizes timeliness: a query should produce results quickly. Although Presto, Doris, and ClickHouse can also process massive data, even at PB scale or above, they are mainly used for interactive analysis, that is, the DM layer built on top of the data warehouse, where they provide service-oriented interactive queries so that users can explore data quickly. Because such engines focus on interactive analysis, they are not friendly to long-running tasks; to return results fast they rely heavily on memory and require high hardware configurations. Such components usually have the following constraints:

  • There is no task-level retry; a failed query can only be rerun in full, which is costly.
  • Computation is generally done fully in memory; if shuffle data is not spilled to disk, massive data volumes cannot be processed.
  • To speed up queries, a sub-task that has already been scheduled to a node cannot be rescheduled to another node if execution on that node fails.

If a task fails due to network jitter or a node is lost to machine downtime, retrying takes almost as long as submitting the task again. In a distributed setting, the longer a task runs, the higher the probability of failure, so industry best practice recommends keeping queries on such engines to roughly 30 minutes or less.

In the offline data warehouse scenario, however, almost all tasks are long-running, with running times at the hour level or above. The components that execute ETL and build the warehouse model therefore need high fault tolerance and stability, and must recover quickly and cheaply when errors occur, avoiding complete task failure because some nodes are in an abnormal state.

Computing engines such as Hive and Spark rely on YARN to manage resources and to handle retry, scheduling, and failover of distributed tasks. They spill intermediate data to disk so that failed nodes or failed sub-tasks can be recovered quickly by recomputation, and the data itself lives in distributed storage systems such as the Hadoop Distributed File System (HDFS) rather than being managed by the engine, which provides high stability and fault tolerance.

Hive and Spark, on the other hand, are better at long-running batch tasks; they are not good at the interactive analysis required by the upper layers and cannot meet its timeliness requirements. Hive and Spark are therefore usually used to build the data warehouse, while Presto, Doris, and ClickHouse are often used to provide interactive analysis queries.

Summarized as follows:

  • Presto, Doris, and ClickHouse: focus on interactive analysis, have high requirements on per-node resources, rely heavily on memory, lack fault-tolerant recovery and task-retry mechanisms, and suit tasks that finish within about 30 minutes. They usually work in the enterprise DM layer and serve business requirements directly.
  • Hive, Spark: focus on task stability, place higher demands on network and I/O, provide complete mechanisms for spilling intermediate temporary files to disk and retrying failed nodes, and suit long tasks that run for an hour or more. They work in the enterprise ETL and data-model-building layers, cleaning and processing data and supporting the construction of the overall data warehouse that the upper-layer business relies on.

When an enterprise builds its data platform, these different components work at different architectural layers; they cannot replace one another and instead cooperate to carry the enterprise’s entire data platform business.

Enterprise data warehouse technology selection

Three papers published by Google described a new approach to distributed data processing, covering the storage, computation, and retrieval of massive data. These three directions were implemented by Yahoo’s Nutch team and contributed to Apache, becoming what we now know as HDFS, MapReduce, and HBase, the three early pillars of Hadoop.

However, these three components focused on information extraction from heterogeneous data. They did not provide a friendly, SQL-like entry point for analyzing structured data, and their programming model was unfriendly, offering only the two phases Map and Reduce, which severely limited how business logic could be implemented; Yahoo’s team incubated its related businesses on top of them. The early Hadoop suite therefore had the following characteristics:

  • The barrier to entry is high: programming is required, and the programming model is limited by MapReduce’s two-stage constraint.
  • It mainly handles raw, discrete data and lacks common data analysis capabilities such as rich analysis functions and ad hoc queries.
  • Interactive exploration is impossible without an interactive client.

Hive was born in this industry context. It makes up for the limitation that Hadoop could only do offline data processing by providing a common analysis interface and very good user interaction.

Hive provides a JDBC interface that supports programmatic interaction, and almost all SQL clients and open source and commercial BI tools in the industry connect to Hive through standard JDBC for data exploration. This greatly enriches the diversity of components in the big data ecosystem and lowers the barrier to entry, allowing anyone familiar with SQL to migrate at low cost.

Thanks to this well-designed interface, and the fact that Hive has matured over the years into a very stable data warehouse component for production environments for which alternatives are hard to find, Hive is a good choice as the foundation of a data warehouse.

As the picture above shows, there are many advantages:

  • Stability: stability is Hive’s most commendable feature. Even though Hive’s performance and computing speed often lag behind other engines, its stability has always been very good.
  • Low barriers: With basic SQL skills, Hive can be developed at a lower cost than other distributed computing engines.
  • Rich ecosystem: Hive is closely integrated with the Hadoop ecosystem, and Metastore of Hive itself has become the standard metadata service in the big data ecosystem. Most engines support direct adaptation of Metastore.
  • Easy expansion: the Hive UDF mechanism makes it easy to extend functionality to match business requirements (see the sketch after this list).
  • Security: Hive supports Kerberos and LDAP authentication modes. In addition, Hive can combine with Ranger to achieve more fine-grained row and column permissions, ensuring data security.
  • Low integration cost: MapReduce only offers a programming interface and does not support iterative computation. Hive wraps MapReduce behind an SQL interface, so upper-layer data mining and data analysis tools can integrate with it at low cost.
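
As an illustration of that extension point, the following is a minimal HiveQL sketch of registering and calling a custom UDF; the JAR path, class name, and table are hypothetical:

-- Register a user-provided UDF from a JAR and apply it (hypothetical names)
ADD JAR hdfs:///user/hive/libs/my_udfs.jar;
CREATE TEMPORARY FUNCTION mask_phone AS 'com.example.udf.MaskPhone';
SELECT mask_phone(phone_number) FROM dwd_user_info LIMIT 10;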

So even though Hive has been around for a long time, it is still the first choice for warehouse building, and it is ubiquitous in that role. But while Hive has many advantages that make it hard to give up, that does not mean it fully supports enterprise business needs. Many times Hive is chosen simply because no other component is available, and developing one from scratch, or adapting an existing component, would cost far more than the enterprise expects, so Hive stays.

Based on practice, the main limitations of Hive in the process of building enterprise warehouse revolve around the following aspects:

  • Performance: building Hive on MapReduce brings excellent stability but also limits performance. Hive on Tez improves this to some extent, but it still lags far behind a comparable computing engine such as Spark.
  • Resource allocation: MapReduce, Hive’s computing engine, is not SQL-friendly, so HiveServer2 must convert SQL into MapReduce-based physical plans; as a result HiveServer2 needs a very high-specification configuration to remain sufficiently stable.
  • Concurrency: Hive’s concurrency is limited by HiveServer2; enterprises must maintain multiple high-specification HiveServer2 instances to achieve better concurrency. Hive’s bottleneck is usually HiveServer2 rather than the underlying distributed computation.
  • Fault tolerance cost: SQL parsing happens in HiveServer2, and multiple HiveServer2 instances are independent and share no state. If a HiveServer2 instance fails, all tasks running on it end and clients must retry; fault tolerance works only at the level of restarting the entire job.
  • Transaction support: Hive transactions are managed by HiveServer2. Once a HiveServer2 instance enables transactions, every request passing through that instance incurs transaction overhead, which makes transactions expensive.
  • Deployment: If an enterprise’s computing engine deployment is based on a container architecture such as K8S, Hive on K8S can be very costly to deploy.

Hive has made many attempts to address these issues (Hive on Spark, for example). However, because of Hive’s architecture, HiveServer2 keeps its own SQL parsing engine, and to stay compatible it translates the parsed results directly into Spark’s low-level interfaces, so overall performance has not improved much.

There are many other excellent components besides Hive, but from the perspective of enterprise data warehouse technology selection, only Hive and Spark SQL are really suitable for building data warehouses, and Spark SQL has clear advantages over Hive.

How Spark SQL supports the enterprise data warehouse

The Spark engine is widely used in data processing scenarios because of its powerful ecology and convenient programming interfaces. The Spark SQL module provides a good infrastructure for using Spark to support enterprise data warehouses.

As shown in the figure above, a typical data warehouse architecture involves model building at different layers. Because of the large data volumes and heterogeneous data structures, enterprise data warehouses built on big data architectures abandon the Cube designs used with relational databases and instead build multi-layer data models directly with distributed tasks. The components supporting an enterprise data warehouse therefore need to meet the following requirements:

  • Support for long-running tasks, usually hours or more, often days.
  • Support for multitasking, that is, high concurrency.
  • Stability must be guaranteed.
  • Speed is fast.
  • Supports SQL interactive interfaces.
  • Easy integration.
  • Support for task rerun and fault tolerance and fast task failure recovery.

Given these requirements, Spark SQL is better suited to this work than the other components available today, and even than Hive. However, many enterprises designing their architecture worry that, for all the rich features Spark SQL brings, it lacks an SQL server like HiveServer2, so Spark SQL and Hive end up coexisting, with Hive reduced to providing only the Metastore service. In many practices, building the enterprise data warehouse with Hive alone is already a thing of the past, and Spark SQL is an excellent choice for the job.

As shown in the figure above, when building the data warehouse, an enterprise can run a Spark SQL server as a resident service exposing an SQL interface, while also using spark-submit to submit Jar tasks directly. This provides both a standard interactive SQL interface and a more flexible programming interface.

From different perspectives of enterprise warehouse construction, Hive has brought increasing constraints, and Spark SQL is ready to replace Hive to build the entire warehouse. Spark SQL has the following advantages:

  • Rich ecosystem: Not only can Spark integrate with many components, but its own ecosystem already covers everything from data analytics to machine learning and graph computing.
  • Open: Spark’s architecture is very open and allows for rapid integration with other products. For example, it is much more open to integrate features such as Iceberg and Hudi than Hive.
  • Deployment: Spark can be deployed on ECS VMS or in the K8S architecture.
  • Performance: Spark’s unified batch and stream processing mechanisms are well suited to building an enterprise warehouse.
  • Easy to develop: Spark SQL supports both SQL interfaces and flexible iterative programming interfaces, facilitating data development in different scenarios.
  • Security: Spark SQL can be integrated with different security services to implement fine-grained authentication.

It is therefore entirely feasible to support an enterprise-level data warehouse with Spark SQL, and many enterprises have verified this in practice.

As shown in the figure above, the logical architecture of an enterprise data warehouse based on Spark SQL contains the parts described above. Each Spark SQL engine is a server that registers its information with ZooKeeper, and the SQL server selects a Spark SQL engine from ZooKeeper to execute client requests. The SQL server itself is compatible with the Hive JDBC interface. When using Spark SQL to support warehouse building, the following points need to be considered:

  • How to provide an interactive service to support different clients to connect, including interactive Beeline, as well as programmatic JDBC and tool interfaces.
  • The Spark SQL Ranger Plugin is required if the Ranger is used.
  • How to support task submission across multiple queues.

The core of using Spark SQL to support the enterprise warehouse is to provide a good task server for task management. This task server is logically similar to HiveServer2 but much lighter: it does not carry HiveServer2’s complex and heavy SQL parsing, nor the Spark Thrift Server constraint of itself being a YARN job. An enterprise can develop a lightweight server around its own business processes. Bytedance has deep practical experience in this area and has built its own Spark SQL engine server (more details will be shared later). Other enterprises in the industry have done similar work, such as NetEase’s open source Kyuubi.

Kyuubi is built on Spark SQL and makes up for the shortcomings of the Spark Thrift Server in multi-tenancy, resource isolation, and high availability; it is an open source project that can satisfy most production scenarios. However, Kyuubi was designed to complement the Spark Thrift Server and enhance Spark SQL, not as a drop-in replacement for Hive, so the migration cost remains high for legacy projects: Spark SQL and Hive have two incompatible SQL dialects, and reducing the migration cost of legacy systems when adopting Kyuubi is a very large piece of work.

The Spark community also provides the open source Spark Thrift Server, which is an excellent idea, but its development has been somewhat too constrained and it still has many problems, mainly in the following aspects:

  • Driver single point: the Spark Thrift Server runs on YARN as a Spark job, and all requests run inside one Driver; once the Driver fails, all tasks fail with it.
  • Resource isolation: because the Spark Thrift Server runs on YARN as a Spark job, it cannot submit tasks across queues, and multiple tasks running in the same Driver affect each other’s resource usage, making fine-grained resource management difficult.
  • Multi-tenancy: the Spark Thrift Server supports multiple users at the request level, but architecturally it is a single job running on Yarn with its own Application Id and submitter, so it effectively runs as a super administrator, and tenant isolation layered on top of it inevitably has resource and security problems.
  • High availability: the Spark Thrift Server itself does not provide high availability, so HA must be designed separately and client compatibility must be considered. For example, Hive JDBC stores HA information in ZK, but the Spark Thrift Server has no such mechanism, so high availability is costly to implement.

So although Spark ships the Spark Thrift Server to provide JDBC-style interface interaction, it lacks many production-level functions and is rarely used in production environments. The Spark Thrift Server is more of a niche, half-finished product that patches parts of the problem without offering a complete solution, which is why it sees little actual production use.

Optimization practice of Bytedance EMR product in Spark SQL

Data lake engine integration

Data lake engines such as Hudi and Iceberg are increasingly widely used, and many B-end clients want to use them from Spark SQL, so the Bytedance EMR product needs to integrate data lake engines into Spark SQL. Many problems arise in this process.

First, when integrating Iceberg, we optimized experience and ease of use. Normally, to use Iceberg from Spark SQL, users must type many configuration options by hand and locate the matching Spark-Iceberg dependency package; this is still the most common way to integrate Iceberg today. Our solution is to pre-install the Iceberg JARs in the Spark jars directory so that users only need to specify the catalog, without entering a long list of options.
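
With the dependency pre-installed, using Iceberg from Spark SQL reduces to referencing the configured catalog; a minimal sketch, in which the catalog, database, table, and column names are illustrative:

-- Create and write an Iceberg table through a pre-configured catalog (illustrative names)
CREATE TABLE iceberg_catalog.db.events (
  id      BIGINT,
  ts      TIMESTAMP,
  payload STRING
) USING iceberg
PARTITIONED BY (days(ts));

INSERT INTO iceberg_catalog.db.events VALUES (1, current_timestamp(), 'hello');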

Second, Iceberg is used in cross-engine analysis scenarios involving Spark and Hive: Spark creates tables normally and Presto/Trino can read and write them, but Hive cannot, and the official documentation does not describe this problem clearly. The solution is to modify the Spark configuration file (or the hive-site Spark override configuration) so that the Spark session is initialized with iceberg.engine.hive.enabled set to true, after which Hive can correctly read tables created by Spark.
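
The same switch can also be applied per table rather than globally; a sketch assuming a Hive-catalog Iceberg table, where the table name is illustrative and the table property mirrors the iceberg.engine.hive.enabled configuration:

-- Mark a single Spark-created Iceberg table as readable by the Hive engine (illustrative name)
CREATE TABLE db.sample (id BIGINT, data STRING)
USING iceberg
TBLPROPERTIES ('engine.hive.enabled' = 'true');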

The root cause is that Iceberg makes some compromises in its overall design in order to support the Hive engine, using a Storage Handler to let Hive read and write Iceberg tables, while Presto/Trino can recognize the tables based on the Hive format type.

In terms of compatibility, Iceberg 0.12 does not support Spark 3.2, and upgrading Spark would have a very large impact, so we upgraded Iceberg instead, compiling a snapshot of the community master branch to integrate with Spark 3.2.

Spark SQL Server

Tools such as the Spark Thrift Server and Kyuubi can provide an SQL server for Spark SQL, but they do not meet the requirements of some B-end clients, so the Bytedance EMR team designed and implemented its own Spark SQL Server, focusing on the following scenarios:

  • Compatible with Hive semantics: most B-end clients built their warehouses on Hive early on and are migrating to Spark SQL gradually, so they face large system migrations. Because Hive and Spark SQL have different SQL semantics, rewriting all the SQL is a lot of work, so the Spark SQL Server in the Bytedance EMR product is compatible with both Hive semantics and Spark SQL semantics. It is implemented by injecting Hive SQL parsing into the Spark engine, forming an SQL parser chain that matches a parser to each SQL statement and is compatible with the full SQL semantics.
  • Initialize the Spark SQL engine in advance: submit Spark tasks to YARN before service requests arrive, initialize resources, and keep the engine waiting. This reduces task submission time and improves overall task execution time when there are many users.
  • Submit tasks across Yarn queues: You can specify Yarn queues to execute tasks.

As shown in the preceding figure, the SQL server implements the Thrift interface and provides a standard JDBC access interface, and each Spark SQL engine also implements the Thrift interface. When the service starts, a Spark SQL engine is submitted to Yarn and put in a waiting state. When a business task arrives, the SQL server selects an engine, either matching an existing one or submitting a brand-new one, to execute the task.

The SQL server supports common permission authentication, such as OpenLDAP and Kerberos, and supports different isolation levels. At the Session level, each connection gets its own Spark SQL engine to receive its tasks, and the engine is destroyed on Yarn once execution completes. At the User level, a user has zero to N engines that reside on Yarn and take turns executing that user’s tasks.

This Server design breaks through the limitations of Spark Thrift Server’s single Driver and decouples SQL services from task execution. This enables more fine-grained resource management and cross-queue task submission.

It is also compatible with Hive interfaces. Users can access the server in the following ways:

  • HA access link:
./bin/beeline -u "jdbc:hive2://emr-5fqkwudj144d2gc1k8hi-master-1/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=midas/ha;auth=LDAP" -n emr_dev -p EMR123456emr
  • Non-HA access link:
./bin/beeline -u "jdbc:hive2://emr-master-2:10005/default;auth=LDAP" -n test_sub -p EMR123456emr

Information in HA mode is recorded in Zookeeper in the same format as that in HiveServer2, ensuring that Hive clients can directly access servers in HA mode.

Spark SQL multi-tenant

When Hive executes a job, the HiveServer2 service acts as the SQL server, handling user authentication, permission checks, SQL parsing, and final execution plan generation, and the MR engine then runs the actual distributed tasks.

In this process, HiveServer2 takes on a great deal of responsibility and therefore consumes a great deal of resources, which severely limits user concurrency. Distributed tasks are constrained by the resources that Yarn allocates as the resource manager, but with Hive the constraint actually comes from HiveServer2, so user concurrency cannot grow as Yarn resources grow.

In the Spark SQL engine, SQL parsing is pushed down into the engine itself and integrated with the execution of the distributed tasks, so no separate server is needed for SQL parsing. This is also why the Hive on Spark mode is so difficult: the parsing module architectures of Spark SQL and Hive are completely different.

In the architecture above, the SQL server redesigned by the Bytedance EMR team only receives tasks and checks user resources, permissions, and identity, and then sends the tasks to the Spark SQL engines running on Yarn. Concurrency is no longer restricted by HiveServer2; Yarn resources alone determine user concurrency. This greatly reduces the resource requirements of the SQL server, improves its stability, and significantly increases user concurrency.

Second, the engine warm-up feature reduces task execution time and improves overall speed. Engine warm-up means that a Spark SQL engine is submitted to Yarn when the service starts and kept in a waiting state; when a business request arrives, an already-prepared engine is selected according to the business type to execute the task.

Not every pre-warmed engine will be chosen for execution; the SQL server offers three engine isolation levels:

  • Session: each connection submits its own Spark SQL engine; when the connection is closed, the engine is destroyed on Yarn.
  • User: A User can share multiple Spark SQL engines. The number of Spark SQL engines is determined by the task resource requirements submitted by the User. The engine will not be destroyed after the connection is disconnected until the idle time of the engine reaches the upper limit.
  • Open: The Spark SQL engine can be shared by all users. It is usually used to deal with large accounts or scenarios where the cluster does not need permission management.

It follows that engine warm-up only adds value at the User and Open levels, where it saves the spark-submit time; when the number of users is large enough, the aggregate time saved becomes significant.

Spark SQL transaction support

Hive transactions are implemented on top of HiveServer2 and enabled with the following syntax:

CREATE TABLE tm (a int, b int) STORED AS ORC
TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only');

Transactions can then be started, but because transactions are managed on the server, ACID overhead affects every request passing through HiveServer2. Spark SQL, by contrast, integrates well with data lake formats such as Hudi and Iceberg, so the Spark SQL server does not need to implement a transaction mechanism like HiveServer2’s; it simply relies on Hudi, Iceberg, and similar features to provide transactions when data is read and written.

For example, row-level operations such as MERGE INTO, UPDATE, and DELETE are supported on Iceberg tables:

MERGE INTO prod.nyc.taxis pt
USING (SELECT * FROM staging.nyc.taxis) st
ON pt.id = st.id
WHEN NOT MATCHED THEN INSERT *;
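
The row-level UPDATE and DELETE mentioned above are equally direct; a brief sketch against the same table, with illustrative column names:

-- Row-level mutations on an Iceberg table from Spark SQL (column names are illustrative)
UPDATE prod.nyc.taxis SET fare = fare * 1.1 WHERE vendor_id = 2;
DELETE FROM prod.nyc.taxis WHERE trip_distance = 0;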

So when Spark SQL integrates Iceberg, it gains transaction capability; combined with the SQL server, it offers the benefits of Hive’s transaction capability at low cost and without some of Hive’s constraints, and from this architectural point of view it can completely replace Hive. On the other hand, as the number of users grows, data modification and update operations tend to generate a large number of small broadcasts, which can cause Driver OOM. Large broadcasts can also cause OOM, but they can be limited by a threshold, while small broadcasts are not affected by that threshold, and once they pile up the Driver easily runs out of memory. The Bytedance EMR team solved the problem of the Driver being overwhelmed by many small broadcasts by merging them.
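
For reference, the threshold mentioned above is the standard Spark SQL broadcast-join setting: it caps how large a relation may be before Spark refuses to broadcast it, but it does not limit how many small broadcasts a plan produces (the value below is illustrative):

-- Cap the size of relations Spark SQL will broadcast for joins (10 MB here); -1 disables broadcast joins
SET spark.sql.autoBroadcastJoinThreshold = 10485760;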

Conclusion

As enterprise business grows more complex, more flexible and efficient data warehouse architectures are needed. In such a business-driven environment, Hive’s limitations become more and more obvious, and solutions that flexibly build the data warehouse on Spark SQL will become increasingly mainstream. Enterprises should therefore consider how to build their data systems around Spark SQL when planning their data warehouse architecture. With Spark’s complete and open ecosystem, more excellent services around Spark are sure to emerge in the future.