Zhengqiang, SphereEx Senior Middleware Development Engineer, Apache ShardingSphere Committer.

I began working with Apache ShardingSphere middleware in 2018 and have led large-scale database and table sharding projects inside my company, gaining rich hands-on experience. I love open source and enjoy sharing, and I currently focus on the development of the Apache ShardingSphere kernel module.

Preface

In earlier technical deep dives into Apache ShardingSphere 5.1.0, we looked at the SQL formatting capability provided by the parsing engine and the newly optimized high availability features. Beyond delivering new and useful features, the Apache ShardingSphere community is also constantly working to improve performance.

Consider a single database in which the t_order table is split into 10 shards, with max-connections-size-per-query left at its default value of 1. If a user executes SELECT * FROM t_order, the statement is fully routed to all 10 shards. Because each query may create only one connection per database, the result sets of the underlying SQL statements have to be loaded into memory up front for processing. This limits the consumption of database connection resources but costs more memory. If the user instead sets max-connections-size-per-query to 10, then 10 database connections can be created at the same time to execute the real SQL. Since each connection can hold its own result set, no extra memory is consumed, but obviously more database connection resources are used.

To strike a better balance between database connection resources and memory resources, the newly released version 5.1.0 optimizes the performance of the SQL execution engine: the SQL rewriting engine now performs an optimization rewrite that combines the multiple real SQL statements routed to the same data source into a single statement using UNION ALL. In this way, the execution engine effectively reduces the consumption of database connection resources, avoids in-memory merging in many cases, and greatly improves SQL query performance in OLTP scenarios. Let's walk through the details of this execution engine optimization with a concrete SQL example.
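
As a rough illustration of the idea (a sketch that assumes the 10 shards are named t_order_0 through t_order_9), the rewrite turns ten per-shard statements on the same database into one combined statement:

    -- before: one real SQL per shard, all executed on the same database
    SELECT * FROM t_order_0;
    SELECT * FROM t_order_1;
    -- ... and so on up to t_order_9

    -- after: a single statement combined with UNION ALL, needing only one connection
    SELECT * FROM t_order_0
    UNION ALL SELECT * FROM t_order_1
    -- ... and so on up to
    UNION ALL SELECT * FROM t_order_9;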

Execution Engine Principles

Before diving into the execution engine performance optimization, let's review the Apache ShardingSphere microkernel and the place of the execution engine in the kernel workflow. As shown in the figure below, the Apache ShardingSphere microkernel includes core steps such as SQL parsing, SQL routing, SQL rewriting, SQL execution, and result merging.

The SQL parsing engine parses the SQL statements entered by the user and generates an SQLStatement containing context information. The SQL routing engine extracts the sharding conditions from the parsing context and, combined with the sharding rules configured by the user, calculates the data sources on which the real SQL needs to be executed, producing the routing result. The SQL rewriting engine then rewrites the original SQL according to the routing result, covering both correctness rewriting and optimization rewriting. The SQL execution engine is responsible for safely and efficiently sending the real SQL produced by the routing and rewriting engines to the underlying data sources for execution; the resulting result sets are processed by the merging engine into a unified result set that is returned to the user.

As can be seen from the overall microkernel workflow, the SQL execution engine interacts directly with the underlying databases and is responsible for holding the execution result sets. It is fair to say that the performance and resource consumption of the execution engine directly determine the performance and resource consumption of Apache ShardingSphere as a whole. Therefore, Apache ShardingSphere employs an automated SQL execution engine that is responsible for balancing execution performance against resource consumption.

From the point of view of execution performance, allocating an independent database connection to each shard's statement makes full use of multi-threading and allows the I/O cost to be processed in parallel. In addition, assigning a separate database connection to each shard prevents the query result set from being loaded into memory prematurely: a dedicated connection can hold a reference to the cursor position of its result set and move the cursor only when the corresponding data needs to be retrieved.

From the perspective of resource control, the number of connections a service may open to a database should be limited, to prevent one service from occupying too many database connection resources and affecting the normal access of other services. This matters especially when a database instance contains many sub-tables: a logical SQL without a shard key produces a large number of real SQL statements that fall on different tables in the same database, and if each real SQL occupied an independent connection, a single query would undoubtedly consume far too many resources.

To resolve the conflict between execution performance and resource control, Apache ShardingSphere introduces the concept of connection mode. The following is the definition of connection mode in the Apache ShardingSphere source code:

/**
 * Connection Mode.
 */
public enum ConnectionMode {

    MEMORY_STRICTLY, CONNECTION_STRICTLY
}

As you can see from the naming of the members in the ConnectionMode enumeration class, the SQL execution engine divides database connections into MEMORY_STRICTLY and CONNECTION_STRICTLY.

  • MEMORY_STRICTLY stands for the memory-limited mode. In this mode, if a logical table corresponds to 10 real tables in the same data source, the SQL execution engine creates 10 connections and executes them in parallel. Because each shard's result set is held by its own connection, there is no need to load result sets into memory in advance, which effectively reduces the memory footprint.

  • CONNECTION_STRICTLY stands for the connection-limited mode. In this mode, the SQL execution engine creates only one connection per data source and strictly controls the consumption of database connection resources. The result set is loaded into memory immediately after the real SQL is executed, which occupies some memory space.

So how does the Apache ShardingSphere SQL execution engine choose a connection mode for each query? The selection logic is shown in the following figure:

The user-configured maxConnectionSizePerQuery parameter specifies the maximum number of connections a single statement may use on the same data source. Dividing the number of SQL statements routed to a data source by this value gives the number of SQL statements each connection has to execute. When that number is less than or equal to 1, every real SQL can be allocated an independent database connection, so the memory-limited mode is selected and multiple connections are created on the same data source for parallel execution. Otherwise, the connection-limited mode is selected: only one database connection is created per data source, the results are loaded into in-memory result sets, and these are then handed to the merge engine.
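
For example, under the single-database scenario from the preface: with 10 real SQL statements routed to one data source and maxConnectionSizePerQuery = 1, each connection would have to execute 10 / 1 = 10 statements, which is greater than 1, so the connection-limited mode is chosen; with maxConnectionSizePerQuery = 10, each connection executes 10 / 10 = 1 statement, so the memory-limited mode is chosen.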

Execution Engine Optimization

Having looked at the internals of the Apache ShardingSphere SQL execution engine, we can see that the memory-limited mode consumes more database connections but achieves better performance through concurrent execution, while the connection-limited mode effectively controls connection usage but occupies too much memory and also hurts execution performance.

So is it possible to execute with as few database connections as possible while also consuming less memory? From the analysis of the SQL execution engine above, the choice of execution mode mainly depends on the number of routing results on the same data source, so the most direct optimization is to merge the routing results of the same data source. SQL natively supports combining multiple query statements with UNION ALL, so we adopted a UNION ALL scheme for the optimization rewrite: multiple real SQL statements on the same data source are rewritten into a single SQL statement, which greatly reduces the number of database connections that must be acquired. At the same time, the in-memory result set can be replaced with a streaming result set, reducing the memory footprint.

Considering that different database dialects place restrictions on the use of UNION ALL, we reviewed the official documentation of MySQL, PostgreSQL, Oracle, and SQL Server and summarized the following information.

MySQL UNION ALL

  • The column names of the UNION result set are taken from the first SELECT statement;

  • When the individual queries in a UNION contain ORDER BY or LIMIT, each query must be enclosed in parentheses. The UNION itself cannot guarantee the order of the final result set; to sort the UNION result set, add an ORDER BY ... LIMIT clause at the end of the UNION statement.

    There is no guarantee that the UNION result set is ordered

    (SELECT a FROM t1 WHERE a=10 AND B=1 ORDER BY a LIMIT 10) UNION (SELECT a FROM t2 WHERE a=11 AND B=2 ORDER BY a LIMIT 10);

    Ensure that the UNION result set is ordered

    (SELECT a FROM t1 WHERE a=10 AND B=1) UNION (SELECT a FROM t2 WHERE a=11 AND B=2) ORDER BY a LIMIT 10;

  • The UNION does not support SELECT HIGH_PRIORITY statements and SELECT INTO file statements

PostgreSQL UNION ALL

  • The column names of the UNION result set are taken from the first SELECT statement;

  • When the individual queries in a UNION contain ORDER BY or LIMIT, each query must be enclosed in parentheses. The last query may omit the parentheses; without parentheses, its ORDER BY and LIMIT clauses apply to the entire UNION result.

  • The UNION statement does not support FOR NO KEY UPDATE, FOR UPDATE, FOR SHARE, or FOR KEY SHARE.

Oracle UNION ALL

  • The UNION statement does not support BLOB, CLOB, BFILE, VARRAY, LONG or nested table.

  • The UNION statement does not support for_update_clause.

  • The UNION statement does not support order_by_clause within the individual SELECT statements; the order_by_clause must be placed at the end of the entire UNION statement.

    SELECT product_id FROM order_items UNION SELECT product_id FROM inventories ORDER BY product_id;

  • The UNION statement does not support SELECT statements containing TABLE collection expressions.

SQL Server UNION ALL

  • When an ORDER BY clause is used in a UNION statement, it must be placed after the last SELECT statement and sorts the combined UNION result.

From the above information, we can see that all of these database dialects support combining simple SELECT * FROM table WHERE statements, and ORDER BY with LIMIT can also be supported with syntax adjustments, although there are some grammatical differences between dialects. More complex grouping queries, subqueries, and join queries are not described in detail in the official documents. Since the optimization rewrite must guarantee SQL compatibility, Apache ShardingSphere 5.1.0 only rewrites simple SELECT * FROM table WHERE statements, aiming to quickly improve query performance in OLTP scenarios.
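
To make the boundary concrete, here is an illustrative sketch (the column names and the t_order_item table are assumptions for the example, not engine documentation): the first statement below matches the structure eligible for the UNION ALL rewrite, while statements with grouping or subqueries keep the original per-shard execution.

    -- within the 5.1.0 rewrite scope: a simple SELECT * FROM table WHERE statement
    SELECT * FROM t_order WHERE user_id = 10;

    -- outside the rewrite scope: grouping and subqueries are executed per shard as before
    SELECT user_id, COUNT(*) FROM t_order GROUP BY user_id;
    SELECT * FROM t_order WHERE order_id IN (SELECT order_id FROM t_order_item);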

Apache ShardingSphere 5.1.0 adds optimization rewriting logic for SELECT * FROM table WHERE statements to the RouteSQLRewriteEngine. The isNeedAggregateRewrite method first decides whether to rewrite: the UNION ALL rewrite is performed only when a data source has more than one routing result and the real SQL to be executed matches the SELECT * FROM table WHERE structure.

/**
 * Rewrite SQL and parameters.
 *
 * @param sqlRewriteContext SQL rewrite context
 * @param routeContext route context
 * @return SQL rewrite result
 */
public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
    Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1);
    for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
        Collection<RouteUnit> routeUnits = entry.getValue();
        if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
            result.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
        } else {
            result.putAll(createSQLRewriteUnits(sqlRewriteContext, routeContext, routeUnits));
        }
    }
    return new RouteSQLRewriteResult(result);
}

Because of the UNION ALL rewrite, the handling of queryResults in the merge engine also needs to be adjusted: what used to be multiple query results may now have been merged by UNION ALL into a single query result, and the merge logic still has to run in that scenario.

@Override
public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext, final ShardingSphereSchema schema) throws SQLException {
    if (1 == queryResults.size() && !isNeedAggregateRewrite(sqlStatementContext)) {
        return new IteratorStreamMergedResult(queryResults);
    }
    Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
    SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
    selectStatementContext.setIndexes(columnLabelIndexMap);
    MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schema);
    return decorate(queryResults, selectStatementContext, mergedResult);
}

To help you understand the logic before and after the optimization, we use the following sharding configuration and the SELECT * FROM t_order statement to illustrate the effect. In this example, max-connections-size-per-query uses the default value 1.

rules:
- !SHARDING
  tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order_${0..1}
      tableStrategy:
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: t_order_inline
      databaseStrategy:
        standard:
          shardingColumn: user_id
          shardingAlgorithmName: database_inline
  shardingAlgorithms:
    database_inline:
      type: INLINE
      props:
        algorithm-expression: ds_${user_id % 2}
    t_order_inline:
      type: INLINE
      props:
        algorithm-expression: t_order_${order_id % 2}

In version 5.0.0, executing the SELECT * FROM t_order statement produces a routing result that contains the ds_0 and ds_1 data sources, with two routing results each. Because max-connections-size-per-query is set to 1, there are not enough database connections to give every real SQL its own connection, so the connection-limited mode is selected.
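
Based on the sharding configuration above, the full route can be expected to look roughly like this (an illustrative sketch of the routing result, not actual engine output):

    -- ds_0
    SELECT * FROM t_order_0;
    SELECT * FROM t_order_1;
    -- ds_1
    SELECT * FROM t_order_0;
    SELECT * FROM t_order_1;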

Also, because the connection-limited mode is used, each result set is loaded into memory after parallel execution and stored in a JDBCMemoryQueryResult. When the result set is large, this occupies a lot of memory, and the use of in-memory result sets also forces in-memory merging instead of streaming merging.

private QueryResult createQueryResult(final ResultSet resultSet, final ConnectionMode connectionMode) throws SQLException {
    return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet);
}

In version 5.1.0, the UNION ALL optimization merges the multiple routing results of the same data source into a single SQL statement. Since a single database connection is now enough to hold the one result set per data source, the memory-limited mode is selected. In memory-limited mode the result set is held as a streaming result set by a JDBCStreamQueryResult object, and data is fetched from the stream only when it is needed.
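
Under the same configuration, the rewritten SQL sent to each data source can be expected to look roughly like this (again an illustrative sketch):

    -- ds_0
    SELECT * FROM t_order_0 UNION ALL SELECT * FROM t_order_1;
    -- ds_1
    SELECT * FROM t_order_0 UNION ALL SELECT * FROM t_order_1;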

Performance Optimization Test

From the example in the previous section, we can see that the UNION ALL optimization rewrite effectively reduces database connection consumption and converts in-memory result sets into streaming result sets, avoiding excessive memory usage. To quantify the performance improvement, we stress-tested the logic before and after the optimization. The software versions used in the stress test were ShardingSphere-Proxy 5.0.1-SNAPSHOT and MySQL 5.7.26.

The machine configuration of the stress test environment is as follows:

The sharded tables sbtest1 through sbtest10 were created based on the sysbench table structure. Each logical table is split across 5 databases, with 10 tables in each database.

schemaName: sbtest_sharding
dataSources:
  ds_0:
    url: jdbc:mysql://127.0.0.1:3306/sbtest?useSSL=false&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=8192&prepStmtCacheSqlLimit=1024
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 10000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  ds_1:
    url: jdbc:mysql://127.0.0.1:3306/sbtest?useSSL=false&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=8192&prepStmtCacheSqlLimit=1024
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 10000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  ds_2:
    url: jdbc:mysql://127.0.0.1:3306/sbtest?useSSL=false&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=8192&prepStmtCacheSqlLimit=1024
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 10000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  ds_3:
    url: jdbc:mysql://127.0.0.1:3306/sbtest?useSSL=false&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=8192&prepStmtCacheSqlLimit=1024
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 10000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  ds_4:
    url: jdbc:mysql://127.0.0.1:3306/sbtest?useSSL=false&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=8192&prepStmtCacheSqlLimit=1024
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 10000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1

rules:
- !SHARDING
  tables:
    sbtest1:
      actualDataNodes: ds_${0..4}.sbtest1_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_1
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest2:
      actualDataNodes: ds_${0..4}.sbtest2_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_2
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest3:
      actualDataNodes: ds_${0..4}.sbtest3_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_3
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest4:
      actualDataNodes: ds_${0..4}.sbtest4_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_4
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest5:
      actualDataNodes: ds_${0..4}.sbtest5_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_5
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest6:
      actualDataNodes: ds_${0..4}.sbtest6_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_6
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest7:
      actualDataNodes: ds_${0..4}.sbtest7_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_7
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest8:
      actualDataNodes: ds_${0..4}.sbtest8_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_8
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest9:
      actualDataNodes: ds_${0..4}.sbtest9_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_9
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake
    sbtest10:
      actualDataNodes: ds_${0..4}.sbtest10_${0..9}
      tableStrategy:
        standard:
          shardingColumn: id
          shardingAlgorithmName: table_inline_10
      keyGenerateStrategy:
        column: id
        keyGeneratorName: snowflake

  defaultDatabaseStrategy:
    standard:
      shardingColumn: id
      shardingAlgorithmName: database_inline

  shardingAlgorithms:
    database_inline:
      type: INLINE
      props:
        algorithm-expression: ds_${id % 5}
        allow-range-query-with-inline-sharding: true
    table_inline_1:
      type: INLINE
      props:
        algorithm-expression: sbtest1_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_2:
      type: INLINE
      props:
        algorithm-expression: sbtest2_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_3:
      type: INLINE
      props:
        algorithm-expression: sbtest3_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_4:
      type: INLINE
      props:
        algorithm-expression: sbtest4_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_5:
      type: INLINE
      props:
        algorithm-expression: sbtest5_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_6:
      type: INLINE
      props:
        algorithm-expression: sbtest6_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_7:
      type: INLINE
      props:
        algorithm-expression: sbtest7_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_8:
      type: INLINE
      props:
        algorithm-expression: sbtest8_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_9:
      type: INLINE
      props:
        algorithm-expression: sbtest9_${id % 10}
        allow-range-query-with-inline-sharding: true
    table_inline_10:
      type: INLINE
      props:
        algorithm-expression: sbtest10_${id % 10}
        allow-range-query-with-inline-sharding: true
  keyGenerators:
    snowflake:
      type: SNOWFLAKE
      props:
        worker-id: 123

We used the following JMH test program to test the different cases:

@State(Scope.Thread)
public class QueryOptimizationTest {

    private PreparedStatement unionAllForCaseOneStatement;

    private PreparedStatement unionAllForCaseTwoStatement;

    @Setup(Level.Trial)
    public void setup() throws Exception {
        Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307/sharding_db?useSSL=false", "root", "123456");
        // CASE 1
        unionAllForCaseOneStatement = connection.prepareStatement("SELECT COUNT(k) AS countK FROM sbtest1 WHERE id < ?;");
        // CASE 2
        unionAllForCaseTwoStatement = connection.prepareStatement("SELECT SUM(k) AS sumK FROM sbtest1 WHERE id < ?;");
    }

    @Benchmark
    public void testUnionAllForCaseOne() throws SQLException {
        unionAllForCaseOneStatement.setInt(1, 200);
        unionAllForCaseOneStatement.executeQuery();
    }

    @Benchmark
    public void testUnionAllForCaseTwo() throws SQLException {
        unionAllForCaseTwoStatement.setInt(1, 200);
        unionAllForCaseTwoStatement.executeQuery();
    }
}

For the performance test, each CASE was run 3 times and the results were averaged. As the pre-optimization baseline, we packaged the version built from commit aab226b72ba574061748d8f94c461ea469f9168f and likewise averaged 3 runs for each CASE. The final performance test results are shown below.

Both CASE 1 and CASE 2 were tested against the sysbench table structure with 1 million rows of data. Because the test tables have a large number of shards, overall performance improved by roughly 4x, and in theory the improvement becomes even more pronounced as the number of shards increases.

Conclusion

Apache ShardingSphere 5.1.0 includes a large number of performance optimizations, covering both the protocol layer and the kernel layer. Due to space constraints, this article only covers the SQL execution engine; subsequent articles in this series will provide a more thorough and comprehensive performance optimization guide, so stay tuned if you are interested. We also welcome community members to get involved and help improve the performance of Apache ShardingSphere, delivering a better experience for everyone.

Reference documentation

1. ShardingSphere execution engine: shardingsphere.apache.org/document/cu…

2. ShardingSphere community discussion on execution engine connection modes: github.com/apache/shar…

3. MySQL UNION official documentation: dev.mysql.com/doc/refman/…

4. PostgreSQL UNION official documentation: www.postgresql.org/docs/14/sql…

5. Oracle UNION official documentation: docs.oracle.com/en/database…

6. SQL Server UNION official documentation: docs.microsoft.com/en-us/sql/t…