The merging engine seems to be quite complex. The result merging supported by ShardingSphere can be divided into five types, namely traversal, sorting, grouping, paging and aggregation, which are combined rather than mutually exclusive. From the structure, it can be divided into streaming merge, memory merge and decorator merge. Streaming and memory merges are mutually exclusive, and decorator merges can go further than streaming and memory merges.

We take an order by SQL statements go over the code, then the previous, we return to ShardingPreparedStatement# mergeQuery method, we see MergeEngine, start from here is belongs to merge the result set

private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
       ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
       MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
       return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
   }
Copy the code

1. The core code merge in ShardingDQLResultMerger# build, there is a loss of merge and memory nu merge, we went to see the streaming merge advanced OrderByStreamMergedResult method

private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        if (isNeedProcessGroupBy(selectStatementContext)) {
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessDistinctRow(selectStatementContext)) {
            setGroupByForDistinctRow(selectStatementContext);
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessOrderBy(selectStatementContext)) {
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
        }
        return new IteratorStreamMergedResult(queryResults);
    }
Copy the code
  1. The PriorityQueue PriorityQueue is created to queue the result set in the orderResultSetsToQueue method
public OrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        this.orderByItems = selectStatementContext.getOrderByContext().getItems();
        this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
        orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
        isFirstNext = true;
    }
Copy the code
  1. The last line of code, streaming result set merge, sets the current streaming result set, which stores the result set of the first element in the current queue so it doesn’t run out of memory
private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        for (QueryResult each : queryResults) {
            OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
            if (orderByValue.next()) {
                orderByValuesQueue.offer(orderByValue);
            }
        }
        setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
    }
Copy the code

4. Take a look at memory merge and go to the MemoryMergedResult#MemoryMergedResult method. Here you can see that you need to load all the data into memory

protected MemoryMergedResult(final T rule, final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
        List<MemoryQueryResultRow> memoryQueryResultRowList = init(rule, schemaMetaData, sqlStatementContext, queryResults);
        memoryResultSetRows = memoryQueryResultRowList.iterator();
        if(! memoryQueryResultRowList.isEmpty()) { currentResultSetRow = memoryQueryResultRowList.get(0); }}Copy the code
  1. Finally look at the decoration merge, with page would be used, the following code, merge LimitDecoratorMergedResult is decoration
private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {
        PaginationContext paginationContext = selectStatementContext.getPaginationContext();
        if(! paginationContext.isHasPagination() ||1 == queryResults.size()) {
            return mergedResult;
        }
        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
            return new LimitDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("Oracle".equals(trunkDatabaseName)) {
            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("SQLServer".equals(trunkDatabaseName)) {
            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        return mergedResult;
    }
Copy the code