1. Foreword

This article shows how to stream query results in MyBatis.

2. MyBatisCursorItemReader in MyBatis

2.1 Configuration

  • MyBatisCursorItemReader injection

<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="queryId"
        value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" />
</bean>

queryId is the fully qualified ID of the mapped statement: the mapper namespace followed by the statement id defined in the mapper file.

  • Mapper XML Settings


[image: select statement in the mapper XML with the fetchSize attribute set]


Set fetchSize="-2147483648" on the statement. This value is Integer.MIN_VALUE (-2147483648).

2.2 Usage

static void testCursor1() throws UnexpectedInputException, ParseException, Exception {
    try {
        Map<String, Object> param = new HashMap<String, Object>();
        AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample();
        accsDeviceInfoDAOExample.createCriteria()
                .andAppKeyEqualTo("12345")
                .andAppVersionEqualTo("5.7.2.4.5")
                .andPackageNameEqualTo("com.test.zlx");
        param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria());

        // Set the query parameters
        myMyBatisCursorItemReader.setParameterValues(param);

        // Create the cursor
        myMyBatisCursorItemReader.open(new ExecutionContext());

        // Iterate over the cursor to get each record
        Long count = 0L;
        AccsDeviceInfoDAO accsDeviceInfoDAO;
        while ((accsDeviceInfoDAO = myMyBatisCursorItemReader.read()) != null) {
            System.out.println(JSON.toJSONString(accsDeviceInfoDAO));
            ++count;
            System.out.println(count);
        }
    } catch (Exception e) {
        System.out.println("error:" + e.getLocalizedMessage());
    } finally {
        // Close the cursor and release its resources
        myMyBatisCursorItemReader.close();
    }
}

2.3 Principles

  • The open function

The open function obtains a session from the session factory and calls the session's selectCursor, which eventually calls the prepareStatement method of ConnectionImpl:

public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException {
    return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE, DEFAULT_RESULT_SET_CONCURRENCY);
}
private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY;

private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;


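MySQL Connector/J creates a streaming ResultSet only when three conditions hold: the result set type is TYPE_FORWARD_ONLY, the concurrency is CONCUR_READ_ONLY, and fetchSize is Integer.MIN_VALUE. The defaults above satisfy the first two, and the fetchSize we configured satisfies the third. So where is fetchSize applied?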
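The three conditions can be written as a single predicate. The sketch below assumes the check is exactly "forward-only type, read-only concurrency, fetch size of Integer.MIN_VALUE"; the class and method names are hypothetical and this is not the driver's own code.

```java
import java.sql.ResultSet;

// Hypothetical helper for illustration only; it is not part of MySQL Connector/J.
final class StreamingCheck {
    private StreamingCheck() {}

    /** Returns true when all three streaming conditions described above hold. */
    static boolean wouldStream(int resultSetType, int resultSetConcurrency, int fetchSize) {
        return resultSetType == ResultSet.TYPE_FORWARD_ONLY
                && resultSetConcurrency == ResultSet.CONCUR_READ_ONLY
                && fetchSize == Integer.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Default type and concurrency plus fetchSize = Integer.MIN_VALUE
        System.out.println(wouldStream(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, Integer.MIN_VALUE));
        // Same defaults, but an ordinary fetch size
        System.out.println(wouldStream(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, 100));
    }
}
```

With the defaults shown earlier, only the fetch size decides whether the result is streamed.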


[image: source code screenshot with two numbered steps]

In the figure, step 1 creates the PreparedStatement and step 2 sets the fetchSize.

Finally, the sqlQueryDirect method of MysqlIO is called to execute the SQL, and the resulting ResultSet is stored in the JDBC4PreparedStatement.

  • The read function

The read function fetches data from the ResultSet: it first calls next() to check whether another row exists and, if so, reads that row. This is the same as plain JDBC programming, except that read wraps the calls.
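The pull-style contract described here (open, then read until null, then close) can be sketched in plain Java. SimpleCursorReader is a hypothetical class written for this illustration, with an in-memory Iterable standing in for the database cursor; it is not the MyBatisCursorItemReader source.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical reader for illustration; an Iterable stands in for the database cursor.
final class SimpleCursorReader<T> implements AutoCloseable {
    private final Iterable<T> source;
    private Iterator<T> rows;

    SimpleCursorReader(Iterable<T> source) {
        this.source = source;
    }

    /** Opens the cursor; nothing is read until read() is called. */
    void open() {
        rows = source.iterator();
    }

    /** Returns the next row, or null once the cursor is exhausted. */
    T read() {
        if (rows == null) {
            throw new IllegalStateException("reader is not open");
        }
        // Equivalent to "if (rs.next()) return mapRow(rs); else return null;" in plain JDBC
        return rows.hasNext() ? rows.next() : null;
    }

    @Override
    public void close() {
        rows = null;
    }

    /** Drains a source through the reader and returns the number of rows seen. */
    static <T> int countRows(Iterable<T> source) {
        try (SimpleCursorReader<T> reader = new SimpleCursorReader<>(source)) {
            reader.open();
            int count = 0;
            while (reader.read() != null) {
                count++;
            }
            return count;
        }
    }

    public static void main(String[] args) {
        System.out.println(countRows(Arrays.asList("a", "b", "c")));
    }
}
```

Because the caller drives the loop, only one row needs to be held in memory at a time.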

3. ResultHandler in MyBatis

3.1 Configuration

  • Mapper XML Settings


[image: select statement in the mapper XML with the fetchSize attribute set]


Set fetchSize="-2147483648" on the statement. This value is Integer.MIN_VALUE (-2147483648).

3.2 Usage

static void testCursor2() {
    SqlSession session = sqlSessionFactory.openSession();
    try {
        Map<String, Object> param = new HashMap<String, Object>();
        AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample();
        accsDeviceInfoDAOExample.createCriteria()
                .andAppKeyEqualTo("12345")
                .andAppVersionEqualTo("2")
                .andPackageNameEqualTo("com.hello.test");
        param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria());

        // MyBatis calls handleResult once for every row it reads
        session.select("com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData",
                param, new ResultHandler() {
                    @Override
                    public void handleResult(ResultContext resultContext) {
                        AccsDeviceInfoDAO accsDeviceInfoDAO = (AccsDeviceInfoDAO) resultContext.getResultObject();
                        System.out.println(resultContext.getResultCount());
                        System.out.println(JSON.toJSONString(accsDeviceInfoDAO));
                    }
                });
    } finally {
        // Close the session so the underlying connection is released
        session.close();
    }
}

3.3 Principles

Similar to section 2, except that section 2 returns a cursor over the ResultSet and lets the user iterate over the data, whereas here MyBatis retrieves the rows one by one internally and calls the handler's handleResult method for each of them.

private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap,
        ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping) throws SQLException {
    DefaultResultContext<Object> resultContext = new DefaultResultContext<Object>();
    skipRows(rsw.getResultSet(), rowBounds);
    while (shouldProcessMoreRows(resultContext, rowBounds) && rsw.getResultSet().next()) {
        ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(rsw.getResultSet(), resultMap, null);
        Object rowValue = getRowValue(rsw, discriminatedResultMap);
        storeObject(resultHandler, resultContext, rowValue, parentMapping, rsw.getResultSet());
    }
}

private void storeObject(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext,
        Object rowValue, ResultMapping parentMapping, ResultSet rs) throws SQLException {
    if (parentMapping != null) {
        linkToParents(rs, parentMapping, rowValue);
    } else {
        callResultHandler(resultHandler, resultContext, rowValue);
    }
}

// Invoke the user-supplied handler
@SuppressWarnings("unchecked" /* because ResultHandler<?> is always ResultHandler<Object> */)
private void callResultHandler(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext,
        Object rowValue) {
    resultContext.nextResultObject(rowValue);
    ((ResultHandler<Object>) resultHandler).handleResult(resultContext);
}
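The difference from the cursor approach is who drives the loop: here the framework iterates and pushes each row to a callback. A minimal sketch of that push style in plain Java follows; PushStyleQuery and its select method are hypothetical names, and a java.util.function.Consumer stands in for the ResultHandler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the push style; not MyBatis code.
final class PushStyleQuery {
    private PushStyleQuery() {}

    /** Iterates over the rows internally, hands each one to the handler, and returns the row count. */
    static <T> int select(Iterator<T> rows, Consumer<T> handler) {
        int count = 0;
        while (rows.hasNext()) {
            // The caller never sees the iterator; it only receives one row at a time
            handler.accept(rows.next());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int count = select(Arrays.asList("row1", "row2").iterator(), seen::add);
        System.out.println(count + " rows handled: " + seen);
    }
}
```

In both styles only one row is materialized at a time; the choice is mainly about whether the calling code or the framework owns the loop.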

4. Summary and comparison of results

Streaming with bare JDBC is the simplest and most flexible approach, but the SQL statements end up written at every place that calls the database, which is hard to maintain. MyBatis still uses the bare JDBC API underneath, but manages SQL statements centrally in XML files. Parsing and execution add some overhead (such as reflection on each call), but MyBatis also provides caching.

Memory usage with and without a cursor, for the same query returning 6 million records under the same conditions:

  • Non-streaming

    [image: memory usage without streaming]

  • Streaming

    [image: memory usage with streaming]

The charts show that without streaming, memory grows almost linearly with the number of records retrieved, while with streaming it stays fairly stable. In addition, without streaming the call does not return until the MySQL server has sent all of the data, so the calling thread blocks for a time that depends on the data volume. Streaming returns quickly because it hands back one record at a time.

To summarize: when the client sends a SELECT request to the server, the server filters the records that meet the criteria and writes them to its own send buffer, from which they are sent to the client's receive buffer. When the client's receive buffer is full, the server's send blocks until the buffer has free space again. Without a cursor, MysqlIO reads records one by one from the receive buffer into the ResultSet. The client thus keeps draining its receive buffer into the ResultSet while the server keeps sending data across the network, until all records are in the ResultSet.

If a cursor is used, the rate at which the user calls next on the ResultSet determines whether the server's send blocks. If the user calls next quickly, the client's receive buffer keeps free space and the server can keep sending. If the user calls next slowly, the receive buffer fills up and the server's send is blocked.
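This blocking behavior can be imitated with a bounded queue. The sketch below is only an analogy under stated assumptions: an ArrayBlockingQueue plays the client's receive buffer, a producer thread plays the server, and the delay between take() calls plays the rate at which the user calls next. All names are hypothetical, and no real network or MySQL code is involved.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Analogy only: a bounded queue stands in for the client's receive buffer.
final class BackpressureDemo {
    private static final int END = -1; // sentinel marking the end of the result

    private BackpressureDemo() {}

    /** Runs a "server" thread against a slow "client" and returns the number of rows consumed. */
    static int run(int rowCount, int bufferCapacity, long consumerDelayMillis) {
        BlockingQueue<Integer> receiveBuffer = new ArrayBlockingQueue<>(bufferCapacity);

        Thread server = new Thread(() -> {
            try {
                for (int i = 0; i < rowCount; i++) {
                    receiveBuffer.put(i); // blocks while the buffer is full
                }
                receiveBuffer.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();

        int consumed = 0;
        try {
            while (true) {
                int row = receiveBuffer.take(); // the client's "next()"
                if (row == END) {
                    break;
                }
                consumed++;
                Thread.sleep(consumerDelayMillis); // a slow caller keeps the buffer full
            }
            server.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming rows", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(20, 4, 5));
    }
}
```

However slowly the consumer runs, at most bufferCapacity rows are waiting at any moment, which is why memory stays flat in the streaming case.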
