Introduction: Spark SQL is a core Spark module for processing structured data and has become a key building block for enterprise big data applications. However, Spark faces stability and performance challenges in large-scale Join and Aggregate workloads.
To improve Spark SQL performance, you can use the Intel® Optimized Analytics Package (OAP) together with Intel® Optane™ persistent memory and next-generation Intel® Xeon® processors to accelerate typical Spark SQL workloads.
Spark SQL faces performance bottlenecks in multiple scenarios
IDC reports that global data volumes will grow from 45 ZB in 2019 to 175 ZB in 2025, with an estimated 59 ZB of data created, captured, and consumed in 2020. As data grows at this pace, organizations need advanced analytics to process it in real time and obtain timely business insights. New developments in big data analytics technology and the advent of revolutionary new hardware have significantly improved the performance of big data analytics, enabling data scientists, analysts, and business users to gain deeper business insights.
As a fast, general-purpose computing engine designed for large-scale data processing, Spark is open source, economical, and flexible. It is commonly used to build large-scale, low-latency data analysis applications. However, Spark still faces performance challenges in certain scenarios, especially when dealing with very large data sets and interactive queries. For example, data I/O can easily become a bottleneck due to the lack of a high-performance caching solution. In addition, Spark Shuffle is often a performance bottleneck because it involves a large number of small random disk I/Os, serialization, and network data transfers, which greatly increase job latency and hurt workload performance.
Emerging hardware technologies can help address these challenges. For example, Advanced Vector Extensions (AVX) enable Spark to process more data simultaneously with SIMD instructions to speed up execution, while Intel® Optane™ persistent memory can improve Spark SQL performance with its breakthrough combination of high performance, large capacity, and low latency. OAP (Optimized Analytics Package) is an open source project developed by Intel and the community. It aims to improve Spark performance with innovative software features such as data source caching, SQL indexing, a native SQL engine, and MLlib optimization, building on advanced Intel processor, memory, storage, and networking technologies to solve the compute and I/O challenges faced by Spark core and its related components.
Intel Optimized Analytics Package (OAP) for Spark
The Intel® Optimized Analytics Package (OAP) is an open source project developed by Intel and the community to improve Spark performance. It builds on advanced Intel hardware technologies and provides multiple features that improve Spark caching, Shuffle, execution, and machine learning performance. Figure 1 below shows the OAP architecture, which includes the following components: OAP data source cache, native SQL engine, Arrow data source, OAP MLlib, RDD cache, RPMem Shuffle, and remote Shuffle.
- SQL data source cache: An optimized extension package that uses caching technology at the Spark SQL data source layer to improve Spark SQL performance.
- Native execution engine: A native engine for Spark SQL that converts Spark's row-based processing into columnar processing and accelerates it with vectorized SIMD instructions and the Arrow data format.
- MLlib: An alternative to vanilla Spark MLlib, optimized with oneDAL, oneMKL, and oneCCL.
- Features such as RDD cache and RPMem Shuffle: Use the large capacity and high performance of persistent memory to hold RDD cache blocks, spilled data, and intermediate data, avoiding spills to slower storage and improving Spark performance.
- Remote Shuffle: Supports remote Shuffle and persistent memory-based Shuffle.
(figure 1)
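As a rough illustration of how these components plug into Spark, the snippet below enables the OAP SQL extensions when building a SparkSession. It is a minimal sketch: `spark.sql.extensions` is a standard Spark property, but the `OapExtensions` class name is an assumption modeled on OAP documentation and may differ between OAP releases.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enabling OAP's SQL extensions on an existing Spark build.
// "org.apache.spark.sql.OapExtensions" is assumed here; check the class name
// shipped with your OAP release, and make sure the OAP jars are on the
// driver/executor classpath.
val spark = SparkSession.builder()
  .appName("oap-demo")
  // spark.sql.extensions is a standard Spark property for injecting
  // extensions such as OAP's optimizer rules and cache support.
  .config("spark.sql.extensions", "org.apache.spark.sql.OapExtensions")
  .getOrCreate()
```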
OAP data source cache
The SQL Data Source Cache is designed to improve Spark SQL performance by leveraging user-defined indexes and intelligent fine-grained in-memory data caching (as shown in Figure 2). Its main purpose is to address the performance problems of interactive queries and batch jobs.
(figure 2)
- Interactive query
Most users use Spark SQL as a batch processing engine. But as a unified processing engine, Spark SQL must also serve non-batch scenarios such as interactive queries, which need to return results in seconds, or even sub-seconds, rather than the minutes or hours acceptable for batch processing. This is a big challenge for current Spark SQL data processing. Interactive queries typically scan large data sets but return only a small fraction of the data after filtering on specific criteria. By creating and storing full B+ tree indexes for key columns and using an intelligent fine-grained in-memory data caching strategy, Spark SQL interactive query processing times can be reduced significantly (see the sketch after this list).
- Batch job
For users who run business analysis in a data warehouse with Spark SQL, the OAP SQL data source cache can speed up batch jobs with two configurable caching strategies:
• Automatic caching of hot data.
• Dedicated caching of specified hot tables.
The SQL index and data source cache provide a unified cache representation for different columnar storage formats, with a fine-grained cache unit designed for a single column within a RowGroup. A compatibility adapter layer is provided for the two columnar file formats, Parquet and ORC, and both indexes and caches are built on top of this unified representation and adapter.
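To make the interactive-query path concrete, here is a hypothetical sketch of creating a B+ tree index on a key column and issuing a selective query against it. The `CREATE OINDEX` DDL and `USING BTREE` clause are assumptions based on OAP's SQL index feature; verify the exact syntax supported by your OAP release.

```scala
// Hypothetical sketch: build a B+ tree index on a key column with OAP's
// SQL index DDL, then run a selective interactive query that can use it.
// The OINDEX syntax, index name, and table are illustrative assumptions.
spark.sql("CREATE OINDEX order_id_idx ON orders (order_id) USING BTREE")

// A highly selective filter like this is the pattern that benefits most:
// the index narrows the scan instead of reading the whole data set.
spark.sql("SELECT * FROM orders WHERE order_id = 42").show()
```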
OAP data source cache architecture design
The data source cache can hold both vectorized data that has already been decompressed and decoded, and raw binary data. DRAM is typically used as the cache medium in Spark clusters, but Intel® Optane™ persistent memory can also serve as the cache medium in the OAP data source cache, providing a high-performance, cost-effective caching solution. Figure 3 below shows the architecture of the OAP data source cache when Intel® Optane™ persistent memory is used as the cache medium.
(figure 3)
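The following configuration sketch shows, in outline, how persistent memory might be selected as the cache medium. Every `spark.sql.oap.*` property name here is an assumption modeled on OAP's data source cache settings, not authoritative; check the configuration guide of the OAP version you deploy.

```scala
import org.apache.spark.SparkConf

// Illustrative only: all property keys below are assumed, not authoritative.
val conf = new SparkConf()
  // Hypothetical: cache decompressed/decoded column vectors for Parquet.
  .set("spark.sql.oap.parquet.data.cache.enable", "true")
  // Hypothetical: choose persistent memory (rather than DRAM) as the
  // cache medium.
  .set("spark.sql.oap.fiberCache.memory.manager", "pm")
  // Hypothetical: initial cache capacity to reserve on each persistent
  // memory namespace.
  .set("spark.sql.oap.fiberCache.persistent.memory.initial.size", "256g")
```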
The OAP data source cache provides the following main functions:
- Overrides the built-in Parquet/ORC file formats.
- Provides a local cache in distributed clusters (an external key-value store is required for metadata persistence).
- Supports NUMA binding for higher performance (NUMA binding is unnecessary if snoopy mode is enabled for Intel® Optane™ persistent memory).
- Provides a Plasma-based implementation that allows multiple Spark executors to access the cache simultaneously.
OAP RPMem Shuffle
Spark aims to provide high-throughput, low-latency data processing for diverse workloads such as ad hoc queries, real-time streaming, and machine learning. However, under some workloads (large-scale joins/aggregations), Spark may hit performance bottlenecks because Shuffle must read and write intermediate data on local Shuffle disks and transfer it over the network. Intel® Optane™ persistent memory is an innovative memory technology that typically provides larger capacity and data persistence at a price comparable to DRAM. Meanwhile, remote direct memory access (RDMA) allows direct memory access between computers independent of the operating system, delivering high-throughput, low-latency network performance. Together, high-performance Intel® Optane™ persistent memory and RDMA networks can help overcome the Shuffle challenge to some extent.
OAP RPMem Shuffle provides a pluggable module, the RPMem Shuffle Extension. This module can replace the default Spark Shuffle manager through configuration changes alone, without modifying Spark code (a configuration sketch follows). With this extension, Spark Shuffle can take full advantage of Intel® Optane™ persistent memory and RDMA, significantly improving Shuffle performance compared with traditional disk-based Shuffle.
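Here is a minimal sketch of that replacement. `spark.shuffle.manager` is a standard Spark property; the `PmofShuffleManager` class name and the `spark.shuffle.pmof.*` keys are assumptions based on the OAP RPMem Shuffle (Spark-PMoF) project and should be verified against the release you deploy.

```scala
import org.apache.spark.SparkConf

// Sketch of swapping in the RPMem Shuffle manager purely via configuration.
val conf = new SparkConf()
  // spark.shuffle.manager is a standard Spark property; pointing it at the
  // plugin replaces the default sort-based Shuffle without code changes.
  // The class name below is assumed from the Spark-PMoF project.
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.pmof.PmofShuffleManager")
  // Hypothetical: the persistent memory devices used as Shuffle media.
  .set("spark.shuffle.pmof.pmem_list", "/dev/dax0.0,/dev/dax1.0")
  // Hypothetical: enable RDMA transfer of Shuffle blocks (optional; see
  // the RDMA discussion below).
  .set("spark.shuffle.pmof.enable_rdma", "true")
```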
OAP RPMem Shuffle architecture design
As mentioned above, Spark Shuffle is an expensive operation that involves a large number of small random disk I/Os, serialization, and network data transfers. It therefore greatly increases job latency and easily becomes a workload performance bottleneck. Typically, Spark Shuffle loads data from the underlying store as input to a Mapper, which processes the data according to certain rules, such as grouping records into different partitions based on specific keys. The output of each Mapper is persisted to local storage (the Shuffle write). Each Reducer then reads the relevant output of every Mapper (the Shuffle read), performs aggregation operations such as sorting the data it has read, and finally outputs the result. A classic Shuffle operation thus involves both reading and writing data on disk and transferring data over the network, and either can become a performance bottleneck for workloads on large data sets. A minimal example follows.
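To see where the two halves occur, consider a minimal aggregation in plain Spark (not OAP-specific), assuming the SparkSession `spark` from earlier: the map side partitions and persists its output (the Shuffle write), and the reduce side fetches and merges it (the Shuffle read).

```scala
// reduceByKey triggers a classic Shuffle: each Mapper hashes records by key
// and persists its partitioned output locally (Shuffle write); each Reducer
// then fetches its partition from every Mapper's output (Shuffle read).
val sales = spark.sparkContext.parallelize(Seq(
  ("east", 10), ("west", 25), ("east", 5), ("west", 40)
))
val totals = sales.reduceByKey(_ + _) // Shuffle occurs at this stage boundary
totals.collect().foreach(println)     // e.g. (east,15), (west,65)
```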
OAP RPMem Shuffle is designed to remove this Shuffle bottleneck. As shown in Figure 4, OAP RPMem Shuffle can replace the existing Spark Shuffle through additional libraries. At the bottom, it uses Intel® Optane™ persistent memory as the Shuffle medium and accesses it in user space via libpmemobj, an important component of the Persistent Memory Development Kit (PMDK). libpmemobj provides transactional object storage on Intel® Optane™ persistent memory. The OAP RPMem Shuffle extension wraps libpmemobj through the Java Native Interface (JNI) and plugs into Spark through the Spark Shuffle Manager interface.
(figure 4)
The RDMA NIC is optional for the RPMem Shuffle extension; RDMA increases network bandwidth, reduces network latency, and lowers CPU utilization on the communicating nodes. HPNL, a high-performance network library that supports protocols such as TCP/IP, RoCE, iWARP, and OPA, provides the network communication layer for RPMem Shuffle. Figure 5 below compares the designs of vanilla Spark Shuffle and OAP RPMem Shuffle.
(figure 5)
In the vanilla Spark Shuffle design, data is first serialized to off-heap memory, then written to a local file system on a hard disk or solid-state drive, and finally transmitted over a TCP/IP network. This process involves a lot of context switching and file system overhead, so without changes to the current Spark Shuffle implementation, the capabilities of Intel® Optane™ persistent memory cannot be fully exploited.
OAP RPMem Shuffle instead uses libpmemobj to write data directly to Intel® Optane™ persistent memory and then transfers it by registering the persistent memory as RDMA memory regions. This implementation reduces context-switching overhead, eliminates file system overhead, and makes full use of RDMA zero-copy transfers to further reduce latency and CPU utilization.