This is part 6 of the series "Learning the way to asynchronous programming": batch processing data with Future and Callable. For the previous part, see:
Learning the way to asynchronous programming (5) – Thread pool principles and use
Earlier in this series we explained how to implement multithreading with Thread and Runnable. If thread A and thread B need to cooperate, they have to communicate through shared variables and thread-coordination methods such as wait(), notify(), and notifyAll(). These mechanisms are needed mainly because neither Thread nor Runnable can return a result. Since Java 1.5, Callable and Future have been available for obtaining a result after a task has completed.
This article introduces the principle and use of Future and Callable.
I. Future source code analysis
Future pattern: in multithreaded code, if thread A needs the result of thread B, thread A does not have to sit idle while thread B works. It can obtain a Future first, carry on with other work, and fetch the real result once thread B has produced it.
A commonly cited multithreading example is downloading a picture from the network: a blurred placeholder is shown first, and once the download thread has finished, the placeholder is replaced with the real picture. In the meantime, the application is free to do other things.
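The picture example can be sketched in a few lines. This is only an illustration: the class name FuturePatternDemo, the method loadPicture, and the strings standing in for pictures are assumptions, not code from the article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FuturePatternDemo {

    /** Starts a slow task in the background, does other work, then collects the result. */
    static String loadPicture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Thread B: stands in for the slow picture download.
            Callable<String> download = () -> {
                Thread.sleep(200);
                return "full-picture";
            };
            Future<String> future = pool.submit(download);

            // Thread A is free to do other things, e.g. show a placeholder.
            String shown = "placeholder";

            // Block only at the point where the real result is needed.
            shown = future.get();
            return shown;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadPicture());
    }
}
```

The caller only waits at future.get(); everything before that line runs while the download is still in progress.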
The Future interface provides five methods, shown below together with their source code:
1. cancel (attempt to cancel the task)
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
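The cancel method attempts to cancel execution of the task. The attempt fails if the task has already completed, has already been cancelled, or cannot be cancelled for some other reason. It returns true if the task was cancelled and false otherwise. If the task has not started yet, it will never run; if it is already running, the mayInterruptIfRunning parameter decides whether the executing thread is interrupted.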
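A small sketch of this behaviour, assuming a single-thread pool and a task that just sleeps (CancelDemo and cancelRunningTask are illustrative names): the first cancel succeeds, and a second attempt fails because the task is already cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {

    /** Returns { first cancel(), isCancelled(), isDone(), second cancel() }. */
    static boolean[] cancelRunningTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for long-running work
                } catch (InterruptedException e) {
                    // cancel(true) interrupted the worker thread: stop early
                    Thread.currentThread().interrupt();
                }
            });
            started.await(); // make sure the task has really started
            boolean first = future.cancel(true);
            boolean second = future.cancel(true); // nothing left to cancel
            return new boolean[] { first, future.isCancelled(), future.isDone(), second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(cancelRunningTask()));
    }
}
```

Note that cancel(true) only interrupts the worker thread. A task that ignores interruption keeps running even though its Future already reports it as cancelled.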
2. isCancelled (check whether the task has been cancelled)
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
The isCancelled method checks whether the task was cancelled before it completed normally, returning true if so and false otherwise.
3. isDone (check whether the task is completed)
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
The isDone method checks whether the task has completed. It returns true whether the task terminated normally, threw an exception, or was cancelled, and false if the task is still pending or running.
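To illustrate that "done" does not mean "succeeded", the following sketch submits one task that returns normally and one that throws, waits for the pool to finish, and then inspects both Futures. The names IsDoneDemo and isDoneAfterCompletion are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class IsDoneDemo {

    /** Returns { ok.isDone(), failed.isDone(), failed.isCancelled() }. */
    static boolean[] isDoneAfterCompletion() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<Integer> ok = () -> 42;
        Callable<Integer> failing = () -> {
            throw new IllegalStateException("boom");
        };
        Future<Integer> okFuture = pool.submit(ok);
        Future<Integer> failedFuture = pool.submit(failing);

        // Wait until both tasks have finished, without calling get().
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // A task that threw counts as done, but not as cancelled.
        return new boolean[] { okFuture.isDone(), failedFuture.isDone(), failedFuture.isCancelled() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(isDoneAfterCompletion()));
    }
}
```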
4. get (get the result returned by the task)
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
This method blocks the calling thread until the task has completed and its result is available. If the task never finishes, the caller hangs indefinitely, so the overloaded get with a timeout is generally recommended instead.
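Besides blocking, get() is also where a failure inside the task surfaces: as the Javadoc above states, the task's exception arrives wrapped in an ExecutionException. A minimal sketch, with GetDemo, causeMessage, and the IOException message chosen purely for illustration:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class GetDemo {

    /** Returns the message of the exception thrown inside the task. */
    static String causeMessage() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IOException("disk full");
            };
            Future<Integer> future = pool.submit(failing);
            future.get(); // blocks until the task ends, then rethrows its failure
            return "no exception";
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessage());
    }
}
```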
5. get (same as above, but with a timeout)
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
An overloaded version of get that blocks the calling thread for at most the given timeout while waiting for the Future's result. If the result is not available in time, it throws a TimeoutException; it does not return null.
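The timeout case might be handled as in the sketch below. The 100 ms limit, the 5 s task, and the names GetTimeoutDemo and getWithTimeout are arbitrary choices for the example; cancelling the task after a timeout is one reasonable policy, not the only one.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetTimeoutDemo {

    /** Waits at most 100 ms for a task that needs 5 s. */
    static String getWithTimeout() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            Thread.sleep(5_000);
            return "late result";
        });
        try {
            return future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task is still running; cancel it instead of leaving it behind.
            future.cancel(true);
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout());
    }
}
```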
II. Callable source code analysis
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* <p>The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
* <p>The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
The Callable interface represents a task that returns a result and may throw an exception. Implementors define a single no-argument method called call. Callable is similar to Runnable in that both are designed for classes whose instances may be executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception, whereas a Callable can do both.
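The difference can be seen side by side in the following sketch. The helper slowSquare and the class CallableVsRunnableDemo are hypothetical; Executors.callable is the JDK utility the Javadoc above refers to for converting a Runnable into a Callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CallableVsRunnableDemo {

    /** Hypothetical slow computation that throws a checked exception. */
    static int slowSquare(int n) throws InterruptedException {
        Thread.sleep(10);
        return n * n;
    }

    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Callable: returns the value and lets the checked exception propagate.
            Callable<Integer> callable = () -> slowSquare(3);
            int fromCallable = pool.submit(callable).get();

            // Runnable: must catch the exception and pass the value through shared state.
            AtomicInteger holder = new AtomicInteger();
            Runnable runnable = () -> {
                try {
                    holder.set(slowSquare(4));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            pool.submit(runnable).get(); // this Future carries no value; get() returns null
            int fromRunnable = holder.get();

            // Executors.callable wraps a Runnable and returns a fixed result afterwards.
            Callable<String> adapted = Executors.callable(runnable, "done");
            String marker = pool.submit(adapted).get();

            return fromCallable + "," + fromRunnable + "," + marker;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```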
III. Batch inserting data into the database with Future
Consider the following business scenario: the records queried from table A contain province, city, and area ids, and we need to replace those ids with the actual names and insert the result into the database in batches. The implementation idea is as follows:
- First, we load the local province/city table and sort its rows by their three levels (province, city, area) to get the lookup result M.
- While result M is being loaded, the main thread also starts the query for the related records of table A.
- Then we use result M to replace the province, city, and area ids in the records of table A with the corresponding names.
- Finally, we split the converted records into batches and insert them into the database concurrently (the code below uses 50 batches plus one for the remainder).
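Before the Spring-based version, here is a database-free sketch of the same flow under simplifying assumptions: FakeTable stands in for the real table, the lookup has ten made-up regions, and the rows are cut into fixed-size batches rather than a fixed number of slices. None of these names come from the article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class BatchInsertSketch {

    /** Hypothetical stand-in for the database: counts inserted rows and batches. */
    static final class FakeTable {
        final AtomicInteger rows = new AtomicInteger();
        final AtomicInteger batches = new AtomicInteger();

        void batchInsert(List<String> batch) {
            rows.addAndGet(batch.size());
            batches.incrementAndGet();
        }
    }

    /** Resolves ids to names and inserts them in batches; returns { rows, batches }. */
    static int[] run(int recordCount, int batchSize) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        FakeTable table = new FakeTable();
        try {
            // Step 1: load the id -> name lookup (result M) in the background.
            Future<Map<Integer, String>> namesFuture = pool.submit(() -> {
                Map<Integer, String> lookup = new HashMap<>();
                for (int id = 0; id < 10; id++) {
                    lookup.put(id, "region-" + id);
                }
                return lookup;
            });
            // Step 2: load the records of table A (here just region ids) in parallel.
            Future<List<Integer>> recordsFuture = pool.submit(() -> {
                List<Integer> ids = new ArrayList<>();
                for (int i = 0; i < recordCount; i++) {
                    ids.add(i % 10);
                }
                return ids;
            });
            // Step 3: wait for both results, then replace ids with names.
            Map<Integer, String> names = namesFuture.get(15, TimeUnit.SECONDS);
            List<String> resolved = new ArrayList<>();
            for (Integer id : recordsFuture.get(15, TimeUnit.SECONDS)) {
                resolved.add(names.get(id));
            }
            // Step 4: hand the rows to the pool one batch at a time.
            int batchCount = (resolved.size() + batchSize - 1) / batchSize;
            CountDownLatch latch = new CountDownLatch(batchCount);
            for (int start = 0; start < resolved.size(); start += batchSize) {
                int end = Math.min(start + batchSize, resolved.size());
                List<String> batch = resolved.subList(start, end);
                pool.execute(() -> {
                    try {
                        table.batchInsert(batch);
                    } finally {
                        latch.countDown(); // always count down so await() cannot hang
                    }
                });
            }
            latch.await();
            return new int[] { table.rows.get(), table.batches.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(120, 50);
        System.out.println(result[0] + " rows in " + result[1] + " batches");
    }
}
```

The latch count is derived from the number of batches actually submitted. If the two numbers differ, await() either returns too early or never returns.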
1. ThreadPoolTest
// Imports of the project's own classes (Application, ProvinceDAO, ProvinceDO, ClueInfoDAO,
// ClueInfoDO, Arrangement) are omitted because the article does not show their packages.
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: Batch processing of data through Future and Callable
 * @Author: zhangzhixiang
 * @CreateDate: 2018/12/25
 * @Version: 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ThreadPoolTest {

    /** Lookup tables (id -> name) for provinces, cities and areas */
    public static Map<Integer, String> provinceMap = Maps.newHashMap();
    public static Map<Integer, String> cityMap = Maps.newHashMap();
    public static Map<Integer, String> areaMap = Maps.newHashMap();

    @Autowired
    private ProvinceDAO provinceDAO;
    @Autowired
    private ClueInfoDAO clueInfoDAO;

    @Test
    public void test() throws Exception {
        // 1. Define a thread pool
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardPolicy());
        // 2. Load the province/city/area data asynchronously and fill the lookup maps
        Callable<List<ProvinceDO>> provinceCallable = () -> {
            List<ProvinceDO> provinceDoList = provinceDAO.getProvince();
            for (ProvinceDO data : provinceDoList) {
                if (data.getLevel() == 1) {
                    provinceMap.put(data.getId(), data.getAreaName());
                }
                if (data.getLevel() == 2) {
                    cityMap.put(data.getId(), data.getAreaName());
                }
                if (data.getLevel() == 3) {
                    areaMap.put(data.getId(), data.getAreaName());
                }
            }
            return provinceDoList;
        };
        // 3. Submit the task and keep its Future
        Future<List<ProvinceDO>> provinceFuture = threadPoolExecutor.submit(provinceCallable);
        // 4. Query the clue table asynchronously
        Callable<List<ClueInfoDO>> clueCallable = () -> clueInfoDAO.selectByCondition(null);
        // 5. Submit the task and keep its Future
        Future<List<ClueInfoDO>> clueFuture = threadPoolExecutor.submit(clueCallable);
        // 6. Wait for both results, then replace the ids with names
        List<ClueInfoDO> realClueList = clueFuture.get(15, TimeUnit.SECONDS);
        // Wait for the lookup maps as well: merely checking isDone() would silently
        // skip all the work below if the province task were still running.
        provinceFuture.get(15, TimeUnit.SECONDS);
        List<Arrangement> arrangementList = Lists.newArrayList();
        for (ClueInfoDO clueInfoDO : realClueList) {
            Arrangement arrangement = new Arrangement();
            arrangement.setClueName(clueInfoDO.getName());
            if (clueInfoDO.getProvince() != null) {
                arrangement.setProvinceName(provinceMap.get(clueInfoDO.getProvince().getId()));
            }
            if (clueInfoDO.getCity() != null) {
                arrangement.setCityName(cityMap.get(clueInfoDO.getCity().getId()));
            }
            if (clueInfoDO.getArea() != null) {
                arrangement.setAreaName(areaMap.get(clueInfoDO.getArea().getId()));
            }
            arrangementList.add(arrangement);
        }
        // 7. Split the list into runSize equal slices plus one slice for the remainder
        int runSize = 50;
        int handleSize = arrangementList.size() / runSize;
        try {
            List<Arrangement> newList = null;
            // One count per submitted task: runSize slices + 1 remainder slice
            CountDownLatch countDownLatch = new CountDownLatch(runSize + 1);
            for (int i = 0; i < runSize + 1; i++) {
                int startIndex = i * handleSize;
                // The last slice takes whatever is left over
                int endIndex = (i == runSize) ? arrangementList.size() : (i + 1) * handleSize;
                newList = arrangementList.subList(startIndex, endIndex);
                threadPoolExecutor.execute(new ArrangementRunnable(newList, countDownLatch));
            }
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
2. ArrangementRunnable
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Description: Inserts one batch into the database
 * @Author: zhangzhixiang
 * @CreateDate: 2018/12/25
 * @Version: 1.0
 */
public class ArrangementRunnable implements Runnable {
    private List<Arrangement> list;
    private CountDownLatch countDownLatch;

    public ArrangementRunnable(List<Arrangement> list, CountDownLatch countDownLatch) {
        this.list = list;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            // Skip null or empty slices; there is nothing to insert
            if (list != null && !list.isEmpty()) {
                ArrangementDAO dao = SpringHelper.getBeanByClass(ArrangementDAO.class);
                dao.batchInsert(list);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always count down, otherwise countDownLatch.await() could block forever
            countDownLatch.countDown();
        }
    }
}
3. SpringHelper (Spring helper class)
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @Description: Spring helper class
 * @Author: zhangzhixiang
 * @CreateDate: 2018/08/31 16:39:45
 * @Version: 1.0
 */
@Component
public class SpringHelper implements ApplicationContextAware {
    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringHelper.applicationContext == null) {
            SpringHelper.applicationContext = applicationContext;
        }
    }

    /** Get the bean with the given id, or null if no such bean is defined */
    public static Object getBean(String beanId) throws BeansException {
        if (applicationContext.containsBean(beanId)) {
            return applicationContext.getBean(beanId);
        }
        return null;
    }

    /** Get the bean of the given type */
    public static <T> T getBeanByClass(Class<T> requiredType) throws BeansException {
        return applicationContext.getBean(requiredType);
    }

    /** Returns true if the BeanFactory contains a bean definition matching the given name, otherwise false */
    public static boolean containsBean(String name) {
        return applicationContext.containsBean(name);
    }

    /** Get the Spring container */
    public static ApplicationContext getApplicationContext() {
        return SpringHelper.applicationContext;
    }
}
This article uses some database-related DAOs that are not shown here. As long as you understand how Future and Callable work together and how the batch insert is done, the article has served its purpose.
Note: @Autowired cannot be used for dependency injection in the Runnable implementation class. The Runnable is created with new rather than by the Spring container, so an @Autowired field in it would stay null. Instead, I use the Spring helper class to look up the DAO bean.
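As an alternative to a static helper, and offered only as a suggestion rather than the approach used in this article, the object that creates the Runnable can hand its own injected DAO to the Runnable's constructor. The sketch below shows the idea in plain Java; BatchDao, InsertTask, and the in-memory list are hypothetical stand-ins for ArrangementDAO, ArrangementRunnable, and the database.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ConstructorInjectionSketch {

    /** Hypothetical DAO abstraction; stands in for the article's ArrangementDAO. */
    interface BatchDao {
        void batchInsert(List<String> rows);
    }

    /** The task receives its collaborator explicitly instead of looking it up. */
    static final class InsertTask implements Runnable {
        private final BatchDao dao;
        private final List<String> rows;

        InsertTask(BatchDao dao, List<String> rows) {
            this.dao = dao;
            this.rows = rows;
        }

        @Override
        public void run() {
            dao.batchInsert(rows);
        }
    }

    /** Runs one task on a separate thread and returns the number of stored rows. */
    static int demo() {
        List<String> stored = new CopyOnWriteArrayList<>();
        // In a Spring application this would be the DAO injected into the caller.
        BatchDao dao = stored::addAll;
        Thread worker = new Thread(new InsertTask(dao, Arrays.asList("a", "b", "c")));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stored.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing the dependency in keeps the task free of static lookups and makes it easy to substitute a fake DAO in tests.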
This concludes the article. The next one covers Lock, which is often considered a better alternative to the synchronized keyword.
Learning the way to asynchronous programming (7) – The principle and use of Lock