Preface

This chapter examines the core step of ShardingJDBC: result merging.

First, QueryResult

public interface QueryResult {

    boolean next() throws SQLException;

    Object getValue(int columnIndex, Class<?> type) throws SQLException;

    Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;

    InputStream getInputStream(int columnIndex, String type) throws SQLException;

    boolean wasNull() throws SQLException;

    int getColumnCount() throws SQLException;

    String getColumnName(int columnIndex) throws SQLException;

    String getColumnLabel(int columnIndex) throws SQLException;
}

QueryResult is a wrapper around java.sql.ResultSet that provides the same next method as the ResultSet, and the other methods are easy to understand from their names.

Focus on the two implementations of QueryResult.

1. MemoryQueryResult

MemoryQueryResult corresponds to the CONNECTION_STRICTLY (connection-limited) mode. It is used when the number of SQL statements to execute exceeds the per-query connection limit (max.connections.size.per.query, 1 by default). Memory merging reads the whole ResultSet into memory in one pass so that the query does not occupy too many database connections.

public final class MemoryQueryResult implements QueryResult {
    // java.sql.ResultSetMetaData: metadata of the ResultSet
    private final ResultSetMetaData resultSetMetaData;
    // All rows of the ResultSet, read into memory in one pass
    private final Iterator<List<Object>> rows;
    // The current row
    private List<Object> currentRow;
}

When a MemoryQueryResult is constructed, the entire ResultSet is read into memory and stored in rows.

public MemoryQueryResult(final ResultSet resultSet) throws SQLException {
  resultSetMetaData = resultSet.getMetaData();
  rows = getRows(resultSet);
}

private Iterator<List<Object>> getRows(final ResultSet resultSet) throws SQLException {
  Collection<List<Object>> result = new LinkedList<>();
  while (resultSet.next()) {
      List<Object> rowData = new ArrayList<>(resultSet.getMetaData().getColumnCount());
      for (int columnIndex = 1; columnIndex <= resultSet.getMetaData().getColumnCount(); columnIndex++) {
          Object rowValue = getRowValue(resultSet, columnIndex);
          rowData.add(resultSet.wasNull() ? null : rowValue);
      }
      result.add(rowData);
  }
  return result.iterator();
}

A ResultSet is normally consumed through next and the getXXX methods; the QueryResult counterparts are next and getValue.

private final Iterator<List<Object>> rows;
private List<Object> currentRow;
@Override
public boolean next() {
    // If there is still data, move currentRow to the next line and return true
    if (rows.hasNext()) {
        currentRow = rows.next();
        return true;
    }
    // No data returns false
    currentRow = null;
    return false;
}

@Override
public Object getValue(final int columnIndex, final Class<?> type) {
    // Return the value at index columnIndex - 1 of the current row (column indexes are 1-based)
    return currentRow.get(columnIndex - 1);
}
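The cursor contract above can be tried without a database. The following is a minimal sketch, not ShardingSphere code: RowCursor is a hypothetical stand-in for MemoryQueryResult that walks rows already held in memory, and the sample rows are invented for illustration.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for MemoryQueryResult: every row is already in memory.
final class RowCursor {
    private final Iterator<List<Object>> rows;
    private List<Object> currentRow;

    RowCursor(final List<List<Object>> allRows) {
        this.rows = allRows.iterator();
    }

    // Moves to the next row and reports whether one exists (ResultSet style).
    boolean next() {
        if (rows.hasNext()) {
            currentRow = rows.next();
            return true;
        }
        currentRow = null;
        return false;
    }

    // Column indexes are 1-based, as in JDBC.
    Object getValue(final int columnIndex) {
        return currentRow.get(columnIndex - 1);
    }
}
```

A caller loops with while (cursor.next()) and reads cursor.getValue(1), cursor.getValue(2) and so on, exactly as it would with a ResultSet.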

2. StreamQueryResult

StreamQueryResult corresponds to the MEMORY_STRICTLY (memory-limited) mode. It is used when the number of SQL statements to execute is less than or equal to the per-query connection limit (max.connections.size.per.query, 1 by default). Streaming merge obtains data by moving the ResultSet cursor, which avoids the memory overhead of reading the whole ResultSet at once.

public final class StreamQueryResult implements QueryResult {
    // java.sql.ResultSetMetaData
    private final ResultSetMetaData resultSetMetaData;
    // java.sql.ResultSet
    private final ResultSet resultSet;
    public StreamQueryResult(final ResultSet resultSet) throws SQLException {
        resultSetMetaData = resultSet.getMetaData();
        this.resultSet = resultSet;
    }
}

StreamQueryResult simply encapsulates a ResultSet, and basically all implementations of QueryResult interface methods are delegated directly to a ResultSet.

@Override
public boolean next() throws SQLException {
  return resultSet.next();
}

@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
  if (boolean.class == type) {
      return resultSet.getBoolean(columnIndex);
  } else if (byte.class == type) {
      return resultSet.getByte(columnIndex);
  }
  // ...
}

Second, selecting the QueryResult implementation according to ConnectionMode

As discussed in the previous chapter, ConnectionMode is used to select the QueryResult implementation.

If maxConnectionsSizePerQuery (the maximum number of connections for a single query) is less than the number of SQL statements, the CONNECTION_STRICTLY (connection-limited) mode is used: memory merging reads the ResultSet data into memory in one pass to reduce database connection overhead. If maxConnectionsSizePerQuery is greater than or equal to the number of SQL statements, the MEMORY_STRICTLY (memory-limited) mode is used: streaming merge moves the ResultSet cursor and reads data row by row, reducing memory overhead.
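The rule can be written as a single comparison. This is a minimal sketch under the assumptions stated in the text, not the library's own code: ConnectionModeSketch, choose and sqlCount are hypothetical names introduced here.

```java
// Hypothetical sketch of the selection rule described above.
final class ConnectionModeSketch {
    enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    // sqlCount: the number of SQL statements that must be executed for one query.
    static ConnectionMode choose(final int maxConnectionsSizePerQuery, final int sqlCount) {
        return maxConnectionsSizePerQuery < sqlCount
                ? ConnectionMode.CONNECTION_STRICTLY  // memory merge (MemoryQueryResult)
                : ConnectionMode.MEMORY_STRICTLY;     // streaming merge (StreamQueryResult)
    }
}
```

With the default limit of 1, any query that needs more than one SQL statement falls into CONNECTION_STRICTLY.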

If the user executes the SQL with ResultSet resultSet = statement.executeQuery(), the QueryResult implementation is selected according to ConnectionMode. See PreparedStatementExecutor#executeQuery.

public List<QueryResult> executeQuery() throws SQLException {
  final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
  // callback
  SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
      @Override
      protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
          return getQueryResult(statement, connectionMode);
      }
  };
  return executeCallback(executeCallback);
}

private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
  PreparedStatement preparedStatement = (PreparedStatement) statement;
  ResultSet resultSet = preparedStatement.executeQuery();
  getResultSets().add(resultSet);
  // Select QueryResult implementation according to ConnectionMode
  return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}

The user can also obtain the result through statement.getResultSet, like this:

statement.execute();
ResultSet resultSet = statement.getResultSet();

In this case the QueryResult implementation is always the streaming StreamQueryResult; see ShardingPreparedStatement#getResultSet.

private ExecutionContext executionContext;
// ShardingResultSet
private ResultSet currentResultSet;
@Override
public ResultSet getResultSet() throws SQLException {
  if (null != currentResultSet) {
      return currentResultSet;
  }
  if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
      // Execute getResultSet on each statement to obtain the result set
      List<ResultSet> resultSets = getResultSets();
      // Convert all resultSets to QueryResult
      List<QueryResult> queryResults = getQueryResults(resultSets);
      // Merge through the MergeEngine
      MergedResult mergedResult = mergeQuery(queryResults);
      // Generate a ShardingResultSet return
      currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
  }
  return currentResultSet;
}

The getQueryResults method simply creates a new StreamQueryResult for each ResultSet.

private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
  List<QueryResult> result = new ArrayList<>(resultSets.size());
  for (ResultSet each : resultSets) {
      if (null != each) {
          result.add(new StreamQueryResult(each));
      }
  }
  return result;
}

Third, MergedResult

The result merge is ultimately handled by MergeEngine with the QueryResult collection as its input and MergedResult as its output. The methods of MergedResult are mostly similar to QueryResult, except that MergedResult represents the result after the merger.

public interface MergedResult {
    
    boolean next() throws SQLException;
    
    Object getValue(int columnIndex, Class<?> type) throws SQLException;
    
    Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;
    
    InputStream getInputStream(int columnIndex, String type) throws SQLException;
    
    boolean wasNull() throws SQLException;
}

MergedResult has many implementations; here are four representative categories.

1. DecoratorMergedResult

@RequiredArgsConstructor
@Getter
public abstract class DecoratorMergedResult implements MergedResult {
    // Merge the result
    private final MergedResult mergedResult;
    @Override
    public final Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return mergedResult.getValue(columnIndex, type);
    }
    @Override
    public final Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
        return mergedResult.getCalendarValue(columnIndex, type, calendar);
    }
    @Override
    public final InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
        return mergedResult.getInputStream(columnIndex, type);
    }
    @Override
    public final boolean wasNull() throws SQLException {
        return mergedResult.wasNull();
    }
}

DecoratorMergedResult follows the same idea as ShardingCartesianRoutingEngine, the Cartesian product routing engine: it is only constructed after another merge has produced a result, and it decorates that MergedResult. Four methods delegate to the decorated merge result by default; subclasses must implement the key next method.

DecoratorMergedResult has three implementation classes, all related to paging. Here we look at the MySQL implementation, LimitDecoratorMergedResult.

public final class LimitDecoratorMergedResult extends DecoratorMergedResult {
    // The paging context is retrieved from SelectStatementContext
    private final PaginationContext pagination;
    // Whether to skip all results
    private final boolean skipAll;
    // The current number of lines
    private int rowNumber;
    // Constructor
    public LimitDecoratorMergedResult(final MergedResult mergedResult, final PaginationContext pagination) throws SQLException {
        super(mergedResult);
        this.pagination = pagination;
        // Calculates whether all lines need to be skipped
        skipAll = skipOffset();
    }
    
    private boolean skipOffset() throws SQLException {
        // Consume offset rows; if the result set runs out first, return true (skip everything)
        for (int i = 0; i < pagination.getActualOffset(); i++) {
            if (!getMergedResult().next()) {
                return true;
            }
        }
        // Otherwise reset the current row number to 0 and return false
        rowNumber = 0;
        return false;
    }
    
    @Override
    public boolean next() throws SQLException {
        // The result set had fewer rows than offset: nothing to return
        if (skipAll) {
            return false;
        }
        // No rowCount: delegate to the decorated MergedResult's next
        if (!pagination.getActualRowCount().isPresent()) {
            return getMergedResult().next();
        }
        // True only while rowNumber is within rowCount and the decorated MergedResult has another row
        return ++rowNumber <= pagination.getActualRowCount().get() && getMergedResult().next();
    }
}

Take select * from t_order limit 20,10 as an example. If the merged result holds fewer than 20 rows, skipAll is true and next always returns false. Otherwise, as long as rowNumber does not exceed the page size (10 rows), next returns the result of the decorated MergedResult's next method.
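The skip-then-count behaviour can be sketched on a plain Iterator. This is an illustrative sketch only: LimitSketch and page are hypothetical names, and an Iterator of integers stands in for the decorated MergedResult.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of "LIMIT offset, rowCount" applied to an already merged stream.
final class LimitSketch {
    private final Iterator<Integer> merged;
    private final int rowCount;
    private final boolean skipAll;
    private int rowNumber;

    LimitSketch(final Iterator<Integer> merged, final int offset, final int rowCount) {
        this.merged = merged;
        this.rowCount = rowCount;
        this.skipAll = skipOffset(offset);
    }

    // Consumes offset rows; returns true if the stream ran out first.
    private boolean skipOffset(final int offset) {
        for (int i = 0; i < offset; i++) {
            if (!merged.hasNext()) {
                return true;
            }
            merged.next();
        }
        return false;
    }

    // Collects the page; an empty list means the offset skipped everything.
    List<Integer> page() {
        List<Integer> result = new ArrayList<>();
        while (!skipAll && ++rowNumber <= rowCount && merged.hasNext()) {
            result.add(merged.next());
        }
        return result;
    }
}
```

The offset is paid once, in the constructor, so every later read only has to compare rowNumber with rowCount.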

2. TransparentMergedResult

TransparentMergedResult implements every MergedResult method by delegating directly to the QueryResult method of the same name, so the code is not shown here. This is the adapter pattern, much as FutureTask holds a Callable while implementing Runnable.
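The FutureTask analogy can be made concrete with the standard java.util.concurrent API. The class and method names below (FutureTaskAdapterSketch, runThroughRunnable) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

final class FutureTaskAdapterSketch {
    // FutureTask holds the Callable and exposes it through the Runnable interface.
    static <T> T runThroughRunnable(final Callable<T> callable) {
        FutureTask<T> task = new FutureTask<>(callable);
        Runnable runnable = task;  // FutureTask implements Runnable
        runnable.run();            // run() invokes callable.call() internally
        try {
            return task.get();     // the Callable's result, available after run()
        } catch (final Exception ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

TransparentMergedResult plays the same role: callers see a MergedResult, while the work is done by the QueryResult it holds.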

3. MemoryMergedResult

MemoryMergedResult is similar to MemoryQueryResult: it reads the input data completely into an in-memory collection, then exposes next and the getter methods so that callers can iterate over the elements.

public abstract class MemoryMergedResult<T extends BaseRule> implements MergedResult {
    // Iterator over the merged rows (the cursor)
    private final Iterator<MemoryQueryResultRow> memoryResultSetRows;
    // The current row
    private MemoryQueryResultRow currentResultSetRow;
    // Whether the last value read was null
    private boolean wasNull;
    
    protected MemoryMergedResult(final T rule, final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
        // Subclasses implement the init method to construct the MemoryQueryResultRow collection
        List<MemoryQueryResultRow> memoryQueryResultRowList = init(rule, schemaMetaData, sqlStatementContext, queryResults);
        memoryResultSetRows = memoryQueryResultRowList.iterator();
        if (!memoryQueryResultRowList.isEmpty()) {
            currentResultSetRow = memoryQueryResultRowList.get(0);
        }
    }
}

The MemoryMergedResult parent class implements the MergedResult interface methods, such as next, using the memoryResultSetRows iterator and the current row currentResultSetRow.

@Override
public final boolean next() {
    if (memoryResultSetRows.hasNext()) {
        currentResultSetRow = memoryResultSetRows.next();
        return true;
    }
    return false;
}

@Override
public final Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
    if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {
        throw new SQLFeatureNotSupportedException();
    }
    Object result = currentResultSetRow.getCell(columnIndex);
    wasNull = null == result;
    return result;
}

The subclass simply needs to provide an implementation of the init method that returns a MemoryQueryResultRow collection.

When constructed, MemoryQueryResultRow loads the data of the current QueryResult row into its data array. This is what distinguishes MemoryMergedResult from StreamMergedResult: it reads all the data into memory before processing it.

In addition, MemoryQueryResultRow provides the getCell and setCell methods to read and modify the data array.

public final class MemoryQueryResultRow {
    
    private final Object[] data;
    
    public MemoryQueryResultRow(final QueryResult queryResult) throws SQLException {
        data = load(queryResult);
    }
    
    private Object[] load(final QueryResult queryResult) throws SQLException {
        int columnCount = queryResult.getColumnCount();
        Object[] result = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            result[i] = queryResult.getValue(i + 1, Object.class);
        }
        return result;
    }
    public Object getCell(final int columnIndex) {
        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);
        return data[columnIndex - 1];
    }
    public void setCell(final int columnIndex, final Object value) {
        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);
        data[columnIndex - 1] = value;
    }
}

GroupByMemoryMergedResult, an implementation of MemoryMergedResult, handles the group-by aggregation logic. Look at its init method.

@Override
protected List<MemoryQueryResultRow> init(final ShardingRule shardingRule,
                                        final SchemaMetaData schemaMetaData, 
                                        final SQLStatementContext sqlStatementContext, 
                                        final List<QueryResult> queryResults) throws SQLException {
  SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
  // group-by value -> merged row
  Map<GroupByValue, MemoryQueryResultRow> dataMap = new HashMap<>(1024);
  // group-by value -> aggregation projection (e.g. SUM, AVG, COUNT) -> aggregation unit holding the running result
  Map<GroupByValue, Map<AggregationProjection, AggregationUnit>> aggregationMap = new HashMap<>(1024);
  for (QueryResult each : queryResults) {
      while (each.next()) {
          // Build a GroupByValue from the group-by column values of the current row
          GroupByValue groupByValue = new GroupByValue(each, selectStatementContext.getGroupByContext().getItems());
          // On the first occurrence of this GroupByValue, initialize its entries in dataMap and aggregationMap
          initForFirstGroupByValue(selectStatementContext, each, groupByValue, dataMap, aggregationMap);
          // Merge the row through AggregationUnit#merge; the running result is held in the AggregationUnit
          aggregate(selectStatementContext, each, groupByValue, aggregationMap);
      }
  }
  // Write the results computed by the AggregationUnits in aggregationMap into the MemoryQueryResultRows of dataMap
  setAggregationValueToMemoryRow(selectStatementContext, dataMap, aggregationMap);
  // Use the table metadata to determine whether each column is case-sensitive
  List<Boolean> valueCaseSensitive = queryResults.isEmpty() ? Collections.emptyList() : getValueCaseSensitive(queryResults.iterator().next(), selectStatementContext, schemaMetaData);
  // Sort dataMap's MemoryQueryResultRow and return it
  return getMemoryResultSetRows(selectStatementContext, dataMap, valueCaseSensitive);
}

Here's an example: select user_id, date_format(created_at, '%Y-%m'), avg(paid_amount) from t_order group by user_id, date_format(created_at, '%Y-%m') computes each user's average paid amount per month. (In this case SelectStatementContext#isSameGroupByAndOrderByItems returns true, so the query actually goes through GroupByStreamMergedResult rather than GroupByMemoryMergedResult, but the logic is the same.)

For each combination of user_id and month (a GroupByValue), the paid amounts are aggregated; this is the job of AggregationProjection and AggregationUnit.

GroupByValue holds the values of the group-by columns for one row and serves as the aggregation dimension for that row. For example, if the group-by column is user_id and the row's user_id is 1, then the groupValues list is [1].

@Getter
@EqualsAndHashCode // Generates equals and hashCode so that instances can serve as Map keys
public final class GroupByValue {
    // Group-by values of the row, e.g. user_id=1, month=2020-12
    private final List<?> groupValues;
    
    public GroupByValue(final QueryResult queryResult, final Collection<OrderByItem> groupByItems) throws SQLException {
        groupValues = getGroupByValues(queryResult, groupByItems);
    }
    // Read each group-by item's value from the row, using the item's index in the select list
    private List<?> getGroupByValues(final QueryResult queryResult, final Collection<OrderByItem> groupByItems) throws SQLException {
        List<Object> result = new ArrayList<>(groupByItems.size());
        for (OrderByItem each : groupByItems) {
            result.add(queryResult.getValue(each.getIndex(), Object.class));
        }
        return result;
    }
}
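Why a list of values works as a Map key can be shown with plain collections. This is a minimal sketch, not the library's code: GroupBySketch and sumByGroup are hypothetical names, each row is assumed to be {userId, month, paidSum}, and a List built from the first two columns stands in for GroupByValue.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rows are {userId, month, paidSum}; the first two columns form the group key.
final class GroupBySketch {
    static Map<List<Object>, Long> sumByGroup(final List<List<Object[]>> shards) {
        Map<List<Object>, Long> result = new LinkedHashMap<>();
        for (List<Object[]> shard : shards) {
            for (Object[] row : shard) {
                // List.equals and List.hashCode compare element by element,
                // so equal keys coming from different shards land in the same entry.
                List<Object> groupKey = Arrays.asList(row[0], row[1]);
                result.merge(groupKey, (Long) row[2], Long::sum);
            }
        }
        return result;
    }
}
```

The sketch only sums one column; the real merge keeps one AggregationUnit per aggregation projection for every key.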

AggregationProjection represents an aggregation column in the select statement. It stores the alias, the aggregation type, the expression inside the aggregation function, and the derived aggregation projections (for AVG, ProjectionsTokenGenerator generates two derived aggregations, SUM and COUNT), together with the index of the aggregation column in the select list.

@EqualsAndHashCode
@ToString
public class AggregationProjection implements Projection {
    // AVG
    private final AggregationType type;
    // (paid_amount)
    private final String innerExpression;
    // null
    private final String alias;
    // Derived aggregation projections, e.g. SUM(paid_amount), COUNT(*)
    private final List<AggregationProjection> derivedAggregationProjections = new ArrayList<>(2);
    // Index of this aggregation column in the select list
    @Setter
    private int index = -1;
}

AggregationUnit is responsible for handling different aggregation logic and is created uniformly by the factory.

public final class AggregationUnitFactory {
    public static AggregationUnit create(final AggregationType type, final boolean isDistinct) {
        switch (type) {
            case MAX:
                return new ComparableAggregationUnit(false);
            case MIN:
                return new ComparableAggregationUnit(true);
            case SUM:
                return isDistinct ? new DistinctSumAggregationUnit() : new AccumulationAggregationUnit();
            case COUNT:
                return isDistinct ? new DistinctCountAggregationUnit() : new AccumulationAggregationUnit();
            case AVG:
                return isDistinct ? new DistinctAverageAggregationUnit() : new AverageAggregationUnit();
            default:
                throw new UnsupportedOperationException(type.name());
        }
    }
}

For example, AverageAggregationUnit handles the ordinary AVG logic: the merge method accumulates count and sum, and the getResult method returns sum / count.

@RequiredArgsConstructor
public final class AverageAggregationUnit implements AggregationUnit {
    
    private BigDecimal count;
    
    private BigDecimal sum;
    
    @Override
    public void merge(final List<Comparable<?>> values) {
        if (null == values || null == values.get(0) || null == values.get(1)) {
            return;
        }
        if (null == count) {
            count = new BigDecimal("0");
        }
        if (null == sum) {
            sum = new BigDecimal("0");
        }
        count = count.add(new BigDecimal(values.get(0).toString()));
        sum = sum.add(new BigDecimal(values.get(1).toString()));
    }
    
    @Override
    public Comparable<?> getResult() {
        if (null == count || BigDecimal.ZERO.equals(count)) {
            return count;
        }
        // TODO use metadata to fetch float number precise for database field
        return sum.divide(count, 4, BigDecimal.ROUND_HALF_UP);
    }
}
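The reason AVG is rewritten into COUNT and SUM is that per-shard averages cannot simply be averaged again. The sketch below illustrates this with invented numbers; AverageMergeSketch is a hypothetical class, and unlike the listing above it returns null when nothing has been merged.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical sketch of merging per-shard (count, sum) pairs into a global average.
final class AverageMergeSketch {
    private BigDecimal count = BigDecimal.ZERO;
    private BigDecimal sum = BigDecimal.ZERO;

    void merge(final long shardCount, final String shardSum) {
        count = count.add(BigDecimal.valueOf(shardCount));
        sum = sum.add(new BigDecimal(shardSum));
    }

    // Returns null when no rows were merged; a scale of 4 mirrors the listing above.
    BigDecimal getResult() {
        return count.signum() == 0 ? null : sum.divide(count, 4, RoundingMode.HALF_UP);
    }
}
```

Suppose one shard reports count 1 and sum 100 (average 100) and another reports count 3 and sum 60 (average 20). Merging counts and sums gives 160 / 4 = 40, whereas averaging the two shard averages would give 60.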

4. StreamMergedResult

StreamMergedResult is the streaming merge result. It implements every method except next; subclasses must implement next to move the cursor and set currentQueryResult (the current QueryResult).

@Setter
public abstract class StreamMergedResult implements MergedResult {
    // Current result set
    private QueryResult currentQueryResult;
    // Whether it is empty
    private boolean wasNull;
    
    protected final QueryResult getCurrentQueryResult() throws SQLException {
        return currentQueryResult;
    }
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        Object result = getCurrentQueryResult().getValue(columnIndex, type);
        wasNull = getCurrentQueryResult().wasNull();
        return result;
    }
    @Override
    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
        Object result = getCurrentQueryResult().getCalendarValue(columnIndex, type, calendar);
        wasNull = getCurrentQueryResult().wasNull();
        return result;
    }
    @Override
    public final InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
        InputStream result = getCurrentQueryResult().getInputStream(columnIndex, type);
        wasNull = getCurrentQueryResult().wasNull();
        return result;
    }
    @Override
    public final boolean wasNull() {
        return wasNull;
    }
}

StreamMergedResult has three implementation classes:

  • IteratorStreamMergedResult: ordinary merge.
  • OrderByStreamMergedResult: sort merge.
  • GroupByStreamMergedResult: group-by aggregation merge.

GroupByStreamMergedResult works much like GroupByMemoryMergedResult, so it is not examined here.

IteratorStreamMergedResult

IteratorStreamMergedResult is the streaming merge result for the ordinary case. For example, select * from t_order needs to merge data from different data sources and different tables.

public final class IteratorStreamMergedResult extends StreamMergedResult {
    
    private final Iterator<QueryResult> queryResults;
    
    public IteratorStreamMergedResult(final List<QueryResult> queryResults) {
        this.queryResults = queryResults.iterator();
        setCurrentQueryResult(this.queryResults.next());
    }
    
    @Override
    public boolean next() throws SQLException {
        if (getCurrentQueryResult().next()) {
            return true;
        }
        if (!queryResults.hasNext()) {
            return false;
        }
        setCurrentQueryResult(queryResults.next());
        boolean hasNext = getCurrentQueryResult().next();
        if (hasNext) {
            return true;
        }
        while (!hasNext && queryResults.hasNext()) {
            setCurrentQueryResult(queryResults.next());
            hasNext = getCurrentQueryResult().next();
        }
        return hasNext;
    }
}

Note that QueryResult iterates differently from java.util.Iterator. With QueryResult, the next method both moves the cursor and reports whether there is data; with Iterator, hasNext reports whether there is more data and next moves the cursor.

The next method first checks whether the current QueryResult has more data; if not, it moves on to the next QueryResult through the Iterator.
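The same chaining can be sketched over in-memory lists. This is an illustrative sketch rather than the library's implementation: ChainedCursor is a hypothetical class, each inner list stands in for one QueryResult, and unlike the constructor above it also tolerates an empty list of shards.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: walks several per-shard row lists as if they were one stream.
final class ChainedCursor {
    private final Iterator<List<String>> shards;
    private Iterator<String> current;
    private String currentRow;

    ChainedCursor(final List<List<String>> shardRows) {
        this.shards = shardRows.iterator();
        this.current = shards.hasNext() ? shards.next().iterator() : null;
    }

    // Cursor-style next(): advance, skipping exhausted or empty shards.
    boolean next() {
        while (current != null) {
            if (current.hasNext()) {
                currentRow = current.next();
                return true;
            }
            current = shards.hasNext() ? shards.next().iterator() : null;
        }
        return false;
    }

    String getValue() {
        return currentRow;
    }
}
```

No ordering is imposed: rows come out shard by shard, which is all a query without order by or group by needs.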

OrderByStreamMergedResult

OrderByStreamMergedResult is the streaming merge result for sort merges. The idea is to turn locally ordered result sets into a globally ordered one. These are the three member variables of OrderByStreamMergedResult.

public class OrderByStreamMergedResult extends StreamMergedResult {
    // The order-by items obtained from SelectStatementContext
    private final Collection<OrderByItem> orderByItems;
    // Priority queue of OrderByValue elements
    private final Queue<OrderByValue> orderByValuesQueue;
    // Whether next is being called for the first time; initialized to true
    private boolean isFirstNext;
}

The OrderByStreamMergedResult constructor initializes the priority queue orderByValuesQueue.

public OrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
  this.orderByItems = selectStatementContext.getOrderByContext().getItems();
  this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
  orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
  isFirstNext = true;
}

private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
  for (QueryResult each : queryResults) {
      // OrderByValue holds QueryResult and can move the QueryResult cursor
      OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
      // Move the QueryResult cursor and determine if there is data
      if (orderByValue.next()) {
          // Put it into the priority queue
          orderByValuesQueue.offer(orderByValue);
      }
  }
  // If no QueryResult has data, the priority queue is empty and the first QueryResult is used; otherwise use the QueryResult at the head of the priority queue
  setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}

Now for OrderByValue. It is similar to GroupByValue, except that GroupByValue never operates on a cursor, whereas OrderByValue holds a QueryResult (ResultSet) and moves its cursor.

public final class OrderByValue implements Comparable<OrderByValue> {
    // Hold QueryResult
    @Getter
    private final QueryResult queryResult;
    // The order-by items obtained from SelectStatementContext
    private final Collection<OrderByItem> orderByItems;
    // Whether each order-by column is case-sensitive when it is a string (used when comparing)
    private final List<Boolean> orderValuesCaseSensitive;
    // Like GroupByValue#groupValues: the current row's values for the order-by columns
    private List<Comparable<?>> orderValues;
}

Focus on two OrderByValue methods. The first is next, which moves the QueryResult cursor and updates the current orderValues.

public boolean next() throws SQLException {
  // Move the QueryResult cursor and check whether there is data
  boolean result = queryResult.next();
  // If there is data, refresh orderValues with the current row's order-by values; otherwise set it to an empty list
  orderValues = result ? getOrderValues() : Collections.emptyList();
  // Return whether the QueryResult has data
  return result;
}

The second is compareTo, which the priority queue uses for ordering. It loops over the order-by columns and compares their values according to the sort direction and string case sensitivity.

@Override
public int compareTo(final OrderByValue o) {
    int i = 0;
    for (OrderByItem each : orderByItems) {
        int result = CompareUtil.compareTo(orderValues.get(i), o.orderValues.get(i), each.getSegment().getOrderDirection(),
            each.getSegment().getNullOrderDirection(), orderValuesCaseSensitive.get(i));
        if (0 != result) {
            return result;
        }
        i++;
    }
    return 0;
}

Back to OrderByStreamMergedResult. Once the constructor has built the orderByValuesQueue priority queue, the head of the queue is the OrderByValue whose QueryResult points at the globally first row. Next, look at the next method of OrderByStreamMergedResult; the logic is explained in the comments.

// Priority queue of OrderByValue elements
private final Queue<OrderByValue> orderByValuesQueue;
// Whether next is being called for the first time; initialized to true
private boolean isFirstNext;
@Override
public boolean next() throws SQLException {
  // If the queue is empty, return false
  if (orderByValuesQueue.isEmpty()) {
      return false;
  }
  // If the next method is executed the first time, true is returned
  if (isFirstNext) {
      isFirstNext = false;
      return true;
  }
  // Get the first OrderByValue of the queue
  OrderByValue firstOrderByValue = orderByValuesQueue.poll();
  // Move its QueryResult cursor and check whether it still has data
  if (firstOrderByValue.next()) {
      // If so, requeue OrderByValue
      orderByValuesQueue.offer(firstOrderByValue);
  }
  // Return false if the queue is empty
  if (orderByValuesQueue.isEmpty()) {
      return false;
  }
  // Set the current QueryResult to the QueryResult held by the first OrderByValue in the queue and return true
  setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
  return true;
}

As an example, take select * from t_order order by user_id asc, where the first result set holds the rows user_id=1 and user_id=3 and the second holds user_id=2 and user_id=4.

  • Constructor: each QueryResult is wrapped in an OrderByValue, its cursor is moved to its first row, and it is placed in the priority queue. The head of the queue is OrderByValue1, whose QueryResult1 points at the globally first row (user_id=1), so QueryResult1 becomes the current QueryResult.
  • First call to next: isFirstNext is true, so the method clears the flag and returns true without touching the queue. The current QueryResult is still QueryResult1, pointing at user_id=1.
  • Second call: OrderByValue1 is polled and its cursor moves to user_id=3. It still has data, so it is offered back and now sorts behind OrderByValue2. The current QueryResult becomes QueryResult2, pointing at user_id=2.
  • Third call: OrderByValue2 is polled, its cursor moves to user_id=4, and it is offered back behind OrderByValue1. The current QueryResult becomes QueryResult1, pointing at user_id=3.
  • Fourth call: OrderByValue1 is polled, but QueryResult1 has no more data, so it is not put back. The current QueryResult becomes QueryResult2, now at the head of the queue (peek), pointing at user_id=4.
  • Fifth call: OrderByValue2 is polled and, having no more data, is not put back. The priority queue is now empty, so next returns false.
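The walkthrough is an instance of a k-way merge driven by a priority queue. The sketch below reproduces it on in-memory lists; KWayMergeSketch and Head are hypothetical names, with Head playing the role of OrderByValue (a shard cursor plus the value it currently points at).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of merging locally sorted shards into one globally sorted list.
final class KWayMergeSketch {
    // Plays the role of OrderByValue: a shard cursor plus its current value.
    private static final class Head implements Comparable<Head> {
        private final Iterator<Integer> rows;
        private Integer value;

        Head(final Iterator<Integer> rows) {
            this.rows = rows;
        }

        // Moves the cursor and reports whether a row is available.
        boolean next() {
            if (!rows.hasNext()) {
                return false;
            }
            value = rows.next();
            return true;
        }

        @Override
        public int compareTo(final Head other) {
            return value.compareTo(other.value);
        }
    }

    static List<Integer> merge(final List<List<Integer>> sortedShards) {
        PriorityQueue<Head> queue = new PriorityQueue<>();
        for (List<Integer> shard : sortedShards) {
            Head head = new Head(shard.iterator());
            if (head.next()) {
                queue.offer(head);
            }
        }
        List<Integer> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            Head smallest = queue.poll();
            result.add(smallest.value);
            // Re-queue the shard only if it still has rows.
            if (smallest.next()) {
                queue.offer(smallest);
            }
        }
        return result;
    }
}
```

For the two shards [1, 3] and [2, 4], merge returns [1, 2, 3, 4]. The queue never holds more than one entry per shard, so memory use does not grow with the number of rows.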

Conclusion

1. Streaming merge or memory merge?

  • If the ResultSet is obtained through statement.executeQuery, the QueryResult implementation is selected according to ConnectionMode.
    • If maxConnectionsSizePerQuery (the maximum number of connections for a single query) is less than the number of SQL statements, the CONNECTION_STRICTLY (connection-limited) mode is used. MemoryQueryResult performs memory merging, reading the ResultSet data into memory in one pass to reduce database connection overhead.
    • If maxConnectionsSizePerQuery is greater than or equal to the number of SQL statements, the MEMORY_STRICTLY (memory-limited) mode is used. StreamQueryResult performs streaming merging, moving the ResultSet cursor to read data row by row and reducing memory overhead.
  • If the ResultSet is obtained through statement.getResultSet, the streaming StreamQueryResult is used.

2. QueryResult is processed by MergeEngine to obtain the merge result MergedResult.

3. MergedResult classification

  • DecoratorMergedResult: similar in spirit to ShardingCartesianRoutingEngine, the Cartesian product routing engine. It is only constructed after another merge has produced a result, and it decorates that MergedResult. There are three implementation classes, all related to paging.
  • TransparentMergedResult: adapter pattern; every MergedResult method is delegated directly to the QueryResult implementation.
  • MemoryMergedResult: memory merging. The data is read completely into an in-memory collection, and callers iterate over its elements. Its subclass GroupByMemoryMergedResult handles the group-by aggregation logic.
  • StreamMergedResult: streaming merging, which moves cursors to walk through the elements. There are three implementation classes:
    • IteratorStreamMergedResult: ordinary merge.
    • OrderByStreamMergedResult: sort merge.
    • GroupByStreamMergedResult: group-by aggregation merge.