Source code analysis. If you reproduce this article, please credit the author: Yuloran (t.cn/EGU6c76)

Preface

Library author: Season_zlc

This article focuses on thread scheduling in RxDownload2.

Download task dispatch thread

As the name suggests, this is the thread that dispatches download tasks. It runs inside DownloadService, which from a business perspective should go through startService() and bindService() only once. The dispatch thread is created in onBind():

  1. start & bind service [-> RxDownload.java]
    /**
     * start and bind service.
     *
     * @param callback Called when service connected.
     */
    private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
        Intent intent = new Intent(context, DownloadService.class);
        intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
        context.startService(intent);
        context.bindService(intent, new ServiceConnection() {
            @Override
            public void onServiceConnected(ComponentName name, IBinder binder) {
                DownloadService.DownloadBinder downloadBinder
                        = (DownloadService.DownloadBinder) binder;
                downloadService = downloadBinder.getService();
                context.unbindService(this);
                bound = true;
                callback.call();
            }

            @Override
            public void onServiceDisconnected(ComponentName name) {
                // Note: this method is only called when the system kills the Service!
                bound = false;
            }
        }, Context.BIND_AUTO_CREATE);
    }

One detail worth noting: onServiceConnected() immediately calls unbindService(). Because the service was also started with startService(), it keeps running after the unbind.

  1. onBind [-> DownloadService.java]
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        log("bind Download Service");
        startDispatch();
        return mBinder;
    }
  1. startDispatch() [-> DownloadService.java]
    /**
     * start dispatch download queue.
     */
    private void startDispatch() {
        disposable = Observable
                .create(new ObservableOnSubscribe<DownloadMission>() {
                    @Override
                    public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
                        DownloadMission mission;
                        while (!emitter.isDisposed()) {
                            try {
                                log(WAITING_FOR_MISSION_COME);
                                mission = downloadQueue.take();
                                log(Constant.MISSION_COMING);
                            } catch (InterruptedException e) {
                                log("Interrupt blocking queue.");
                                continue;
                            }
                            emitter.onNext(mission);
                        }
                        emitter.onComplete();
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Consumer<DownloadMission>() {
                    @Override
                    public void accept(DownloadMission mission) throws Exception {
                        mission.start(semaphore);
                    }
                });
    }
  • The code above implements the download task dispatch thread. Here, .subscribeOn(Schedulers.newThread()) means the dispatch loop runs on a newly created thread
  • Download tasks are kept in a blocking queue, which is synchronized internally, so there is no need to worry about concurrent access
  • As long as the subscription has not been disposed, the thread keeps taking download tasks from the blocking queue and emitting them. If the queue is empty, it blocks until a new task is enqueued
  • The code above is not rigorous: it does not dispose of the previous disposable before reassigning it. If onBind() runs more than once (for example after repeated bindService() calls), the earlier dispatch threads leak
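The last point can be illustrated without RxJava. The following is a minimal sketch using only java.util.concurrent, not RxDownload2's actual code: the class `Dispatcher` and its methods are hypothetical names. It reproduces the blocking take() loop and stops the previous dispatch thread before starting a new one, which is the step startDispatch() skips.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the dispatch logic in DownloadService.
class Dispatcher<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Consumer<T> handler;
    private Thread worker; // plays the role of `disposable`

    Dispatcher(Consumer<T> handler) {
        this.handler = handler;
    }

    // Safe to call repeatedly: the previous thread is stopped first.
    synchronized void start() {
        stop();
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    handler.accept(queue.take()); // blocks while the queue is empty
                } catch (InterruptedException e) {
                    return; // interrupted while blocked: leave the loop
                }
            }
        }, "dispatch");
        t.setDaemon(true);
        t.start();
        worker = t;
    }

    // Interrupts the current dispatch thread, if any.
    synchronized void stop() {
        if (worker != null) {
            worker.interrupt();
            worker = null;
        }
    }

    void enqueue(T mission) {
        queue.add(mission);
    }
}
```

Calling start() twice here leaves exactly one live dispatch thread. In the RxJava version, the equivalent step would presumably be to check the old disposable and dispose of it before assigning the new one.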

Download task execution thread

As the name implies, this is the thread that executes a download task. It runs on the Schedulers.io() thread pool. The semaphore parameter limits the maximum number of tasks that can download simultaneously.

  1. start(final Semaphore semaphore) [-> SingleMission.java]
    @Override
    public void start(final Semaphore semaphore) {
        disposable = start(bean, semaphore, new MissionCallback() {
            @Override
            public void start() {
                // Notify the callback that the download has started
                if (callback != null) callback.start();
            }

            @Override
            public void next(DownloadStatus value) {
                // Notify the callback of download progress
                status = value;
                processor.onNext(started(value));
                if (callback != null) callback.next(value);
            }

            @Override
            public void error(Throwable throwable) {
                // Notify the callback that the download failed
                processor.onNext(failed(status, throwable));
                if (callback != null) callback.error(throwable);
            }

            @Override
            public void complete() {
                // Notify the callback that the download is complete
                processor.onNext(completed(status));
                if (callback != null) callback.complete();
            }
        });
    }
  1. start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) [-> DownloadMission.java]
    protected Disposable start(DownloadBean bean, final Semaphore semaphore,
                               final MissionCallback callback) {
        return rxdownload.download(bean)
                .subscribeOn(Schedulers.io()) // Specify the download task execution thread
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        if (canceled.get()) {
                            dispose(disposable);
                        }

                        log(TRY_TO_ACQUIRE_SEMAPHORE);
                        // Acquire the semaphore
                        semaphore.acquire();
                        log(ACQUIRE_SUCCESS);
                        
                        // After acquiring the semaphore, check again whether the download has been canceled
                        if (canceled.get()) {
                            // If it was canceled, dispose of the subscription, which releases the semaphore
                            dispose(disposable);
                        } else {
                            callback.start();
                        }
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        // When the subscription is disposed, the semaphore must be released
                        semaphore.release();
                    }
                })
                .subscribe(new Consumer<DownloadStatus>() {
                    @Override
                    public void accept(DownloadStatus value) throws Exception {
                        // Report download progress to the callback
                        callback.next(value);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // Report the download failure to the callback
                        callback.error(throwable);
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        // Report download completion to the callback
                        callback.complete();
                    }
                });
    }
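How the semaphore caps concurrent downloads can be sketched with plain java.util.concurrent. This is an illustration under assumptions, not the library's code: `LimitedRunner`, `runAll` and the simulated work are hypothetical. Every task gets its own pool thread, much as with Schedulers.io(), but only `maxDownloadNumber` of them hold a permit at once; the rest block in acquire().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LimitedRunner {
    // Runs `tasks` simulated downloads, at most `maxDownloadNumber` at a time,
    // and returns the highest number observed running simultaneously.
    static int runAll(int tasks, int maxDownloadNumber) throws InterruptedException {
        Semaphore semaphore = new Semaphore(maxDownloadNumber);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService io = Executors.newCachedThreadPool(); // unbounded, like an IO pool

        for (int i = 0; i < tasks; i++) {
            io.execute(() -> {
                try {
                    semaphore.acquire(); // wait for a free download slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // stand-in for the actual download
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    semaphore.release(); // always give the slot back
                }
            });
        }
        io.shutdown();
        io.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }
}
```

For example, `LimitedRunner.runAll(8, 3)` starts eight tasks but never reports a peak above 3. The finally block returns the permit however the task ends; a leaked permit would permanently reduce the number of download slots.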

Download task interrupt thread

As the name implies, this is the thread that interrupts download tasks. It covers four operations: pause, delete, pause all, and cancel all. These operations also run on the Schedulers.io() thread pool.

  1. pauseServiceDownload(final String missionId) [-> RxDownload.java]
    /**
     * Pause download.
     * <p>
     * Pause a url or all tasks belonging to missionId.
     *
     * @param missionId url or missionId
     */
    public Observable<?> pauseServiceDownload(final String missionId) {
        // createGeneralObservable() wraps the asynchronous service binding in an Observable,
        // forcing synchronization with a one-permit semaphore
        return createGeneralObservable(new GeneralObservableCallback() {
            @Override
            public void call() {
                // Once the service is bound, call the service's pauseDownload()
                downloadService.pauseDownload(missionId);
            }
        }).observeOn(AndroidSchedulers.mainThread());
    }
  1. createGeneralObservable(final GeneralObservableCallback callback) [-> RxDownload.java]
    /**
     * return general observable
     *
     * @param callback Called when observable created.
     * @return Observable
     */
    private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
        // This method would be better named bindService
        return Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                if (!bound) {
                    // onServiceConnected() is an asynchronous callback, so a one-permit
                    // semaphore forces synchronization here (a CountDownLatch would also work).
                    semaphore.acquire();
                    if (!bound) {
                        startBindServiceAndDo(new ServiceConnectedCallback() {
                            @Override
                            public void call() {
                                // Called back once the service is bound
                                doCall(callback, emitter);
                                // Release the semaphore
                                semaphore.release();
                            }
                        });
                    } else {
                        doCall(callback, emitter);
                        semaphore.release();
                    }
                } else {
                    doCall(callback, emitter);
                }
            }
        }).subscribeOn(Schedulers.io()); // Runs on an IO thread, so pausing a download also executes on that thread
    }

Likewise, deleting a download goes through createGeneralObservable() first, so delete operations also run on Schedulers.io().
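The forced synchronization in createGeneralObservable() can be sketched without Android. The example below is an illustration under assumptions: `FakeBinder`, `bindAsync` and `ensureBound` are hypothetical names, and a background thread stands in for the asynchronous onServiceConnected() callback. A one-permit semaphore plus a second check of `bound` means that concurrent callers trigger only one bind.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class FakeBinder {
    private final Semaphore semaphore = new Semaphore(1);
    private volatile boolean bound = false;
    final AtomicInteger bindCount = new AtomicInteger(); // how many binds were started

    // Hypothetical stand-in for startBindServiceAndDo(): connects on another thread.
    private void bindAsync(Runnable onConnected) {
        bindCount.incrementAndGet();
        new Thread(() -> {
            bound = true;
            onConnected.run();
        }).start();
    }

    // Runs `action` once the service is bound, binding at most once.
    void ensureBound(Runnable action) throws InterruptedException {
        if (!bound) {
            semaphore.acquire(); // only one caller may attempt the bind
            if (!bound) {
                bindAsync(() -> {
                    try {
                        action.run();
                    } finally {
                        semaphore.release(); // released from the callback thread
                    }
                });
            } else {
                try {
                    action.run(); // another caller finished binding meanwhile
                } finally {
                    semaphore.release();
                }
            }
        } else {
            action.run();
        }
    }
}
```

A semaphore suits this better than a lock because the permit is acquired on the calling thread but released on the callback thread, which a thread-owned lock would not allow.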

Conclusion

  • Download tasks are dispatched on a dedicated thread: Schedulers.newThread()
  • Each download runs on one thread from the thread pool: Schedulers.io()
  • Each pause or delete operation runs on one thread from the thread pool: Schedulers.io()

Appendix

RxDownload2 series:

  • RxDownload2 source code analysis (1)
  • RxDownload2 source code analysis (2)
  • RxDownload2 source code analysis (3)
  • RxDownload2 File Download Slow Analysis