Introduction: This article covers the optimization measures applied to JD's Flink SQL tasks, including shuffle selection, join mode selection, object reuse, and UDF reuse.

This article was written by Ying Zhang and Xuehao Duan of JD Algorithm Services and proofread by Rui Li, Apache Hive PMC member and technical expert at Alibaba. The main contents are:

  • Background
  • Flink SQL optimization
  • Summary

One, Background

At present, the data processing pipeline of JD search and recommendation is shown in the figure above. As you can see, real-time and offline data are processed separately: offline data processing mostly uses Hive/Spark, while real-time data processing mostly uses Flink/Storm.

This leads to the following situation: within one business engine, users have to maintain two environments and two codebases, much common logic cannot be reused, and data quality and consistency are hard to guarantee. Because the data models are inconsistent, a lot of glue logic is needed; even just to ensure data consistency, many year-over-year, period-over-period, and other secondary data comparisons have to be done, which is inefficient and error-prone.

Flink SQL, with its unified support for batch and stream processing, can solve this pain point to a large extent, so we decided to introduce Flink to address it.

For most jobs, and Flink jobs in particular, execution efficiency has always been the key focus of task optimization, which is especially important given JD's PB-level daily data increment.

Those of you who have written SQL jobs know that in a Flink SQL job the same UDF may, in some cases, be called over and over again, which is very unfriendly for resource-intensive tasks. In addition, shuffle, join, and failover strategies all affect execution efficiency. Finally, debugging Flink tasks is very complicated, especially for companies whose production machines are network-isolated.

To this end, we implemented embedded Derby as Hive's metadata storage database (allowEmbedded); for task recovery, batch jobs do not use the checkpoint mechanism for failover, but instead rely on Flink's region failover strategy to recover quickly. This article also introduces object reuse and other related optimization measures.

Two, Flink SQL optimization

1. UDF reuse

The following situation can arise in a Flink SQL task: if the same UDF appears both in the LogicalProject and in the Where condition, the UDF will be called multiple times (see issues.apache.org/jira/browse…). If the UDF consumes a lot of CPU or memory, this redundant computation will greatly affect performance, so we want to cache the UDF result and reuse it on the next call. Design considerations (very important: make sure the LogicalProject and the Where condition subtasks are chained together):

  • A TaskManager may run multiple subtasks, so the cache must be either thread-level (ThreadLocal) or TM-level;
  • To guard against cases where the cache-clearing logic fails, the cache must also be cleared in the close method;
  • To prevent memory from growing without bound, it is best to choose a cache whose size can be actively controlled; as for the expiration time, we recommend configuring it, but it should preferably not be shorter than the interval between successive UDF calls;
  • As mentioned above, a TM may run multiple subtasks, which is equivalent to a multi-threaded environment inside the TM. The cache must therefore be thread-safe, and whether locking is needed can then be decided based on the business.

Based on the above considerations, we use a Guava cache to store UDF results and read directly from the cache on subsequent calls, which minimizes the task's overhead. Here is a simple example (with maximum size and expiration time, but no write lock):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomFunction extends ScalarFunction {

    private static final Logger profileLog = LoggerFactory.getLogger(RandomFunction.class);
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    // Cache UDF results so repeated calls with the same key are not recomputed.
    private static final Cache<String, Integer> cache = CacheBuilder.newBuilder()
            .maximumSize(2)
            .expireAfterWrite(3, TimeUnit.SECONDS)
            .build();

    public int eval(String pvid) {
        profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
        Integer result = cache.getIfPresent(pvid);
        if (null == result) {
            int tmp = (int) (Math.random() * 1000);
            cache.put(pvid, tmp); // cache under the actual key
            return tmp;
        }
        return result;
    }

    @Override
    public void close() throws Exception {
        super.close();
        cache.cleanUp();
    }
}
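For completeness, registering and using such a UDF might look like the following (a sketch only; the function name, source table name, and threshold are illustrative, not from the original article):

// Sketch: register the caching UDF and call it in both the projection and the WHERE clause,
// which is exactly the pattern that benefits from the cache above.
tableEnv.createTemporarySystemFunction("RandomFunction", RandomFunction.class);
tableEnv.executeSql(
        "SELECT pvid, RandomFunction(pvid) AS rnd "
                + "FROM some_source_table "          // hypothetical source table
                + "WHERE RandomFunction(pvid) > 100");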

2. Unit testing

You may wonder why unit testing is listed as an optimization. As noted, Flink task debugging is very complex, especially for companies whose production machines are network-isolated. There was no way to access the task servers from JD's local environment, so during the initial debugging of tasks we spent a lot of time uploading JAR packages and viewing logs.

To reduce task debugging time and improve development efficiency, we implemented embedded Derby as Hive's metadata storage database (allowEmbedded) as a way to optimize development time. The specific approach is as follows:

First, create the HiveConf:

public static HiveConf createHiveConf() {
    ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader();
    HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));

    try {
        TEMPORARY_FOLDER.create();
        String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
        String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);

        HiveConf hiveConf = new HiveConf();
        hiveConf.setVar(
                HiveConf.ConfVars.METASTOREWAREHOUSE,
                TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
        hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
        hiveConf.set("datanucleus.connectionPoolingType", "None");
        hiveConf.set("hive.metastore.schema.verification", "false");
        hiveConf.set("datanucleus.schema.autoCreateTables", "true");
        return hiveConf;
    } catch (IOException e) {
        throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
    }
}
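The constants and helpers referenced above are not shown in the original snippet; a minimal sketch of what they might look like, assuming JUnit-style test utilities and the embedded Derby connection URL format, is:

// Assumed test helpers; names and values are illustrative, not part of the original snippet.
private static final String HIVE_SITE_XML = "hive-site.xml";  // test resource on the classpath
private static final String HIVE_WAREHOUSE_URI_FORMAT =
        "jdbc:derby:;databaseName=%s;create=true";             // embedded Derby metastore URL
private static final org.junit.rules.TemporaryFolder TEMPORARY_FOLDER =
        new org.junit.rules.TemporaryFolder();                 // temporary warehouse directory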

Next, create the HiveCatalog (using reflection to invoke the embedded interface):

public static void createCatalog() throws Exception{
    Class clazz = HiveCatalog.class;
    Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
    c1.setAccessible(true);
    hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
    hiveCatalog.open();
}

Then create the TableEnvironment:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
TableConfig tableConfig = tableEnv.getConfig();
Configuration configuration = new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableConfig.addConfiguration(configuration); // apply the settings; without this the Configuration above has no effect
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());

Finally, close the HiveCatalog:

public static void closeCatalog() {
    if (hiveCatalog != null) {
        hiveCatalog.close();
    }
}

In addition, for unit testing, being able to build suitable data sets is also very important. We implemented CollectionTableFactory, which allows us to build data sets ourselves. It is used as follows:

CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource = new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + "  `pvid` string) with ('connector.type'='COLLECTION','is-bounded' = 'true')");
tableEnv.executeSql(sbFilesSource.toString());
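For completeness, a verification step for such a test might look like the following (a sketch; the query and expected count are illustrative, and JUnit's Assert is assumed to be available):

// Sketch: query the collection-backed table and check the result in the unit test.
List<Row> actual = new ArrayList<>();
tableEnv.executeSql("SELECT pvid FROM db1.search_realtime_table_dump_p13 WHERE pvid LIKE '%test%'")
        .collect()
        .forEachRemaining(actual::add);
Assert.assertEquals(3, actual.size()); // "this is a test", "just for test", "a test case"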

3. Choosing the join mode

Traditional offline batch SQL (SQL over bounded data sets) has three basic join implementations: Nested-loop Join, Sort-Merge Join, and Hash Join.

  • Nested-loop Join is the simplest and most direct: both data sets are loaded into memory and their elements are compared pairwise via nested traversal to check whether they satisfy the join condition. Nested-loop Join is the least efficient in both time and space, and can be disabled with table.exec.disabled-operators: NestedLoopJoin (see the configuration sketch after this list). The following two images show the plan before and after disabling it (if disabling does not take effect, check whether the join is an equi-join):
  • Sort-Merge Join has two phases, sort and merge: the two data sets are first sorted separately and then traversed and matched against each other, similar to merge sort. Sort-Merge Join requires both data sets to be sorted, but it can be used as an optimization if the two inputs are already ordered.
  • Hash Join also has two phases: one data set is first converted into a hash table, and the other data set is then traversed and its elements matched against the hash table. The first phase and the first data set are called the build phase and build table; the second phase and the second data set are called the probe phase and probe table. Hash Join is efficient but memory-hungry; it is usually used as an optimization when one side of the join is a small table that fits in memory (assuming no spilling to disk).

Note: Sort-Merge Join and Hash Join only apply to equi-joins (all join conditions use equality as the comparison operator).

Flink has made some further subdivisions on join, including:

  • Repartition-Repartition strategy: both data sets of the join are partitioned by their keys with the same partitioning function, and the data is sent over the network;
  • Broadcast-Forward strategy: the larger data set is left untouched, and the other, smaller data set is copied in full to every machine in the cluster that holds part of the larger data set.

Batch Shuffle is notoriously time-consuming.

  • If the two data sets differ greatly in size, the Broadcast-Forward strategy is recommended;
  • If the two data sets are of similar size, the Repartition-Repartition strategy is recommended.

The table size threshold below which broadcast is used can be set via table.optimizer.join.broadcast-threshold; setting it to -1 disables broadcast.
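As a rough illustration, both options mentioned in this section can be set on the table configuration (a minimal sketch reusing the tableEnv built in the unit-testing section; the threshold value is an arbitrary example):

// Sketch: tuning join behavior through the table configuration.
Configuration joinConf = tableEnv.getConfig().getConfiguration();
// Disable the nested-loop join operator (only takes effect for equi-joins).
joinConf.setString("table.exec.disabled-operators", "NestedLoopJoin");
// Broadcast a join input only when it is smaller than ~10 MB; -1 disables broadcast.
joinConf.setLong("table.optimizer.join.broadcast-threshold", 10 * 1024 * 1024);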

Below is the effect before and after disabling:

4. Multiple input

In Flink SQL tasks, reducing shuffle can effectively improve throughput. In real business scenarios we often encounter the following situation: the upstream data already satisfies the required data distribution (for example, several consecutive join operators on the same key); in this case Flink's forward shuffle is a redundant shuffle, and we would like to chain these operators together. Flink 1.12 introduces the multiple input feature, which eliminates most unnecessary forward shuffles and chains the source operators together:

table.optimizer.multiple-input-enabled: true
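This can also be set programmatically, for example (a sketch, again reusing the tableEnv from above):

// Sketch: enable the multiple-input operator to remove redundant forward shuffles.
tableEnv.getConfig().getConfiguration()
        .setBoolean("table.optimizer.multiple-input-enabled", true);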

The following figure shows the topology with multiple input enabled and operator chaining not enabled:

5. Object reuse

Data transfer between upstream and downstream operators goes through serialization, deserialization, and copying, which affects the performance of a Flink SQL program; performance can be improved by enabling object reuse. This is very dangerous in the DataStream API, however, because modifying an object in the next operator may accidentally affect the object in the operator above.

However, Flink's Table/SQL API is safe in this respect, and object reuse can be enabled as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

Or by setting pipeline.object-reuse: true.

Why does enabling object reuse bring such a performance boost? In the Blink planner, data exchange between two operators in the same task ultimately goes through BinaryString#copy. Looking at the implementation, BinaryString#copy needs to copy the bytes of the underlying MemorySegment; enabling object reuse avoids this copy and improves efficiency.

Below is the corresponding flame graph when object reuse is not enabled:

6. Failover strategy for SQL tasks

In batch mode, checkpoints and related features are not available, so the checkpoint-based failover strategy used for real-time tasks cannot be applied to batch tasks. However, batch tasks allow tasks to communicate through blocking shuffle: when a task fails for whatever reason, the data it needs is all stored in the blocking shuffle, so only this task and the downstream tasks connected to it via pipeline shuffle need to be restarted:

jobmanager.execution.failover-strategy: region (operators that have already finished can be recovered directly)

table.exec.shuffle-mode: ALL_EDGES_BLOCKING (shuffle strategy)
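A minimal sketch of how these two settings might be applied (the failover strategy is a cluster-level option that normally lives in flink-conf.yaml; the shuffle mode can be set per job through the table configuration):

// flink-conf.yaml (cluster level):
//   jobmanager.execution.failover-strategy: region

// Per-job setting, reusing the batch-mode tableEnv from the earlier sections:
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.shuffle-mode", "ALL_EDGES_BLOCKING");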

7. Shuffle

Shuffle in Flink is divided into pipeline shuffle and blocking shuffle.

  • Pipeline shuffle performs well, but it demands more resources and has poor fault tolerance (the operator is grouped into the preceding region, so for batch tasks a failure means recovering from the previous region).
  • Blocking shuffle is the traditional batch shuffle, which writes data to disk. It is more fault-tolerant but generates a lot of disk and network IO (if you want to keep things simple, blocking shuffle is recommended). Blocking shuffle is further divided into hash shuffle and sort shuffle: if your disks are SSDs and the concurrency is not too high, you can choose hash shuffle, which produces many files and a lot of random reads and therefore has a large impact on disk IO; if your disks are SATA and the concurrency is high, you can choose sort-merge shuffle, which produces fewer files, reads sequentially, and does not generate a lot of disk IO, at the cost of higher overhead (the sort-merge itself).

Corresponding control parameters:

table.exec.shuffle-mode: this parameter accepts multiple values. The default is ALL_EDGES_BLOCKING, meaning all edges use blocking shuffle, but you can try POINTWISE_EDGES_PIPELINED, which means forward and rescale edges automatically start in pipeline mode.

taskmanager.network.sort-shuffle.min-parallelism: sort-merge shuffle is enabled by setting this parameter to a value no greater than your parallelism. This parameter should be set with other factors in mind; refer to the official documentation for details.
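To make this concrete, a hypothetical combination for a high-parallelism batch job on SATA disks might look like the following (the parallelism and threshold values are arbitrary examples, not recommendations from the original article):

// Per-job shuffle mode: pipeline forward/rescale edges, block everything else.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED");

// flink-conf.yaml (cluster level): enable sort-merge shuffle once job parallelism
// reaches this value, e.g. for a job running with parallelism 500:
//   taskmanager.network.sort-shuffle.min-parallelism: 128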

Three, Summary

This article has focused on the optimization measures applied to JD's Flink SQL tasks, including shuffle selection, join mode selection, object reuse, and UDF reuse. In addition, we would like to thank Fu Haitao and all colleagues of the JD real-time computing R&D department for their support and help.


This article is original content from Alibaba Cloud and may not be reproduced without permission.