Source code analysis. If you reproduce this article, please credit the author: Yuloran (t.cn/EGU6c76)
Preface
Library author: Season_zlc
This article focuses on thread scheduling in RxDownload2.
Download task dispatch thread
As the name suggests, this is the thread that dispatches download tasks. It runs in DownloadService, which from a business perspective should only be started and bound once (startService() and bindService()). The dispatch thread is created in onBind():
- start & bind service [-> RxDownload.java]
/**
 * start and bind service.
 *
 * @param callback Called when service connected.
 */
private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
    Intent intent = new Intent(context, DownloadService.class);
    intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
    context.startService(intent);
    context.bindService(intent, new ServiceConnection() {
        @Override
        public void onServiceConnected(ComponentName name, IBinder binder) {
            DownloadService.DownloadBinder downloadBinder
                    = (DownloadService.DownloadBinder) binder;
            downloadService = downloadBinder.getService();
            context.unbindService(this);
            bound = true;
            callback.call();
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            // Note: this method is only called when the system kills the Service!
            bound = false;
        }
    }, Context.BIND_AUTO_CREATE);
}
One detail worth noting in this code: onServiceConnected() calls unbindService() immediately after obtaining the service reference.
- onBind [-> DownloadService.java]
@Nullable
@Override
public IBinder onBind(Intent intent) {
    log("bind Download Service");
    startDispatch();
    return mBinder;
}
- startDispatch() [-> DownloadService.java]
/**
 * start dispatch download queue.
 */
private void startDispatch() {
    disposable = Observable
            .create(new ObservableOnSubscribe<DownloadMission>() {
                @Override
                public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
                    DownloadMission mission;
                    while (!emitter.isDisposed()) {
                        try {
                            log(WAITING_FOR_MISSION_COME);
                            mission = downloadQueue.take();
                            log(Constant.MISSION_COMING);
                        } catch (InterruptedException e) {
                            log("Interrupt blocking queue.");
                            continue;
                        }
                        emitter.onNext(mission);
                    }
                    emitter.onComplete();
                }
            })
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Consumer<DownloadMission>() {
                @Override
                public void accept(DownloadMission mission) throws Exception {
                    mission.start(semaphore);
                }
            });
}
- The code above implements the download task dispatch thread. The call .subscribeOn(Schedulers.newThread()) means the dispatch loop runs on a newly created thread, as if by new Thread()
- Download tasks are held in a blocking queue, which is already synchronized internally, so there is no need to worry about concurrency
- As long as the subscription has not been disposed, the thread keeps taking download tasks from the blocking queue and emitting them. If the queue is empty, it blocks until a new task is enqueued
- The code above is not rigorous: disposable is reassigned without first disposing the previous subscription. If onBind() runs more than once (for example, through repeated bindService() calls), the earlier dispatch thread leaks
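The leak described in the last bullet can be illustrated without RxJava. The following is a minimal sketch in plain java.util.concurrent, not RxDownload2 code: the class DispatchSketch and all of its members are hypothetical names, and an interrupted Thread stands in for a disposed subscription. The only point it makes is that the previous dispatcher is stopped before a new one is started.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the dispatch logic; not RxDownload2 code. */
class DispatchSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> dispatched = new CopyOnWriteArrayList<>();
    private Thread dispatcher;

    /** Starts the dispatch thread, stopping any previous one first. */
    synchronized void startDispatch() {
        stopDispatch(); // the guard discussed above: never overwrite a live dispatcher
        dispatcher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String mission = queue.take(); // blocks while the queue is empty
                    dispatched.add(mission);       // "emit" the mission
                } catch (InterruptedException e) {
                    return; // interrupted while waiting: leave the loop
                }
            }
        }, "dispatch");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /** Stops the current dispatch thread, if any. */
    synchronized void stopDispatch() {
        if (dispatcher != null) {
            dispatcher.interrupt();
            dispatcher = null;
        }
    }

    synchronized Thread currentDispatcher() {
        return dispatcher;
    }

    void enqueue(String mission) {
        queue.add(mission);
    }
}
```

Calling startDispatch() twice leaves exactly one live dispatch thread. In the RxJava version, the analogous guard would presumably be to dispose the old disposable before assigning the new one.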
Download task execution thread
As the name implies, this is the thread that executes a download task. It runs on the Schedulers.io() thread pool. The semaphore parameter limits the maximum number of tasks that can download simultaneously.
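How a semaphore caps concurrency on an otherwise unbounded pool can be sketched in plain Java. This is an illustration under assumptions, not the library's code: SemaphoreLimitSketch and run() are hypothetical names, a cached thread pool stands in for Schedulers.io(), and a short sleep stands in for a download.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration: a semaphore bounds how many tasks run at once. */
class SemaphoreLimitSketch {
    /** Runs taskCount fake downloads and returns the peak number active at the same time. */
    static int run(int taskCount, int maxConcurrent) throws InterruptedException {
        Semaphore semaphore = new Semaphore(maxConcurrent);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool(); // unbounded pool
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire(); // wait for a free download slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // pretend to download
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release(); // free the slot for the next task
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }
}
```

Every task gets its own pool thread right away, but the peak returned by run(10, 3) never exceeds 3, because at most three tasks hold a permit at once.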
- start(final Semaphore semaphore) [-> SingleMission.java]
@Override
public void start(final Semaphore semaphore) {
    disposable = start(bean, semaphore, new MissionCallback() {
        @Override
        public void start() {
            // Callback: download started
            if (callback != null) callback.start();
        }

        @Override
        public void next(DownloadStatus value) {
            // Callback: download in progress
            status = value;
            processor.onNext(started(value));
            if (callback != null) callback.next(value);
        }

        @Override
        public void error(Throwable throwable) {
            // Callback: download failed
            processor.onNext(failed(status, throwable));
            if (callback != null) callback.error(throwable);
        }

        @Override
        public void complete() {
            // Callback: download completed
            processor.onNext(completed(status));
            if (callback != null) callback.complete();
        }
    });
}
- start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) [-> DownloadMission.java]
protected Disposable start(DownloadBean bean, final Semaphore semaphore,
        final MissionCallback callback) {
    return rxdownload.download(bean)
            .subscribeOn(Schedulers.io()) // Specify the thread that executes the download task
            .doOnLifecycle(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    if (canceled.get()) {
                        dispose(disposable);
                    }
                    log(TRY_TO_ACQUIRE_SEMAPHORE);
                    // Acquire the semaphore
                    semaphore.acquire();
                    log(ACQUIRE_SUCCESS);
                    // After acquiring the semaphore, check again whether the download has been paused
                    if (canceled.get()) {
                        // Already paused: dispose the subscription, which releases the semaphore
                        dispose(disposable);
                    } else {
                        callback.start();
                    }
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    // On dispose, release the semaphore
                    semaphore.release();
                }
            })
            .subscribe(new Consumer<DownloadStatus>() {
                @Override
                public void accept(DownloadStatus value) throws Exception {
                    // Callback: download progress
                    callback.next(value);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // Callback: download failed
                    callback.error(throwable);
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    // Callback: download completed
                    callback.complete();
                }
            });
}
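The second canceled check in the listing above matters because a task may wait on the semaphore for a long time and be paused in the meantime. A simplified sketch of that check-acquire-recheck pattern follows. CancelAwareStart and startIfNotCanceled() are hypothetical names, and unlike the original, which disposes the subscription, this version simply returns early and releases the permit in a finally block.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of re-checking a cancel flag after a blocking acquire. */
class CancelAwareStart {
    /** Returns true if the work ran, false if it was canceled before it could start. */
    static boolean startIfNotCanceled(Semaphore semaphore, AtomicBoolean canceled, Runnable work)
            throws InterruptedException {
        if (canceled.get()) {
            return false; // canceled before waiting: never take a permit
        }
        semaphore.acquire(); // may block for a long time
        try {
            if (canceled.get()) {
                return false; // canceled while waiting for the permit
            }
            work.run();
            return true;
        } finally {
            semaphore.release(); // always hand the permit back
        }
    }
}
```

Without the second check, a task paused while it was queued behind the semaphore would still start as soon as a permit became free.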
Download task interrupt thread
As the name implies, this is the thread that interrupts download tasks. It covers four operations: pause, delete, pause all, and cancel all. These operations also run on the Schedulers.io() thread pool.
- pauseServiceDownload(final String missionId) [-> RxDownload.java]
/**
 * Pause download.
 * <p>
 * Pause a url or all tasks belonging to missionId.
 *
 * @param missionId url or missionId
 */
public Observable<?> pauseServiceDownload(final String missionId) {
    // createGeneralObservable() returns an Observable that runs asynchronously;
    // internally it forces synchronization with a single-permit semaphore
    return createGeneralObservable(new GeneralObservableCallback() {
        @Override
        public void call() {
            // Once the service is bound, call the service's pause method
            downloadService.pauseDownload(missionId);
        }
    }).observeOn(AndroidSchedulers.mainThread());
}
- createGeneralObservable(final GeneralObservableCallback callback) [-> RxDownload.java]
/**
 * return general observable
 *
 * @param callback Called when observable created.
 * @return Observable
 */
private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
    // This method would be more aptly named bindService
    return Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
            if (!bound) {
                // onServiceConnected() is an asynchronous callback, so a single-permit
                // semaphore forces synchronization here (a CountDownLatch would also work)
                semaphore.acquire();
                if (!bound) {
                    startBindServiceAndDo(new ServiceConnectedCallback() {
                        @Override
                        public void call() {
                            // Callback after the service is bound
                            doCall(callback, emitter);
                            // Release the semaphore
                            semaphore.release();
                        }
                    });
                } else {
                    doCall(callback, emitter);
                    semaphore.release();
                }
            } else {
                doCall(callback, emitter);
            }
        }
    }).subscribeOn(Schedulers.io()); // Runs on an IO thread, so pausing a download also executes there
}
Likewise, deleting a download goes through createGeneralObservable() first, so delete operations also run on Schedulers.io().
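The "check, acquire, check again" structure of createGeneralObservable() can be reproduced in plain Java to show why only one caller ever triggers the bind. This is a sketch under assumptions: BindOnceSketch, bindAsync(), runWhenBound() and demo() are hypothetical names, and a freshly started thread stands in for the asynchronous onServiceConnected() callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a single-permit semaphore guarding an asynchronous bind. */
class BindOnceSketch {
    private final Semaphore semaphore = new Semaphore(1);
    private volatile boolean bound = false;
    final AtomicInteger bindCount = new AtomicInteger();

    /** Stand-in for an asynchronous bind: the callback fires later on another thread. */
    private void bindAsync(Runnable onConnected) {
        bindCount.incrementAndGet();
        new Thread(() -> {
            bound = true;
            onConnected.run();
        }).start();
    }

    void runWhenBound(Runnable action) throws InterruptedException {
        if (bound) {
            action.run(); // fast path: already bound
            return;
        }
        semaphore.acquire(); // only one caller at a time may attempt the bind
        if (!bound) {
            bindAsync(() -> {
                action.run();
                semaphore.release(); // released from the callback thread
            });
        } else {
            action.run(); // someone else finished binding while we waited
            semaphore.release();
        }
    }

    /** Fires several concurrent callers; returns {number of binds, number of actions run}. */
    static int[] demo(int callers) throws InterruptedException {
        BindOnceSketch sketch = new BindOnceSketch();
        AtomicInteger actions = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(callers);
        for (int i = 0; i < callers; i++) {
            new Thread(() -> {
                try {
                    sketch.runWhenBound(() -> {
                        actions.incrementAndGet();
                        done.countDown();
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        done.await(5, TimeUnit.SECONDS);
        return new int[] {sketch.bindCount.get(), actions.get()};
    }
}
```

The permit is held from the moment the first caller decides to bind until the connection callback has run, so every later caller either blocks on acquire() or takes the fast path. A Semaphore fits here because, unlike a lock, its permit may be released by a different thread from the one that acquired it.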
Conclusion
- Download tasks are dispatched on a dedicated thread: Schedulers.newThread()
- Each download takes one thread from the thread pool: Schedulers.io()
- Each pause or delete operation takes one thread from the thread pool: Schedulers.io()
Appendix
RxDownload2 series:
- RxDownload2 source code analysis (1)
- RxDownload2 source code analysis (2)
- RxDownload2 source code analysis (3)
- RxDownload2 File Download Slow Analysis