From Database Systems to Spark SQL (2) grew too long, so the remaining part on Spark SQL is discussed in this chapter. The second chapter contained this sentence: MongoDB is designed as a document database; all data is indexed directly from key to value, and a B-tree with the lowest possible height is used to reduce disk seeks during queries.

Why does this reduce disk seeks? The idea is similar in spirit to Spark’s Tungsten optimization mechanism. Here is a look at this feature as introduced by Databricks.

The Tungsten project is the biggest change to the Spark execution engine to date. It focuses on substantially improving memory and CPU efficiency for Spark applications, pushing performance closer to the limits of modern hardware. The effort consists of three initiatives:

  • Memory management and binary processing: use application semantics to manage memory explicitly and eliminate the overhead of the JVM object model and garbage collection
  • Cache-aware computing: algorithms and data structures that exploit the memory hierarchy
  • Code generation: use code generation to take advantage of modern compilers and CPUs

Before Spark 1.4, workloads were increasingly constrained by CPU and memory rather than by IO and network communication, so the focus shifted to optimizing CPU efficiency.

Why is CPU the new bottleneck? There are several reasons. One is that hardware configurations provide ever more total IO bandwidth, such as 10 Gbps network links and high-bandwidth SSDs or striped HDD arrays for storage. From a software perspective, Spark’s optimizer now lets many workloads avoid a lot of disk IO by pruning input data a given job does not need. In Spark’s shuffle subsystem, serialization and hashing (which are CPU-bound) have proved to be the key bottlenecks, not the raw network throughput of the underlying hardware. All of these trends mean that Spark today is often limited by CPU efficiency and memory pressure rather than by IO.

Memory management and binary processing

Applications on the JVM typically rely on the JVM’s garbage collector to manage memory, since the JVM is intended as a general-purpose runtime for many workloads. However, as Spark applications push performance limits, the overhead of JVM objects and GC becomes significant.

Java objects have a large inherent memory overhead. Consider a simple string “abcd” that would take 4 bytes to store using UTF-8 encoding. The JVM’s native String implementation, however, stores it differently to favor more common workloads: it uses UTF-16 encoding, so each character takes two bytes, and each String object also carries a 12-byte header and an 8-byte hash code, as shown in the following output from the Java Object Layout (JOL) tool.

java.lang.String object internals:
OFFSET  SIZE   TYPE DESCRIPTION                    VALUE
     0     4        (object header)                ...
     4     4        (object header)                ...
     8     4        (object header)                ...
    12     4 char[] String.value                   []
    16     4    int String.hash                    0
    20     4    int String.hash32                  0
Instance size: 24 bytes (reported by Instrumentation API)

This simple 4-byte string thus occupies more than 48 bytes in the JVM object model: the 24-byte String object shown above plus the separate char[] array it points to (a further 24 bytes: a 16-byte array header plus 8 bytes of UTF-16 data).
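For reference, output like the one above can be reproduced with the OpenJDK Java Object Layout (JOL) tool. A minimal sketch, assuming the org.openjdk.jol library is on the classpath:

import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.info.GraphLayout;

public class StringLayout {
    public static void main(String[] args) {
        String s = "abcd";
        // Layout of the String object itself (header, hash fields, value reference).
        System.out.println(ClassLayout.parseInstance(s).toPrintable());
        // Total footprint, including the backing char[] array.
        System.out.println(GraphLayout.parseInstance(s).totalSize() + " bytes in total");
    }
}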

Another problem with the JVM object model is the overhead of garbage collection. At a high level, generational garbage collection divides objects into two categories: objects with high allocation/deallocation rates (the young generation) and objects that are retained (the old generation). The garbage collector exploits the transient nature of young-generation objects to manage them efficiently. This works well when GC can reliably estimate object life cycles, but poorly when it cannot (i.e., when transient objects spill into the old generation). Because this approach is ultimately based on heuristics and estimation, squeezing out performance can require the “dark arts” of GC tuning, with dozens of parameters for giving the JVM more information about object life cycles.

But Spark is not just any general-purpose application. Spark understands how data flows through the stages of a computation and across jobs and tasks. As a result, Spark knows more about the life cycle of a given block of memory than the JVM garbage collector does, and should therefore be able to manage memory more efficiently than the JVM.

To address object overhead and inefficient GC, an explicit memory manager is being introduced that converts most Spark operations to work directly on binary data rather than Java objects. It builds on sun.misc.Unsafe, an advanced facility provided by the JVM that exposes C-style memory access (for example, explicit allocation, freeing, and pointer arithmetic). In addition, the Unsafe methods are intrinsic, meaning each method call is JIT-compiled into a single machine instruction.
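As a rough illustration of the kind of C-style access involved (this is not Spark’s code), sun.misc.Unsafe can be used directly like this on most HotSpot JVMs:

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public class UnsafeDemo {
    public static void main(String[] args) throws Exception {
        // Application code cannot call Unsafe.getUnsafe() directly, so grab the
        // singleton via reflection (works on most HotSpot JVMs, possibly with warnings).
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        long address = unsafe.allocateMemory(16);   // explicit allocation, outside the GC heap
        unsafe.putLong(address, 42L);               // pointer-style write
        unsafe.putLong(address + 8, 7L);            // pointer arithmetic
        System.out.println(unsafe.getLong(address) + unsafe.getLong(address + 8));  // 49
        unsafe.freeMemory(address);                 // explicit free; no GC involved
    }
}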

In some areas, Spark already uses explicitly managed memory. The new Netty-based network transport explicitly manages all network buffers with a jemalloc-like memory manager; this was critical to scaling Spark’s shuffle operation and to winning the sort benchmark.

The first pieces of memory management and binary processing will appear in Spark 1.4, including a hash table that operates directly on binary data, with memory explicitly managed by Spark. Compared with the standard Java HashMap, this new implementation has much lower indirection overhead and is invisible to the garbage collector.

A benchmark of aggregation throughput with different hash maps compares three variants: the new hash map in on-heap mode, the new hash map in off-heap mode, and java.util.HashMap. The new hash table supports over a million aggregation operations per second in a single thread, roughly twice the throughput of java.util.HashMap. More importantly, without tuning any parameters, its performance hardly degrades as memory utilization increases, while the JVM default eventually crashes because of GC.

In Spark 1.4 this hash map will be used for DataFrame and SQL aggregations, and in 1.5 these data structures will be ready for most other operations, such as sorting and joining. In many cases this will eliminate the need to tune the GC to achieve high performance.
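To make the idea concrete, here is a heavily simplified, hypothetical sketch (not Spark’s actual implementation) of an aggregation map that stores keys and running sums flat in a primitive long[], so the garbage collector sees one array instead of millions of entry objects:

// Hypothetical toy version of a GC-friendly aggregation map: each slot is a
// (key, sum) pair stored flat in a long[], so there are no per-entry objects.
public class FlatSumMap {
    private final long[] slots;          // flat storage: [key0, sum0, key1, sum1, ...]
    private final boolean[] used;
    private final int capacity;

    public FlatSumMap(int capacity) {
        this.capacity = capacity;
        this.slots = new long[capacity * 2];
        this.used = new boolean[capacity];
    }

    // Add `value` to the running sum for `key`, using open addressing with linear probing.
    public void add(long key, long value) {
        int i = (int) ((key ^ (key >>> 32)) & 0x7fffffffL) % capacity;
        while (used[i] && slots[2 * i] != key) {
            i = (i + 1) % capacity;
        }
        used[i] = true;
        slots[2 * i] = key;
        slots[2 * i + 1] += value;
    }

    public long get(long key) {
        int i = (int) ((key ^ (key >>> 32)) & 0x7fffffffL) % capacity;
        while (used[i]) {
            if (slots[2 * i] == key) return slots[2 * i + 1];
            i = (i + 1) % capacity;
        }
        return 0L;
    }

    public static void main(String[] args) {
        FlatSumMap m = new FlatSumMap(1024);
        m.add(42L, 10L);
        m.add(42L, 5L);
        m.add(7L, 1L);
        System.out.println(m.get(42L) + " " + m.get(7L));   // 15 1
    }
}

The point of the flat layout is that all state lives in a few large primitive arrays the GC can skip over, which is what makes such a structure effectively invisible to the collector.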

Cache-aware computing

Spark is well known as an in-memory computing engine. What this term really means is that Spark can use the memory resources of a cluster efficiently, processing data at a much higher rate than disk-based solutions. However, Spark can also handle data orders of magnitude larger than the available memory, transparently spilling to disk and performing external operations such as sorting and hashing.

Similarly, cache-aware computation speeds up data processing by making more efficient use of the L1/L2/L3 CPU caches, which are orders of magnitude faster than main memory. Profiling of Spark user applications showed that a large fraction of CPU time is spent waiting for data to be fetched from main memory. As part of the Tungsten project, cache-friendly algorithms and data structures are being designed so that Spark applications spend less time waiting for data from memory and more time doing useful work.

Consider the example of sorting records. The standard sorting procedure stores an array of pointers to records and uses quicksort to swap pointers until all records are sorted. Sorting in general has a good cache hit rate thanks to its sequential scan access pattern. Sorting a list of pointers, however, has a poor cache hit rate, because each comparison requires dereferencing two pointers to records located at random positions in memory.

So how do you improve the cache locality of sorting? A very simple approach is to store the sort key of each record alongside its pointer. For example, if the sort key is a 64-bit integer, use 128 bits (a 64-bit pointer and a 64-bit key) per record in the pointer array. Each quicksort comparison then looks up pointer-key pairs in a purely linear fashion and requires no random memory lookups.
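A toy sketch of this layout (hypothetical code, not Spark’s): the sort array stores (key, pointer) pairs side by side, so comparisons read keys sequentially and never chase a pointer into a random record:

// Toy illustration of cache-friendly sorting: the sort array stores
// (sortKey, recordIndex) pairs side by side in one flat long[], so every
// comparison reads keys sequentially instead of dereferencing two pointers
// to randomly located records.
public class KeyPrefixSort {
    public static void main(String[] args) {
        long[] recordKeys = {42L, 7L, 99L, 13L};   // stands in for records elsewhere in memory

        // entries[2*i] = sort key, entries[2*i + 1] = "pointer" (here: the record index)
        long[] entries = new long[recordKeys.length * 2];
        for (int i = 0; i < recordKeys.length; i++) {
            entries[2 * i] = recordKeys[i];
            entries[2 * i + 1] = i;
        }

        // Simple insertion sort over the pairs; comparisons only touch entries[] itself.
        for (int i = 2; i < entries.length; i += 2) {
            long key = entries[i], ptr = entries[i + 1];
            int j = i - 2;
            while (j >= 0 && entries[j] > key) {
                entries[j + 2] = entries[j];
                entries[j + 3] = entries[j + 1];
                j -= 2;
            }
            entries[j + 2] = key;
            entries[j + 3] = ptr;
        }

        for (int i = 0; i < entries.length; i += 2) {
            System.out.println("record " + entries[i + 1] + " with key " + entries[i]);
        }
    }
}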

How does this apply to Spark? Most distributed data processing boils down to a small number of operations, such as aggregation, sorting, and joining. By improving the efficiency of these operations, the efficiency of Spark applications improves as a whole. A cache-aware version of sort has already been built that is three times faster than the previous version. This new sort will be used for sort-based shuffle, high-cardinality aggregations, and the sort-merge join operator.

The core of the Spark Tungsten memory-management code (from TaskMemoryManager) is as follows.

allocatePage calls the ExecutorMemoryManager to allocate a memory page and adds it to the page table:

/**
 * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
 * intended for allocating large blocks of memory that will be shared between operators.
 */
public MemoryBlock allocatePage(long size) {
  if (size > MAXIMUM_PAGE_SIZE_BYTES) {
    throw new IllegalArgumentException(
      "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
  }

  final int pageNumber;
  synchronized (this) {
    pageNumber = allocatedPages.nextClearBit(0);
    if (pageNumber >= PAGE_TABLE_SIZE) {
      throw new IllegalStateException(
        "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
    }
    allocatedPages.set(pageNumber);
  }
  final MemoryBlock page = executorMemoryManager.allocate(size);
  page.pageNumber = pageNumber;
  pageTable[pageNumber] = page;
  if (logger.isTraceEnabled()) {
    logger.trace("Allocate page number {} ({} bytes)", pageNumber, size);
  }
  return page;
}

Given an allocated memory page and an offset within that page, encodePageNumberAndOffset encodes the address into a 64-bit long; the high 13 bits hold the page number and the low bits hold the offset within the page:

/**
 * Given a memory page and offset within that page, encode this address into a 64-bit long.
 * This address will remain valid as long as the corresponding page has not been freed.
 *
 * @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}.
 * @param offsetInPage an offset in this page which incorporates the base offset. In other words,
 *                     this should be the value that you would pass as the base offset into an
 *                     UNSAFE call (e.g. page.baseOffset() + something).
 * @return an encoded page address.
 */
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  if (!inHeap) {
    // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
    // encode. Due to our page size limitation, though, we can convert this into an offset that's
    // relative to the page's base offset; this relative offset will fit in 51 bits.
    offsetInPage -= page.getBaseOffset();
  }
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber != -1) : "encodePageNumberAndOffset called with invalid page";
  return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}

Given a logical (encoded) address, decodePageNumber and decodeOffset recover the page number and the in-page offset:

@VisibleForTesting
public static int decodePageNumber(long pagePlusOffsetAddress) {
  return (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS);
}

private static long decodeOffset(long pagePlusOffsetAddress) {
  return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
}

Finally, getPage and getOffsetInPage translate an encoded address back into the page's base object and offset:

/**
 * Get the page associated with an address encoded by
 * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
 */
public Object getPage(long pagePlusOffsetAddress) {
  if (inHeap) {
    final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
    assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
    final MemoryBlock page = pageTable[pageNumber];
    assert (page != null);
    assert (page.getBaseObject() != null);
    return page.getBaseObject();
  } else {
    return null;
  }
}

/**
 * Get the offset associated with an address encoded by
 * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
 */
public long getOffsetInPage(long pagePlusOffsetAddress) {
  final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
  if (inHeap) {
    return offsetInPage;
  } else {
    // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
    // converted the absolute address into a relative address. Here, we invert that operation:
    final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
    assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
    final MemoryBlock page = pageTable[pageNumber];
    assert (page != null);
    return page.getBaseOffset() + offsetInPage;
  }
}
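For readers who want to see the address arithmetic in isolation, here is a standalone sketch that mirrors the encoding above (a 13-bit page number in the high bits, a 51-bit in-page offset in the low bits); the constants are reproduced here for illustration only:

// Standalone re-implementation of the 64-bit page-address encoding used above.
// The constants mirror Spark's, but this class is only an illustration.
public class PageAddressDemo {
    static final int OFFSET_BITS = 51;
    static final long MASK_LONG_LOWER_51_BITS = (1L << OFFSET_BITS) - 1;
    static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;

    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
    }

    static int decodePageNumber(long address) {
        return (int) ((address & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS);
    }

    static long decodeOffset(long address) {
        return address & MASK_LONG_LOWER_51_BITS;
    }

    public static void main(String[] args) {
        long addr = encode(3, 4096);                                          // page 3, offset 4096
        System.out.println(decodePageNumber(addr) + " / " + decodeOffset(addr));  // 3 / 4096
    }
}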

Code generation

Spark 1.4 introduces code generation for expression evaluation in SQL and DataFrames, such as the process described in From Database Systems to Spark SQL (3). Expression evaluation is the process of computing the value of an expression (such as age > 35 && age < 40) on a particular record. At run time, Spark dynamically generates bytecode to evaluate these expressions, rather than stepping through a slower interpreter for each row. Compared with interpretation, code generation reduces the boxing of primitive data types and, more importantly, avoids expensive polymorphic function dispatch.
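A rough sketch of the contrast (illustrative classes, not Catalyst's): interpreted evaluation walks an expression tree with virtual calls and boxed values on every row, while the generated version is equivalent to a straight-line, type-specialized predicate:

// Illustrative sketch only: an interpreted expression tree for
// "age > 35 && age < 40" versus what code generation effectively produces.
interface Expr { Object eval(Object[] row); }

class Column implements Expr {
    final int ordinal;
    Column(int ordinal) { this.ordinal = ordinal; }
    public Object eval(Object[] row) { return row[ordinal]; }      // returns a boxed value
}

class Literal implements Expr {
    final Object value;
    Literal(Object value) { this.value = value; }
    public Object eval(Object[] row) { return value; }
}

class GreaterThan implements Expr {
    final Expr left, right;
    GreaterThan(Expr l, Expr r) { left = l; right = r; }
    public Object eval(Object[] row) {
        // One virtual call per child and boxing/unboxing on every row.
        return (Integer) left.eval(row) > (Integer) right.eval(row);
    }
}

class LessThan implements Expr {
    final Expr left, right;
    LessThan(Expr l, Expr r) { left = l; right = r; }
    public Object eval(Object[] row) {
        return (Integer) left.eval(row) < (Integer) right.eval(row);
    }
}

class And implements Expr {
    final Expr left, right;
    And(Expr l, Expr r) { left = l; right = r; }
    public Object eval(Object[] row) {
        return (Boolean) left.eval(row) && (Boolean) right.eval(row);
    }
}

public class ExpressionDemo {
    // What code generation effectively produces: straight-line code on
    // primitives, with no tree walk, no virtual dispatch, and no boxing.
    static boolean generatedEval(int age) {
        return age > 35 && age < 40;
    }

    public static void main(String[] args) {
        Expr interpreted = new And(
            new GreaterThan(new Column(0), new Literal(35)),
            new LessThan(new Column(0), new Literal(40)));
        Object[] row = {37};
        System.out.println(interpreted.eval(row));   // true, via tree walking
        System.out.println(generatedEval(37));       // true, via specialized code
    }
}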

In an earlier blog post it was shown that code generation can speed up many TPC-DS queries by nearly an order of magnitude. The scope of code generation is now being extended to most of the built-in expressions. In addition, to let the JIT exploit better instruction pipelining in modern CPUs, code generation is being raised from record-at-a-time expression evaluation to vectorized expression evaluation, so that multiple records can be processed at once.
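A minimal sketch of the difference between record-at-a-time and vectorized evaluation for the same predicate (illustrative only):

// Illustrative only: evaluating "age > 35 && age < 40" one record at a time
// versus over a whole batch (a column of ages) in one tight loop that the JIT
// can pipeline and potentially auto-vectorize.
public class VectorizedEval {
    static boolean evalOne(int age) {
        return age > 35 && age < 40;
    }

    static void evalBatch(int[] ages, boolean[] out) {
        for (int i = 0; i < ages.length; i++) {
            out[i] = ages[i] > 35 && ages[i] < 40;   // no per-record call overhead
        }
    }

    public static void main(String[] args) {
        int[] ages = {30, 36, 39, 45};
        boolean[] out = new boolean[ages.length];
        evalBatch(ages, out);
        System.out.println(java.util.Arrays.toString(out));   // [false, true, true, false]
    }
}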

Code generation is also being applied beyond expression evaluation to optimize the CPU efficiency of internal components. One area where code generation is particularly exciting is speeding up the conversion of data from the in-memory binary format to the wire protocol for shuffle. As mentioned earlier, shuffle is usually bottlenecked by data serialization rather than by the underlying network. Code generation can improve serialization throughput, which in turn improves the throughput of the shuffle network.
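As a hypothetical illustration of why specialized serializers help, compare writing a fixed-layout (id, age) record directly into a binary buffer with generic Java serialization of even a tiny object:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

// Illustrative comparison (not Spark's generated code): a hand-specialized
// binary writer for a (long id, int age) record versus generic Java serialization.
public class ShuffleWriteDemo {
    static void writeSpecialized(ByteBuffer buf, long id, int age) {
        buf.putLong(id);    // fixed layout, no reflection, no per-object metadata
        buf.putInt(age);
    }

    static byte[] writeGeneric(Serializable record) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(record);   // class descriptors, reflection, object headers
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(12);
        writeSpecialized(buf, 1L, 37);
        System.out.println("specialized: " + buf.position() + " bytes");   // 12 bytes

        byte[] generic = writeGeneric(new long[]{1L});   // even a tiny array is far larger
        System.out.println("generic: " + generic.length + " bytes");
    }
}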