One, foreword
How to use streaming programming in MYbatis
Alibaba is looking for long-term Recruitment of Java R&D engineers p6, P7, P8 and other uncapped levels. If you are interested, please send your resume to me, indicating the department and working place you want to work in:[email protected]
MyBatisCursorItemReader in Mybaits
2.1 configuration
- MyBatisCursorItemReader injection
<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
<property name="sqlSessionFactory" ref="sqlSessionFactory" />
<property name="queryId"
value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" />
</bean>
Copy the code
QueryId is the interface name in the mapper file.
- Mapper XML Settings
image.png
FetchSize =”-2147483648″, integer. MIN_VALUE=-2147483648
2.2 the use of
static void testCursor1() throws UnexpectedInputException, ParseException, Exception { try { Map<String, Object> param = new HashMap<String, Object>(); AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample(); AccsDeviceInfoDAOExample. CreateCriteria (). AndAppKeyEqualTo (" 12345 "). AndAppVersionEqualTo (" 5.7.2.4.5 ") .andPackageNameEqualTo("com.test.zlx"); param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria()); / / set parameters myMyBatisCursorItemReader. SetParameterValues (param); / / create a cursor myMyBatisCursorItemReader. Open (new -- an optional ExecutionContext ()); // Use cursor iteration to get each record Long count = 0L; AccsDeviceInfoDAO accsDeviceInfoDAO; while ((accsDeviceInfoDAO = myMyBatisCursorItemReader.read()) ! = null) { System.out.println(JSON.toJSONString(accsDeviceInfoDAO)); ++count; System.out.println(count); } } catch (Exception e) { System.out.println("error:" + e.getLocalizedMessage()); } finally { // do some myMyBatisCursorItemReader.close(); }}Copy the code
2.3 Principles
- The open function
The selectCursor function retrieves a session from the Session factory and calls the session’s selectCursor, which eventually calls
ConnectionImpl prepareStatement method:
public java.sql.PreparedStatement prepareStatement(String sql) throws SQLException {
return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE, DEFAULT_RESULT_SET_CONCURRENCY);
}
private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY;
private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;
Copy the code
Two of the three conditions are met, and the fetchSize we set ourselves tells mysql to create a streaming ResultSet. So where does fectchSize set that?
image.png
In the figure, 1 creates the prepareStatement and 2 sets the fetchSize.
Finally, the sqlQueryDirect method of MysqlIO is called to execute the concrete SQL and the result ResultSet is placed in JDBC4PrepardStatement.
- Read function
The read function fetches data from a resultSet, first calling.next to determine if there is data, and then reading the data if there is. This is just like pure JDBC programming, except that the read function wraps it.
3. Use of ResultHandler in Mybatis
3.1 configuration
- Mapper XML Settings
image.png
FetchSize =”-2147483648″, integer. MIN_VALUE=-2147483648
3.2 the use of
static void testCursor2() { SqlSession session = sqlSessionFactory.openSession(); Map<String, Object> param = new HashMap<String, Object>(); AccsDeviceInfoDAOExample accsDeviceInfoDAOExample = new AccsDeviceInfoDAOExample(); AccsDeviceInfoDAOExample. CreateCriteria (). AndAppKeyEqualTo (" 12345 "). AndAppVersionEqualTo (" 2 ") .andPackageNameEqualTo("com.hello.test"); param.put("oredCriteria", accsDeviceInfoDAOExample.getOredCriteria()); session.select("com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData", param, new ResultHandler() { @Override public void handleResult(ResultContext resultContext) { AccsDeviceInfoDAO accsDeviceInfoDAO = (AccsDeviceInfoDAO) resultContext.getResultObject(); System.out.println(resultContext.getResultCount()); System.out.println(JSON.toJSONString(accsDeviceInfoDAO)); }}); }Copy the code
3.3 Principles
Similar to section 3, except that section 3 returns a cursor to operate on the ResultSet and lets the user iteratively retrieve the data, instead of internally retrieving the data item by item and calling the handler’s handleResult method to process it.
private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<? > resultHandler, RowBounds rowBounds, ResultMapping parentMapping) throws SQLException { DefaultResultContext<Object> resultContext = new DefaultResultContext<Object>(); skipRows(rsw.getResultSet(), rowBounds); while (shouldProcessMoreRows(resultContext, rowBounds) && rsw.getResultSet().next()) { ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(rsw.getResultSet(), resultMap, null); Object rowValue = getRowValue(rsw, discriminatedResultMap); storeObject(resultHandler, resultContext, rowValue, parentMapping, rsw.getResultSet()); } } private void storeObject(ResultHandler<? > resultHandler, DefaultResultContext<Object> resultContext, Object rowValue, ResultMapping parentMapping, ResultSet rs) throws SQLException { if (parentMapping ! = null) { linkToParents(rs, parentMapping, rowValue); } else { callResultHandler(resultHandler, resultContext, rowValue); } // Call @suppressWarnings ("unchecked" /* Because ResultHandler<? > is always ResultHandler<Object>*/) private void callResultHandler(ResultHandler<? > resultHandler, DefaultResultContext<Object> resultContext, Object rowValue) { resultContext.nextResultObject(rowValue); ((ResultHandler<Object>) resultHandler).handleResult(resultContext); }Copy the code
Iv. Summary and comparison of results
Flow programming using bare JDBC programming is the most simple and flexible, but THE SQL statement needs to be written to the place where the database operation needs to be called, which is not easy to maintain. Mybatis still uses bare JDBC programming API at the bottom, and uses XML files to manage SQL statements uniformly. Although there is some overhead in parsing execution (such as reflection for each call), caching is also provided.
Comparison of memory usage between using cursors and not using cursors when the search results are 6 million records under the same conditions:
-
The streaming
image.png
-
streaming
Paste the image.png
Shows the flow when memory can grow as find records and nearly linear growth, when flow is more stable, in addition the flow due to the mysql server to all the data, so the call will not immediately after the return, need according to different data amount will wait for a period of time will return, by this time the caller thread will be blocked, Streaming is fast because it returns one record at a time.
Here is a summary: When the client sends a SELECT request to the Server, the Server filters the records that meet the criteria. The Server then sends the records to its own send buffer, which flushes the cache when the client’s receive buffer is full. The Server’s send blocks the master until the client’s receive cache is idle. When the cursor is not in use, MySqIo reads records one by one from the receive cache into the ResultSet. In this way, the client reads data from its own receive cache into the ResultSet, and the Server continuously sends data to the Client’s receive cache across the network until all records are placed in the ResultSet.
If cursor is used, the frequency at which the user calls next in the ResultSet determines the blocking condition when the Server sends. If the user calls Next quickly, the client’s receive cache will be idle, and the Server will send the data. If the user calls next slowly, the Server will not be able to free up the receive cache. The sending of the Server is blocked
To obtain more ATA dry goods, please pay attention to the wechat public number: ‘Original accumulation of technology’
image.png