The public account Yang Zhengyou focuses on the development of mobile infrastructure platforms, covering fields such as audio and video, APM, and information security.
One. Preface
Executors follow a typical producer-consumer pattern: the code that submits tasks is the producer, and the worker threads are the consumers. The thread pool is the most widely used concurrency framework in Java, and almost any program that needs to execute tasks asynchronously or concurrently can use one. A thread pool is, as the name says, a pool of threads. When a task needs to run, a thread is taken from the pool to execute it. When the task finishes, the thread is put back into the pool.
Let's think about it for a second: without a thread pool, creating a new thread for every task would be too expensive. Suppose there are 1,000 tasks. We want a fixed number of threads to execute those 1,000 tasks so that we avoid the overhead of repeatedly creating and destroying threads, and that is why the thread pool was introduced. It addresses two pain points:
- Problem one: Creating threads repeatedly is expensive
- Problem two: Too many threads take up too much memory
So how does it solve these problems?
It uses a small number of threads to avoid excessive memory usage, keeps those threads alive, and reuses them to run tasks over and over, which avoids the cost of repeated thread creation and destruction.
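To make the idea concrete, here is a minimal sketch (not the downloader's own code) that runs 1,000 trivial tasks on a fixed pool of 4 threads and records which threads did the work. The class name `ThreadReuseDemo` and the pool size are assumptions chosen only for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the downloader.
class ThreadReuseDemo {
    /** Runs taskCount trivial tasks on poolSize threads and returns the names of the threads that ran them. */
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // stop accepting new tasks, let the queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(4, 1000);
        System.out.println("1000 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

However many tasks are submitted, the set never holds more than 4 thread names, because the same threads are reused.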
There are three benefits to using thread pools properly during development.
- First: reduce resource consumption. Reusing already created threads reduces the cost of thread creation and destruction.
- Second: improve response speed. When a task arrives, it can be executed immediately without waiting for a thread to be created.
- Third: improve thread manageability. Threads are a scarce resource. If created without limit, they consume system resources and degrade system stability. A thread pool allows uniform allocation, tuning, and monitoring.
Is there an optimized thread pool solution that can be uniformly allocated, tuned, and monitored? Today I will walk you through writing a simplified file downloader built on a thread pool
Functional requirements are as follows:
- Callbacks for download success, failure, pause, and progress
- Tasks can be added to the task queue without limit
- The total size of the file being downloaded can be queried
- The size already downloaded can be queried
- The path of the downloaded target file can be queried
- The cause of a download failure can be located from the download error code
- Download tasks can be cancelled, paused, and started
Two. Key technologies
A file downloader is not very complicated, but you need a basic understanding of the ThreadPoolExecutor parameters. The table below describes each parameter in detail
2.1 Core Parameters of ThreadPoolExecutor
Parameter | Description |
---|---|
corePoolSize | The number of core threads in the thread pool. When a task is submitted, the pool creates a new thread to execute it until the current number of threads equals corePoolSize. Once the number of threads has reached corePoolSize, further submitted tasks are stored in the blocking queue to wait for execution. If the pool's prestartAllCoreThreads() method is called, the pool creates and starts all core threads ahead of time. |
maximumPoolSize | The maximum number of threads allowed in the thread pool. If the blocking queue is full and tasks continue to be submitted, a new thread is created to execute the task, provided that the current number of threads is less than maximumPoolSize |
keepAliveTime | How long an idle thread stays alive, that is, how long it continues to live when there are no tasks to execute. By default, this parameter only takes effect when the number of threads is greater than corePoolSize |
threadFactory | The factory used to create new threads. It lets you customize each new thread, for example by giving it a meaningful name. Threads created by the default factory are named pool-N-thread-M. |
handler | With this parameter you can customize the task rejection policy. If all the threads in the thread pool are busy and the work queue is full (which requires the work queue to be bounded), the thread pool rejects the submitted task. The rejection policy is specified with the handler parameter. |
unit | The time unit (TimeUnit) of keepAliveTime |
2.1.1 RejectedExecutionHandler (Saturation policy)
This is the saturation policy of the thread pool. When the blocking queue is full and there are no idle worker threads, any further submitted task must be handled by some policy. The thread pool provides four policies: CallerRunsPolicy, AbortPolicy, DiscardPolicy, and DiscardOldestPolicy
Policy name | Effect |
---|---|
CallerRunsPolicy | Executes the task on the caller's thread |
AbortPolicy | The default rejection policy. Throws RejectedExecutionException |
DiscardPolicy | Silently discards the rejected task without throwing an exception |
DiscardOldestPolicy | Discards the oldest task, that is, the task that entered the work queue earliest, and then retries adding the new task to the work queue |
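The difference between the policies is easiest to see on a pool that is deliberately saturated. The sketch below is an illustration under assumed settings (one thread, a one-slot queue, and a hypothetical class name `RejectionPolicyDemo`): it fills the pool, submits one more task, and reports what happened to that task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the downloader.
class RejectionPolicyDemo {
    /** Saturates a 1-thread, 1-slot pool, submits one more task, and reports the fate of that task. */
    static String submitToSaturatedPool(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = {"not run"};
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // pool is saturated here
            return ranOn[0];
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:      " + submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy: " + submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardPolicy:    " + submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

With AbortPolicy the extra task is rejected with an exception, with CallerRunsPolicy it runs on the submitting thread, and with DiscardPolicy it is dropped without any signal.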
2.1.2 workQueue
The workQueue must be a BlockingQueue. When the number of threads in the thread pool has reached corePoolSize, newly submitted tasks enter the blocking queue and wait. The workQueue is what lets the thread pool block: idle worker threads wait on it until a task arrives
So what is a blocking queue?
Blocking queues are often used in producer and consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that fetches elements from the queue. Blocking queues are containers that producers use to store elements and consumers use to get them. Java provides the following blocking queues:
Blocking queue | Description |
---|---|
ArrayBlockingQueue | A bounded blocking queue backed by an array |
LinkedBlockingQueue | An optionally bounded blocking queue backed by a linked list. Without an explicit capacity it holds up to Integer.MAX_VALUE elements |
PriorityBlockingQueue | An unbounded blocking queue that supports priority ordering |
DelayQueue | An unbounded blocking queue of delayed elements, implemented on top of a priority queue |
SynchronousQueue | A blocking queue that does not store elements |
LinkedTransferQueue | An unbounded blocking queue backed by a linked list |
LinkedBlockingDeque | A double-ended blocking queue backed by a linked list |
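How does a blocking queue insert, remove, and examine elements?
Each operation comes in several forms that differ in what happens when the operation cannot proceed immediately: throw an exception (add, remove, element), return a special value (offer, poll, peek), block (put, take), or give up after a timeout (offer(e, time, unit), poll(time, unit)).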
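A short sketch of these behaviors on a full and on an empty queue follows. The class name `BlockingQueueDemo` and the one-slot capacity are assumptions made for the illustration; the methods themselves are the standard `java.util.concurrent.BlockingQueue` ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the downloader.
class BlockingQueueDemo {
    /** Exercises the different insert and remove forms on a one-slot queue. */
    static List<String> run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        List<String> results = new ArrayList<>();
        results.add("offer on empty queue: " + queue.offer("first"));
        results.add("offer on full queue: " + queue.offer("second")); // special value
        try {
            queue.add("third"); // throws because the queue is full
        } catch (IllegalStateException e) {
            results.add("add on full queue: IllegalStateException");
        }
        try {
            // Waits up to 50 ms for free space, then gives up.
            results.add("timed offer on full queue: " + queue.offer("fourth", 50, TimeUnit.MILLISECONDS));
            // take() would block on an empty queue; here one element is available.
            results.add("take: " + queue.take());
            results.add("poll on empty queue: " + queue.poll()); // special value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A thread pool's worker threads rely on the blocking form: they call take() (or a timed poll) on the workQueue and sleep until a task arrives.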
2.2 How to Create a Thread Pool
There are five ways to create a thread pool: a fixed-size thread pool, a work-stealing thread pool, a cached thread pool, a single-thread pool, and a scheduled thread pool. The application scenarios are as follows:
2.2.1 Create a fixed-size thread pool, which caps the maximum number of concurrent threads. Tasks beyond that limit wait in the queue.
2.2.2 Create a thread pool that dynamically creates and closes threads based on the level of parallelism required. It makes good use of the CPU to run tasks in parallel, so it is suitable for time-consuming tasks.
- Notice that a ForkJoinPool object is returned.
2.2.3 Create a cached thread pool, which creates threads as required and reuses idle threads. Using a cached thread pool for short-lived asynchronous tasks improves performance. Threads in the pool that have not been used for 60 seconds are terminated and removed from the pool.
2.2.4 Create a single-thread pool, which executes tasks one at a time in the order they were submitted.
2.2.5 Create a thread pool with a fixed number of core threads that supports scheduled and periodic task execution.
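The five kinds of pool described above correspond to factory methods on `java.util.concurrent.Executors`. The sketch below simply creates one of each; the class name `ExecutorsFactoryDemo`, the map keys, and the pool sizes are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the downloader.
class ExecutorsFactoryDemo {
    /** Creates one pool of each kind, keyed by a short label. */
    static Map<String, ExecutorService> createPools() {
        Map<String, ExecutorService> pools = new LinkedHashMap<>();
        pools.put("fixed", Executors.newFixedThreadPool(4));         // 2.2.1 fixed number of threads
        pools.put("workStealing", Executors.newWorkStealingPool());  // 2.2.2 backed by a ForkJoinPool
        pools.put("cached", Executors.newCachedThreadPool());        // 2.2.3 idle threads expire after 60 s
        pools.put("single", Executors.newSingleThreadExecutor());    // 2.2.4 one worker, tasks run in order
        pools.put("scheduled", Executors.newScheduledThreadPool(2)); // 2.2.5 delayed and periodic tasks
        return pools;
    }

    public static void main(String[] args) {
        Map<String, ExecutorService> pools = createPools();
        pools.forEach((name, pool) -> System.out.println(name + " -> " + pool.getClass().getSimpleName()));
        pools.values().forEach(ExecutorService::shutdown);
    }
}
```

The fixed and single-thread factories use an unbounded LinkedBlockingQueue internally, which is one reason the rest of this article builds its pools directly with ThreadPoolExecutor.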
2.3 ThreadPoolExecutor source code structure
2.4 How thread pools work
- If fewer threads are currently running than corePoolSize, a new thread is created to perform the task (note that this step requires a global lock)
- If the number of running threads is equal to or greater than corePoolSize, the task is added to the BlockingQueue.
- If the task cannot be added to the BlockingQueue (the queue is full), a new thread is created to process the task (note that this step requires a global lock).
- If creating a new thread would push the number of running threads beyond maximumPoolSize, the task is rejected and the RejectedExecutionHandler.rejectedExecution() method is called.
Overall, a simplified model looks like this: inside the thread pool we maintain a blocking queue, the workQueue, and a set of worker threads whose number is given by a poolSize argument to the constructor. The user submits a Runnable task by calling the execute() method, and in this simplified model execute() just adds the task to the workQueue. Each worker thread maintained by the pool runs a while loop that takes tasks from the workQueue and executes them.
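The simplified model in the paragraph above can be sketched in a few dozen lines. This is an illustration of the producer-consumer idea only, not how ThreadPoolExecutor is actually implemented: there is no corePoolSize/maximumPoolSize distinction, no rejection policy, and no shutdown. The class name `SimpleThreadPool` and its helper `runDemo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplified pool: a blocking queue plus a fixed set of worker threads.
class SimpleThreadPool {
    private final BlockingQueue<Runnable> workQueue;
    private final List<Thread> workers = new ArrayList<>();

    SimpleThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        for (int i = 0; i < poolSize; i++) {
            Thread worker = new Thread(this::workerLoop, "simple-pool-" + i);
            worker.setDaemon(true); // do not keep the JVM alive for this sketch
            workers.add(worker);
            worker.start();
        }
    }

    /** Consumer side: each worker loops forever, taking tasks from the queue and running them. */
    private void workerLoop() {
        while (true) {
            try {
                Runnable task = workQueue.take(); // blocks while the queue is empty
                task.run();
            } catch (InterruptedException e) {
                return; // interrupted: let the worker exit
            } catch (RuntimeException e) {
                // A failing task must not kill the worker; a real pool would report this.
            }
        }
    }

    /** Producer side: submitting a task only puts it into the queue. */
    void execute(Runnable task) {
        try {
            workQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs taskCount counting tasks and returns how many completed. */
    static int runDemo(int poolSize, int taskCount) {
        SimpleThreadPool pool = new SimpleThreadPool(poolSize, new LinkedBlockingQueue<>());
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                counter.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runDemo(2, 10));
    }
}
```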
2.5 How to shut down a thread pool
2.5.1 shutdown
shutdown() calls advanceRunState(SHUTDOWN), while shutdownNow() calls advanceRunState(STOP), so the thread pool ends up in a different state after each call
2.5.2 shutdownNow
shutdown() interrupts only idle Workers, while shutdownNow() interrupts all Workers. shutdownNow() also removes all tasks still waiting in the task queue and returns them as a list, whereas shutdown() returns nothing.
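The difference can be observed from the outside without reading the source. In the sketch below (class and method names are assumptions for illustration), `shutdown()` lets every queued task finish, while `shutdownNow()` interrupts the running task and hands back the tasks that never started.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the downloader.
class ShutdownDemo {
    /** Submits taskCount tasks, calls shutdown(), and returns how many tasks still completed. */
    static int completedAfterShutdown(int taskCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown(); // no new tasks are accepted, queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    /** Queues tasks behind one long-running task, calls shutdownNow(), and returns how many never started. */
    static int pendingAfterShutdownNow(int queuedCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a long download
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the worker, so the task ends early
            }
        });
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < queuedCount; i++) {
            pool.execute(() -> { });
        }
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println("completed after shutdown(): " + completedAfterShutdown(5));
        System.out.println("returned by shutdownNow(): " + pendingAfterShutdownNow(3));
    }
}
```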
2.6 How to Properly Configure a thread Pool
To properly configure thread pools, you must first analyze task characteristics
- The nature of the task: CPU intensive task, IO intensive task, and hybrid task.
- Task priority: high, medium and low.
- Task execution time: long, medium and short.
- Task dependencies: Whether they depend on other system resources, such as database connections.
2.6.1 Determining the number of core threads
If a mixed task can be split into one CPU-intensive task and one IO-intensive task (reading and writing databases, files, the network, and so on), then as long as the execution times of the two parts do not differ too much, running them separately gives higher throughput than running them serially. If the execution times differ too much, there is no need to split them. You can call Runtime.getRuntime().availableProcessors() to get the number of CPUs on the current device.
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// The number of core threads: one fewer than the CPU count, but at least 2 and at most 4
private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
2.6.2 Determining the maximum number of threads in a thread pool
Since IO intensive task threads are not executing tasks all the time, configure as many threads as possible, such as 2*Ncpu.
private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
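To see what these two formulas produce on different devices, the sketch below evaluates them for a few CPU counts. The class `PoolSizeDemo` and its method names are hypothetical; the arithmetic is the same as in the constants above.

```java
// Hypothetical demo class that mirrors the two sizing formulas.
class PoolSizeDemo {
    /** Core threads: one fewer than the CPU count, clamped to the range 2..4. */
    static int corePoolSize(int cpuCount) {
        return Math.max(2, Math.min(cpuCount - 1, 4));
    }

    /** Maximum threads: twice the CPU count plus one. */
    static int maximumPoolSize(int cpuCount) {
        return cpuCount * 2 + 1;
    }

    public static void main(String[] args) {
        for (int cpus : new int[] {1, 2, 4, 8}) {
            System.out.println(cpus + " CPU(s): core=" + corePoolSize(cpus)
                    + ", max=" + maximumPoolSize(cpus));
        }
    }
}
```

On an 8-core device this gives 4 core threads and a maximum of 17. Keep in mind that the maximum only matters when the work queue is bounded, because extra threads are created only after the queue is full.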
2.6.3 Creating a Thread Pool
private static ThreadPoolProxy mBackgroundPool = null;
private static final Object mBackgroundLock = new Object();
private static ThreadPoolProxy mDownloadPool = null;
private static final Object mDownloadLock = new Object();
private static Map<String, ThreadPoolProxy> mMap = new HashMap<>();
private static final Object mSingleLock = new Object();
2.6.3.0 Thread pool proxy ThreadPoolProxy
public static class ThreadPoolProxy {
    private ThreadPoolExecutor mPool;
    private int mCorePoolSize;
    private int mMaximumPoolSize;
    private long mKeepAliveTime;
    private boolean mIsPriority;

    /**
     * @param corePoolSize    number of core threads
     * @param maximumPoolSize maximum number of threads
     * @param keepAliveTime   keep-alive time of idle threads, in seconds
     * @param isPriority      whether queued tasks are ordered by priority
     */
    private ThreadPoolProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime, boolean isPriority) {
        mCorePoolSize = corePoolSize;
        mMaximumPoolSize = maximumPoolSize;
        mKeepAliveTime = keepAliveTime;
        mIsPriority = isPriority;
    }
Execute the task, and when the thread pool is closed, a new thread pool is created
    public synchronized void execute(Runnable run) {
        if (run == null) {
            return;
        }
        if (mPool == null || mPool.isShutdown()) {
            // Executors.defaultThreadFactory() returns a new ThreadFactory each time
            if (mIsPriority) { // Use the priority queue
                mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy());
            } else { // Queue tasks in FIFO order
                mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy());
            }
        }
        mPool.execute(run);
    }
Cancels a task in the thread pool that has not yet been executed
    public synchronized void remove(Runnable run) {
        if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
            mPool.getQueue().remove(run);
        }
    }

    /** Whether the queue contains the given task. */
    public synchronized boolean contains(Runnable run) {
        if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
            return mPool.getQueue().contains(run);
        } else {
            return false;
        }
    }
Closing the thread pool
    /**
     * Closes the thread pool.
     *
     * @param isNow if true, terminate the thread pool immediately: try to interrupt the tasks in progress,
     *              empty the task queue, and return the tasks that have not yet been executed.
     *              If false, stop accepting new tasks but let all previously added tasks complete before closing.
     */
    public synchronized void shutdown(boolean isNow) {
        if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
            if (isNow) {
                mPool.shutdownNow();
            } else {
                mPool.shutdown();
            }
        }
    }
}
}
2.6.3.1 CPU Intensive (Encryption, Hash)
- Gets the background thread pool, and the core thread stays alive.
CPU-intensive tasks should be configured with as few threads as possible, for example a thread pool of Ncpu+1 threads.
public static ThreadPoolProxy getBackgroundPool() {
    synchronized (mBackgroundLock) {
        if (mBackgroundPool == null) {
            mBackgroundPool = new ThreadPoolProxy(corePoolSize, maximumPoolSize, 60L, false);
        }
        return mBackgroundPool;
    }
}
2.6.3.2 Concurrent thread pool
Gets a thread pool for concurrent file downloads, changing the number of core threads and the maximum number of threads
public static ThreadPoolProxy getDownloadPool() {
    synchronized (mDownloadLock) {
        if (mDownloadPool == null) {
            mDownloadPool = new ThreadPoolProxy(4, 12, 60L, true);
        }
        return mDownloadPool;
    }
}
2.6.3.3 Single-thread pool
Gets a single-thread pool. All tasks are executed in the order they were added, which removes the need for synchronization between them
public static ThreadPoolProxy getSinglePool(String name) {
    synchronized (mSingleLock) {
        ThreadPoolProxy singlePool = mMap.get(name);
        if (singlePool == null) {
            singlePool = new ThreadPoolProxy(0, 1, 60L, false);
            mMap.put(name, singlePool);
        }
        return singlePool;
    }
}
Three. Simple version of the download implementation
3.0.1 Defining the download task status enumeration DownloadStatus
Enumeration name | Meaning |
---|---|
STATUS_NONE | Default state, no task |
STATUS_WAITING | Waiting to download |
STATUS_DOWNLOADING | Downloading |
STATUS_PAUSED | Download paused |
STATUS_DOWNLOADED | Download complete |
STATUS_ERROR | Download failed |
3.0.2 Dynamic configuration of the URL, breakpoint download switch, thread priority, and connection and read timeouts
public class DownloadConfig {
    public String url; // Download address, unique identifier
    public String targetPath; // Path of the downloaded file
    public int intervalTime = 1000; // Progress callback interval in milliseconds, once per second by default
    public boolean isCallBackInUIThread = true; // Call back on the main thread by default
    public boolean isRange = true; // Whether to enable the breakpoint download function. Enabled by default
    public int timeOutMillisecond = 30 * 1000; // Connection and read timeout
    public int priority; // Priority, 0 by default. Normal tasks use 0, .so library downloads use a higher priority of 8, other values can be customized

    public DownloadConfig(String url, String targetPath, boolean isRange, boolean isCallBackInUIThread, int intervalTime, int timeOutMillisecond) {
        this.url = url;
        this.targetPath = targetPath;
        this.isRange = isRange;
        this.isCallBackInUIThread = isCallBackInUIThread;
        this.intervalTime = intervalTime;
        this.timeOutMillisecond = timeOutMillisecond;
    }
}
3.0.3 Interface for Customizing Download Tasks BaseDownloadTask
Method name | Purpose | Return type |
---|---|---|
add() | Adds the task | BaseDownloadTask |
setListener(DownloadListener listener) | Sets the download listener | BaseDownloadTask |
start() | Starts the task | void |
pause() | Pauses the task | void |
cancel() | Cancels a task that has not yet started | void |
getUrl() | Gets the URL of the file to download | String |
getErrorCode() | Gets the download error code | int |
getTempFile() | Gets the temporary download file | File |
getTargetPath() | Gets the path of the downloaded target file | String |
getDownloadedSize() | Gets the size already downloaded | long |
getTotalSize() | Gets the total size of the file to download | long |
toString() | Returns the task information | String |
3.0.4 Interface for downloading files
public interface IDownloader {
    void downloadFile(DownloadTask task);
}
3.0.5 Configuring the download listener DownloadListener, which provides success, failure, pause, and progress callbacks
Method name | Purpose | Parameters |
---|---|---|
onSuccess(BaseDownloadTask task) | Download success callback | The single download task |
onError(BaseDownloadTask task, Throwable e) | Download error callback | The single download task and the Throwable raised during the download |
onPaused(BaseDownloadTask task, long soFarBytes, long totalBytes) | Download paused callback | The single download task that was paused, the size transferred so far, and the total size of the transfer |
onProgress(long localBytes, long soFarBytes, long totalSize) | Download progress callback | The local file size, the size transferred so far, and the total size of the transfer |
3.0.6 Encapsulating a single download task
Comparable lets us order our tasks by priority. BaseDownloadTask covers the simple task operations: starting, pausing, and cancelling a download, and so on
3.0.6.1 DownloadTask member variable
The DownloadTask member variables worth noting are:
Field name | Purpose | Type |
---|---|---|
tempSuffex | Temporary file name extension | String |
url | Download address, unique identifier | String |
targetPath | Path of the downloaded file | String |
tempFilePath | Path of the temporary download file | String |
fileName | The file name | String |
totalSize | The total size of the file transfer | long |
soFarBytes | Size downloaded by the current thread | long |
downloadedSize | Size already downloaded locally | long |
status | Download status | DownloadStatus |
errorCode | Error code of a failed download | int |
isChunk | Whether the response uses chunked transfer encoding | boolean |
priority | Priority, 0 by default. Normal tasks use 0, .so library downloads use a higher priority of 8, other values can be customized | int |
timeOutMillisecond | Connection and read timeout, in milliseconds | int |
isRange | Whether to enable the breakpoint download function. Enabled by default | boolean |
isCallBackInUIThread | Whether to call back on the main thread | boolean |
intervalTime | Progress callback interval in milliseconds, once per second by default | int |
3.0.6.2 DownloadTask constructor
DownloadTask has two constructors: one takes a DownloadConfig that bundles all the externally customizable parameters, and the other takes the parameters directly
public DownloadTask(DownloadConfig config) {
    this(config.url, config.targetPath, config.isRange, config.isCallBackInUIThread, config.intervalTime, config.timeOutMillisecond, config.priority);
}

public DownloadTask(String url, String targetPath, boolean isRange, boolean isCallBackInUIThread, int intervalTime, int timeOutMillisecond, int priority) {
    this.url = url;
    this.targetPath = targetPath;
    this.isRange = isRange;
    this.isCallBackInUIThread = isCallBackInUIThread;
    this.intervalTime = intervalTime;
    this.timeOutMillisecond = timeOutMillisecond;
    if (this.intervalTime <= 0) {
        this.intervalTime = 1000;
    }
    if (this.timeOutMillisecond <= 0) {
        this.timeOutMillisecond = 30 * 1000;
    }
    this.status = DownloadStatus.STATUS_NONE;
    this.tempFilePath = this.targetPath + tempSuffex;
    this.priority = priority;
}
3.0.6.3 DownloadTask adding a task add()
When adding a task, we set the download task status to waiting, and then derive the file name from the target path
@Override
public DownloadTask add() {
    status = DownloadStatus.STATUS_WAITING;
    final File file = new File(targetPath);
    fileName = file.getName();
    return this;
}
3.0.6.4 DownloadTask getting the downloaded size getDownloadedSize()
If the temporary file path is valid, the size currently downloaded to local storage is returned to the business layer
@Override
public long getDownloadedSize() {
    if (!TextUtils.isEmpty(tempFilePath)) {
        File file = new File(tempFilePath);
        downloadedSize = file.exists() ? file.length() : 0;
    } else {
        downloadedSize = 0;
    }
    return downloadedSize;
}
3.0.6.5 Enabling Range breakpoint Download
If breakpoint download is enabled and part of the file has already been downloaded, the Range header is set. Otherwise the file is downloaded from the beginning
public boolean isRangeRequest() {
    return isRange && getDownloadedSize() > 0;
}
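As a small illustration of the rule above, the following sketch builds the value of the Range header from the two inputs that isRangeRequest() looks at. The class `RangeHeaderDemo` and the method `rangeHeader` are hypothetical helpers; the `bytes=N-` form asks the server for everything from byte offset N to the end of the file.

```java
// Hypothetical helper, not part of the downloader.
class RangeHeaderDemo {
    /** Returns the Range header value for a resumed download, or null when a full download is needed. */
    static String rangeHeader(boolean isRange, long downloadedSize) {
        if (isRange && downloadedSize > 0) {
            return "bytes=" + downloadedSize + "-";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(rangeHeader(true, 1024)); // resume after the first 1024 bytes
        System.out.println(rangeHeader(true, 0));    // nothing downloaded yet
        System.out.println(rangeHeader(false, 1024)); // breakpoint download disabled
    }
}
```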
3.0.6.6 compareTo Compares task priorities
@Override
public int compareTo(DownloadTask o) { // Sort tasks so that higher priorities run first
    if (this.getPriority() < o.priority) {
        return 1;
    }
    if (this.getPriority() > o.priority) {
        return -1;
    }
    return 0;
}
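The effect of this comparison is that a PriorityBlockingQueue hands out the task with the highest priority first. The sketch below shows this with a stand-in class; `PriorityOrderDemo`, its nested `Task`, and the task names are assumptions for illustration, and the comparison is written with Integer.compare, which orders elements the same way as the if-chain above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the downloader.
class PriorityOrderDemo {
    /** Minimal stand-in for DownloadTask: only a name and a priority. */
    static final class Task implements Comparable<Task> {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public int compareTo(Task other) {
            // Reversed comparison: a higher priority sorts earlier in the queue.
            return Integer.compare(other.priority, this.priority);
        }
    }

    /** Adds three tasks in arbitrary order and returns their names in the order the queue releases them. */
    static List<String> drainOrder() {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        queue.add(new Task("normal", 0));
        queue.add(new Task("so-library", 8));
        queue.add(new Task("custom", 5));
        List<String> order = new ArrayList<>();
        Task next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

Note that a ThreadPoolExecutor backed by a PriorityBlockingQueue needs the queued Runnable itself to be Comparable, so tasks should go through execute() rather than submit(), which wraps them in a FutureTask that is not Comparable.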
3.0.6.7 DownloadTask run() => starting the download task start()
When the task starts, reset the task status, and then call FileDownloader to perform the download
@Override
public void start() {
    // Reset the status
    this.status = DownloadStatus.STATUS_NONE;
    // Start downloading
    if (downloader == null) {
        downloader = new FileDownloader();
    }
    downloader.downloadFile(this);
}
3.0.6.8 DownloadTask pausing the download task pause()
In the pause method, set the download task to the paused status, and then use the download listener to pass the size downloaded so far and the total file size back to the business layer
@Override
public void pause() {
    status = DownloadStatus.STATUS_PAUSED;
    if (listener != null) {
        listener.onPaused(this, soFarBytes, totalSize);
    }
}
3.0.6.9 DownloadTask cancelling the download task cancel()
Cancelling the download task is very simple: reset the task status to STATUS_NONE, and then delete the temporary file
@Override
public void cancel() {
    status = DownloadStatus.STATUS_NONE;
    // Delete the temporary file
    FileUtils.deleteFile(tempFilePath);
}
3.0.7.0 DownloadTask Set DownloadListener setListener()
@Override
public DownloadTask setListener(DownloadListener listener) {
    this.listener = listener;
    return this;
}
The code for some other common methods is not pasted here. Their API is summarized below:
Method name | Purpose | Return type |
---|---|---|
toString | – | – |
getDownloadedSize | Gets the file size that has been downloaded | long |
getTotalSize | Gets the total file size | long |
getTempFile | Gets the temporary file | File |
getErrorCode | Gets the error code | int |
getUrl | Gets the download link | String |
getPriority | Gets the task priority | int |
FileDownloader supports breakpoint download
3.0.7.1 FileDownloader Processes single-task file downloads downloadFile
downloadFile as a whole is relatively simple: it creates a Handler bound to the main thread Looper, resets hasCallbackSuccess to false because the success callback has not fired yet, and then delegates to downloadByHttpURLConn. So how does downloadByHttpURLConn work?
@Override
public void downloadFile(DownloadTask task) {
    if (task == null) return;
    if (handler == null) {
        handler = new Handler(Looper.getMainLooper());
    }
    hasCallbackSuccess = false;
    downloadByHttpURLConn(task);
}
3.0.7.2 downloadByHttpURLConn
downloadByHttpURLConn downloads with the system HttpURLConnection API and can resume from a breakpoint if the server supports range requests (status 206)
Here we set the download status to STATUS_DOWNLOADING and create an HttpURLConnection to establish the HTTP connection. By default we use a GET request, ask for a keep-alive (long-lived) connection, disable caching, and optionally add the Range header for breakpoint download
task.status = DownloadTask.DownloadStatus.STATUS_DOWNLOADING;
HttpURLConnection conn = null;
InputStream inputStream = null;
try {
    URL url = new URL(task.url);
    conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    conn.setConnectTimeout(task.timeOutMillisecond);
    conn.setReadTimeout(task.timeOutMillisecond);
    conn.setRequestProperty("Connection", "Keep-Alive");
    conn.setRequestProperty("Cache-Control", "no-cache");
    conn.setRequestProperty("pragma", "no-cache");
    conn.setRequestProperty("Accept", "*/*");
    // Whether to send a range request. A 416 response means the requested range is invalid
    String rangStr = "";
    if (task.isRangeRequest()) { // isRangeRequest() refreshes downloadedSize from the local temporary file
        rangStr = "bytes=" + task.downloadedSize + "-";
        conn.setRequestProperty("Range", rangStr); //+ task.totalSize
        //Logger.d(TAG, "isRangeRequest" + task.downloadedSize);
    }
- Get the status code returned by the server
    // Get the status code returned by the server
    int code = conn.getResponseCode();
    if (200 == code || 206 == code) { // 200: the full resource was returned. 206: part of the resource was returned
        final String length = conn.getHeaderField("Content-Length");
        if (!TextUtils.isEmpty(length)) { // Total length
            task.totalSize = Long.parseLong(length);
        }
        if (task.totalSize == 0) {
            final String transferEncoding = conn.getHeaderField("Transfer-Encoding");
            task.isChunk = (transferEncoding != null && transferEncoding.equals("chunked"));
            task.totalSize = TOTAL_VALUE_IN_CHUNKED_RESOURCE;
        }
        Map<String, List<String>> map = conn.getHeaderFields();
        //Logger.d(TAG, "code=" + code + ",length=" + length);
        inputStream = conn.getInputStream();
        startWriteFile(task, code, inputStream, rangStr, map);
    } else { // Error
        task.errorCode = code;
        if (416 == code) { // Invalid range request: the size of the local temporary file is already wrong
            // Delete the temporary file
            FileUtils.deleteFile(task.tempFilePath);
        }
        handleError(task, new Throwable("net request error code=" + code + "|" + rangStr + "|url:" + task.url + "|tempFile:" + task.targetPath));
    }
- 200: the request for the full server resource succeeded
- 206: the request for part of the server resource succeeded
- Content-Length: if the reported total size is 0, we check whether the response uses chunked transfer encoding and set totalSize to a placeholder value
- Error codes
  - 416: the requested range is invalid, which means the size of the local temporary file is already wrong, so we delete the temporary file
  - For other errors, do not forcibly delete the local cache
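The branching described in the list above can be summarized as a small decision function. This is only a sketch of the decision, with a hypothetical class `StatusCodeDemo` and a hypothetical `Action` enum; the real downloader performs the file and callback work directly in the branches.

```java
import java.net.HttpURLConnection;

// Hypothetical summary of how the downloader reacts to each status code.
class StatusCodeDemo {
    enum Action { WRITE_FROM_START, RESUME_WITH_APPEND, DELETE_TEMP_AND_FAIL, FAIL_KEEP_TEMP }

    static Action actionFor(int code) {
        switch (code) {
            case HttpURLConnection.HTTP_OK:      // 200: full body, overwrite the temporary file
                return Action.WRITE_FROM_START;
            case HttpURLConnection.HTTP_PARTIAL: // 206: partial body, append to the temporary file
                return Action.RESUME_WITH_APPEND;
            case 416:                            // invalid range: the temporary file cannot be trusted
                return Action.DELETE_TEMP_AND_FAIL;
            default:                             // any other error: report it but keep the temporary file
                return Action.FAIL_KEEP_TEMP;
        }
    }

    public static void main(String[] args) {
        for (int code : new int[] {200, 206, 416, 500}) {
            System.out.println(code + " -> " + actionFor(code));
        }
    }
}
```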
private void handleError(final DownloadTask downloadTask, final Throwable e) {
    if (downloadTask == null) return;
    downloadTask.status = DownloadTask.DownloadStatus.STATUS_ERROR;
    DownloadManager.init().removeTask(downloadTask.url);
    if (downloadTask.isCallBackInUIThread) {
        handler.post(() -> {
            Logger.d(TAG, "download error " + e + ",isCallBackInUIThread=" + downloadTask.isCallBackInUIThread);
            if (downloadTask.listener != null) {
                downloadTask.listener.onError(downloadTask, e);
            }
        });
    } else {
        if (downloadTask.listener != null) {
            downloadTask.listener.onError(downloadTask, e);
        }
    }
}
Finally, we should not forget to close the stream and disconnect the HTTP connection
} finally {
    if (conn != null) conn.disconnect();
    if (inputStream != null) {
        try {
            inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3.0.7.3 startWriteFile Writing a File
On a successful status code, downloadByHttpURLConn calls startWriteFile to write the file. If the server supports range requests (206), the file is written with RandomAccessFile
The logic of writing the file is relatively simple: first delete the target file, and then create the temporary file
- If the status is 200, there is no need to resume from a breakpoint
if (200 == code) { // Non-range request
    tempFile.delete();
    downloadTask.downloadedSize = 0;
    fileOutputStream = new FileOutputStream(tempFile);
}
- If the status is 206, resume from the breakpoint
if (206 == code) { // Resume from the breakpoint
    currentSize = tempFile.length();
    downloadTask.downloadedSize = currentSize;
    raf = new RandomAccessFile(tempPath, "rw");
    raf.seek(currentSize);
    Logger.d(TAG, "206 range downloadedSize=" + currentSize + ",totalSize=" + downloadTask.totalSize);
}
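The key to resuming is that RandomAccessFile can position its write pointer at the end of the bytes already on disk. The sketch below simulates an interrupted download followed by a resumed one on a temporary file. The class `ResumeWriteDemo`, the file name prefix, and the sample bytes are assumptions for illustration; no network is involved.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical demo class, not part of the downloader.
class ResumeWriteDemo {
    /** Writes data after the bytes already in the file, as the body of a 206 response would be written. */
    static long appendAtEnd(File file, byte[] data) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            long currentSize = file.length(); // bytes kept from an earlier attempt
            raf.seek(currentSize);            // continue exactly where the last write stopped
            raf.write(data);
            return raf.length();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates an interrupted download followed by a resumed one and returns the file content. */
    static String demo() {
        try {
            File temp = File.createTempFile("download", ".temp");
            temp.deleteOnExit();
            appendAtEnd(temp, "hello ".getBytes(StandardCharsets.UTF_8)); // first attempt stops early
            appendAtEnd(temp, "world".getBytes(StandardCharsets.UTF_8));  // resumed attempt
            return new String(Files.readAllBytes(temp.toPath()), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This only works if the server really returned the bytes starting at the requested offset, which is why the downloader appends only on a 206 response and starts over on a 200.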
Then read from the stream and write to the file. There are a few things to watch for along the way
while ((len = inputStream.read(buffer)) != -1 && DownloadTask.DownloadStatus.STATUS_DOWNLOADING == downloadTask.status) {
    if (200 == code) {
        fileOutputStream.write(buffer, 0, len);
        fileOutputStream.flush();
    } else if (206 == code) {
        raf.write(buffer, 0, len);
    }
}
- The progress callback fires once every intervalTime milliseconds, and must also fire when the download completes
soFarBytes += len;
downloadTask.downloadedSize = currentSize + soFarBytes;
downloadTask.soFarBytes = soFarBytes;
if (System.currentTimeMillis() - start >= downloadTask.intervalTime || soFarBytes == downloadTask.totalSize) {
    notifyProgress(downloadTask, soFarBytes);
    start = System.currentTimeMillis();
}
- If the size downloaded equals the total size reported by the server, call back success
if (downloadTask.soFarBytes == downloadTask.totalSize) {
    handleSuccess(downloadTask);
}
- If more bytes were written than the total size, stop downloading and call back an error. For chunked transfers, totalSize is not the actual total size, so this check is skipped
if (downloadTask.totalSize > 0 && downloadTask.soFarBytes > downloadTask.totalSize && !downloadTask.isChunk) {
    String errorMsg = ERROR_DOWNLOAD_MORE_SIZE + " localSize=" + downloadTask.downloadedSize +
            ",soFar=" + downloadTask.soFarBytes + ",total=" + downloadTask.totalSize;
    Logger.d(TAG, errorMsg);
    downloadTask.cancel();
    handleError(downloadTask, new RuntimeException(errorMsg));
}
- If the task is still in the STATUS_DOWNLOADING state when reading ends, decide the outcome: call back success if the sizes match or the transfer was chunked, otherwise call back failure
if (DownloadTask.DownloadStatus.STATUS_DOWNLOADING == downloadTask.status) {
    if (!hasCallbackSuccess && (!downloadTask.isChunk && downloadTask.soFarBytes == downloadTask.totalSize)) {
        // Non-chunked download whose downloaded size matches the total size: call back success
        handleSuccess(downloadTask);
    } else if (!hasCallbackSuccess && downloadTask.isChunk) {
        // Chunked download: treated as successful for now
        handleSuccess(downloadTask);
    } else {
        // Failure
        StringBuilder headStr = new StringBuilder();
        if (null != map) {
            for (String key : map.keySet()) {
                headStr.append("|").append(key).append("=").append(map.get(key).toString());
            }
        }
        handleError(downloadTask, new Throwable("Error WriteFile,hasCallSuc:" + hasCallbackSuccess + "|headerStr:" + headStr + "|bytes:" + downloadTask.soFarBytes + "-" + downloadTask.totalSize + "-" + downloadTask.downloadedSize +
                "|" + rangStr + "|url:" + downloadTask.url + "|tempFile:" + tempFile.getPath() + "-" + tempFile.length()));
    }
}
3.0.7.4 Progress notification callback notifyProgress
If the task is configured to call back on the main thread (isCallBackInUIThread), we post the progress update to the UI thread. Otherwise the listener is called directly on the download thread
private void notifyProgress(final DownloadTask downloadTask, final long soFarBytes) {
    if (DownloadTask.DownloadStatus.STATUS_DOWNLOADING == downloadTask.status) {
        if (downloadTask.isCallBackInUIThread) {
            handler.post(() -> {
                if (downloadTask.listener != null) {
                    downloadTask.listener.onProgress(downloadTask.downloadedSize, soFarBytes, downloadTask.totalSize);
                }
            });
        } else {
            if (downloadTask.listener != null) {
                downloadTask.listener.onProgress(downloadTask.downloadedSize, soFarBytes, downloadTask.totalSize);
            }
        }
    }
}
3.0.7.5 Successfully downloading handleSuccess Callback
If the transfer is segmtioned, we need to ensure that the downloaded file size is equal to the total file size. After the download is successful, rename and remove the. Temp temporary file, and remove the linked download task from the queue, and then call back the successful download to the service layer
private void handleSuccess(final DownloadTask downloadTask) {
    if (downloadTask == null) return;
    if (hasCallbackSuccess) return;
    downloadTask.status = DownloadTask.DownloadStatus.STATUS_DOWNLOADED;
    if (downloadTask.isChunk) {
        // Make sure the final callback reports a file size once the download completes
        downloadTask.totalSize = downloadTask.downloadedSize;
    }
    // After the download completes, rename the .temp temporary file to the target file
    final File targetFile = new File(downloadTask.targetPath);
    if (targetFile.exists()) {
        targetFile.delete();
    }
    final File tempDownloadedFile = new File(downloadTask.tempFilePath);
    tempDownloadedFile.renameTo(targetFile);
    //Log.d("dq-handleSuccess", targetFile.getName() + ",url=" + downloadTask.url);
    DownloadManager.init().removeTask(downloadTask.url);
    if (downloadTask.isCallBackInUIThread) {
        handler.post(() -> {
            if (downloadTask.listener != null) {
                downloadTask.listener.onSuccess(downloadTask);
            }
        });
    } else {
        if (downloadTask.listener != null) {
            downloadTask.listener.onSuccess(downloadTask);
        }
    }
    hasCallbackSuccess = true;
}
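One detail of the rename step is worth a note: File.renameTo only returns false when it fails, and the listing above ignores that return value. As a hedged alternative, not the article's code, the same "replace the target with the temp file" step can be written with java.nio.file.Files.move, which throws an IOException on failure. TempFilePromoter and the file names are hypothetical, and on Android the java.nio.file package is only available from API level 26.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: promote a finished ".temp" file to its final name.
class TempFilePromoter {
    // Replaces targetFile with tempFile. A failed move surfaces as an IOException
    // instead of a silently ignored "false".
    static void promote(Path tempFile, Path targetFile) throws IOException {
        Files.move(tempFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
    }
}

class TempFilePromoterDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("download-sketch");
        Path temp = dir.resolve("video.mp4.temp"); // hypothetical file names
        Path target = dir.resolve("video.mp4");
        Files.write(target, "stale".getBytes(StandardCharsets.UTF_8));
        Files.write(temp, "fresh".getBytes(StandardCharsets.UTF_8));

        TempFilePromoter.promote(temp, target);

        // The target now holds the freshly downloaded bytes and the temp file is gone.
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
        System.out.println(Files.exists(temp));
    }
}
```

With this variant the caller can route the IOException to the error callback instead of reporting success for a file that was never moved.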
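3.0.7.6 Download Failure Callback handleError
When an error occurs, do not forcibly delete the local cache file, because other callbacks may still depend on it. The task is marked as failed, removed from the download queue, and the error is passed to the listener.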
private void handleError(final DownloadTask downloadTask, final Throwable e) {
    if (downloadTask == null) return;
    downloadTask.status = DownloadTask.DownloadStatus.STATUS_ERROR;
    DownloadManager.init().removeTask(downloadTask.url);
    if (downloadTask.isCallBackInUIThread) {
        handler.post(() -> {
            Logger.d(TAG, "download error " + e + ",isCallBackInUIThread=" + downloadTask.isCallBackInUIThread);
            if (downloadTask.listener != null) {
                downloadTask.listener.onError(downloadTask, e);
            }
        });
    } else {
        if (downloadTask.listener != null) {
            downloadTask.listener.onError(downloadTask, e);
        }
    }
}
3.0.8 Download Manager Wrapper DownloadManager
3.0.8.1 DownloadManager()
The DownloadManager constructor creates the task map, binds a Handler to the main-thread Looper, and obtains the normal download thread pool.
private DownloadManager() {
    mTaskMap = new ConcurrentHashMap<>();
    mHandler = new Handler(Looper.getMainLooper());
    downloadPool = ThreadManager.getDownloadPool(); // Normal download thread pool
}
3.0.8.2 init()
DownloadManager uses one of the simplest singleton implementations: the instance lives in a static holder, and callers obtain the DownloadManager reference by calling init() directly.
public static DownloadManager init() {
    return SingletonHolder.INSTANCE;
}
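The listing only shows the accessor, so here is a minimal sketch of how such a holder-based singleton is usually put together (the initialization-on-demand holder idiom). It assumes SingletonHolder is a private static nested class. DownloadManagerSketch and createdCount() are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the initialization-on-demand holder idiom.
class DownloadManagerSketch { // hypothetical stand-in for DownloadManager
    private static final AtomicInteger CREATED = new AtomicInteger();

    private DownloadManagerSketch() {
        CREATED.incrementAndGet(); // counts constructor calls, for the demo only
    }

    // The JVM initializes this nested class lazily and exactly once,
    // on the first access to INSTANCE, so no explicit locking is needed.
    private static class SingletonHolder {
        static final DownloadManagerSketch INSTANCE = new DownloadManagerSketch();
    }

    static DownloadManagerSketch init() {
        return SingletonHolder.INSTANCE;
    }

    static int createdCount() {
        return CREATED.get();
    }
}

class DownloadManagerSketchDemo {
    public static void main(String[] args) {
        DownloadManagerSketch a = DownloadManagerSketch.init();
        DownloadManagerSketch b = DownloadManagerSketch.init();
        System.out.println(a == b);                               // same instance both times
        System.out.println(DownloadManagerSketch.createdCount()); // constructor ran once
    }
}
```

The idiom relies on the class-initialization guarantees of the JVM, which is why the accessor itself does not need to be synchronized.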
3.0.8.3 Downloading Files download()
Create the download task, attach the listener, and add the task to the download thread pool. By default the callbacks run on a worker thread.
/**
 * Convenient entry point for customizing all parameters.
 *
 * @param config   download parameters, extensible
 * @param listener download listener used for callbacks
 * @return the current download task
 */
public synchronized DownloadTask download(DownloadConfig config, DownloadListener listener) {
    if (config == null || TextUtils.isEmpty(config.url) || TextUtils.isEmpty(config.targetPath)) {
        throw new NullPointerException();
    }
    final DownloadTask downloadTask = new DownloadTask(config)
            .setListener(listener)
            .add();
    addToDownloader(downloadTask);
    return downloadTask;
}

public synchronized DownloadTask download(String url, String targetPath, boolean isRange, boolean isCallBackInUIThread, int intervalTime, int timeOutMillisecond, DownloadListener listener) {
    if (TextUtils.isEmpty(url) || TextUtils.isEmpty(targetPath)) {
        throw new NullPointerException();
    }
    final DownloadTask downloadTask = new DownloadTask(url, targetPath, isRange, isCallBackInUIThread, intervalTime, timeOutMillisecond, 0)
            .setListener(listener)
            .add();
    addToDownloader(downloadTask);
    return downloadTask;
}
3.0.8.4 Adding a Download Task
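If the version of the file to download differs from the cached one, clear the cached file before downloading, so that data written at different times does not end up in the same file. Tasks are keyed by URL: when no task exists for the URL, the new task is registered and submitted to the thread pool; when one already exists, it is paused, given the new listener, removed from the pool's queue, and submitted again.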
public synchronized void addToDownloader(DownloadTask task) {
    if (task == null || TextUtils.isEmpty(task.url)) {
        throw new NullPointerException();
    }
    DownloadTask downloadTask = mTaskMap.get(task.url);
    if (downloadTask == null) { // Create a new download task and add it to the thread pool
        mTaskMap.put(task.url, task);
        downloadPool.execute(task);
        //Logger.d(TAG, "dq-start download:" + task.getUrl());
    } else {
        downloadTask.pause();
        downloadTask.setListener(task.listener); // Reset the listener
        downloadPool.remove(downloadTask);
        downloadPool.execute(downloadTask); // Execute directly
    }
}
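The "one task per URL" bookkeeping can be sketched with the standard java.util.concurrent classes. This is a simplified illustration under stated assumptions, not the article's DownloadManager: UrlTask and TaskRegistry are hypothetical, a plain ThreadPoolExecutor replaces the custom download pool, a string stands in for the listener, and the pause() call of the original is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal task: a Runnable identified by its URL.
class UrlTask implements Runnable {
    final String url;
    volatile String listenerName;                  // stand-in for the listener reference
    final AtomicInteger runs = new AtomicInteger();

    UrlTask(String url, String listenerName) {
        this.url = url;
        this.listenerName = listenerName;
    }

    @Override
    public void run() {
        runs.incrementAndGet();                    // a real task would download here
    }
}

// Sketch: at most one task per URL; re-adding a URL reuses the existing task.
class TaskRegistry {
    private final Map<String, UrlTask> tasks = new ConcurrentHashMap<>();
    private final ThreadPoolExecutor pool;

    TaskRegistry(ThreadPoolExecutor pool) {
        this.pool = pool;
    }

    synchronized UrlTask add(UrlTask task) {
        if (task == null || task.url == null || task.url.isEmpty()) {
            throw new IllegalArgumentException("task and task.url are required");
        }
        UrlTask existing = tasks.get(task.url);
        if (existing == null) {
            tasks.put(task.url, task);
            pool.execute(task);
            return task;
        }
        existing.listenerName = task.listenerName; // adopt the newest listener
        pool.remove(existing);                     // drop the queued copy, if still waiting
        pool.execute(existing);                    // queue the same task again
        return existing;
    }

    synchronized UrlTask remove(String url) {
        UrlTask removed = tasks.remove(url);
        if (removed != null) {
            pool.remove(removed);
        }
        return removed;
    }

    int size() {
        return tasks.size();
    }
}

class TaskRegistryDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(() -> {                       // keep the only worker busy
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        TaskRegistry registry = new TaskRegistry(pool);
        String url = "https://example.com/a.zip"; // hypothetical URL
        UrlTask first = registry.add(new UrlTask(url, "first"));
        UrlTask again = registry.add(new UrlTask(url, "second"));
        System.out.println(first == again);
        System.out.println(registry.size() + " task, " + pool.getQueue().size() + " queued");

        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("runs=" + first.runs.get());
    }
}
```

Note that ThreadPoolExecutor.remove only takes a task out of the waiting queue; it does not stop a task that is already running. That is presumably why the original pauses the existing task before queuing it again.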
3.0.8.5 Pausing the Download pause
public synchronized boolean pause(String url) {
    if (!TextUtils.isEmpty(url)) {
        DownloadTask downloadTask = mTaskMap.get(url);
        if (downloadTask != null) {
            downloadTask.setListener(null);
            downloadTask.pause();
            removeTask(url);
            //Logger.d(TAG, "pause download:" + downloadTask);
            return true;
        }
    }
    return false;
}
3.0.8.6 Canceling the Download cancel
Cancel the download: set the listener to null, delete the data downloaded so far, and remove the task from the thread pool.
public synchronized void cancel(String url) {
    if (TextUtils.isEmpty(url)) {
        throw new NullPointerException();
    }
    DownloadTask downloadTask = mTaskMap.get(url);
    if (downloadTask != null) {
        downloadTask.setListener(null);
        downloadTask.cancel(); // The downloaded files will be cleared
        removeTask(url);
        //Logger.d(TAG, "cancel download:" + downloadTask);
    }
}
3.0.8.7 Clearing the Task Cache removeTask
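Remove the task for the given URL from the task map and, if it exists, from the thread pool as well. The removed task is returned to the caller.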
public synchronized DownloadTask removeTask(String url) {
    final DownloadTask downloadTask = mTaskMap.remove(url);
    if (downloadTask != null) {
        downloadPool.remove(downloadTask);
    }
    return downloadTask;
}
3.0.8.8 Stopping Download Tasks on App Exit onAppExit
Before the application exits, detach the listener from every task, pause it, and remove it from the pool. Then clear the task map and shut down the thread pool.
public synchronized void onAppExit() {
    for (DownloadTask downloadTask : mTaskMap.values()) {
        downloadTask.setListener(null);
        downloadTask.pause();
        downloadPool.remove(downloadTask);
    }
    mTaskMap.clear();
    downloadPool.shutdown(true);
}
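The downloadPool in the listing is the article's own wrapper, and the meaning of the boolean passed to shutdown(true) is not shown here. For comparison only, the following sketch shows an immediate shutdown with the standard ExecutorService API. PoolShutdown and stopNow are hypothetical names, and treating shutdown(true) as "stop immediately" is an assumption.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: stop a standard thread pool when the application exits.
class PoolShutdown {
    // Interrupts running tasks, discards queued ones, and waits up to waitMillis
    // for the workers to finish. Returns how many queued tasks never started.
    static int stopNow(ExecutorService pool, long waitMillis) throws InterruptedException {
        List<Runnable> neverStarted = pool.shutdownNow();
        pool.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        return neverStarted.size();
    }
}

class PoolShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch never = new CountDownLatch(1);
        pool.execute(() -> {                 // simulates a long-running download
            try {
                never.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow
            }
        });
        pool.execute(() -> { });             // two tasks that stay in the queue
        pool.execute(() -> { });

        int dropped = PoolShutdown.stopNow(pool, 2000);
        System.out.println("dropped=" + dropped + ", terminated=" + pool.isTerminated());
    }
}
```

shutdownNow can only interrupt the worker threads, so a running task has to respond to interruption (or to a pause flag, as in the article) for the pool to terminate quickly.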