Preface

Scheduler embodies a core idea of reactive programming: a change (here, a change of thread) is made through a Scheduler and then propagates downstream (change propagation).

Thread-switching building blocks:
  1. Purpose: allow code to be executed on different threads
  2. subscribeOn: the thread on which subscription happens
  3. observeOn: the thread on which events are received
  4. Scheduler: performs the actual thread switch
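The division of labour in the list above can be imitated with plain JDK executors. This is only a sketch and not how RxJava implements it: `ThreadSwitchSketch`, `produceAndObserve`, and the two thread names are invented for illustration. One executor stands in for the subscribeOn thread and another for the observeOn thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadSwitchSketch {
    /** Produces a value on one thread, receives it on another, and returns both thread names. */
    static String[] produceAndObserve() throws Exception {
        // Hypothetical stand-ins for the subscribeOn and observeOn schedulers.
        ExecutorService subscribeExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "subscribe-thread"));
        ExecutorService observeExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observe-thread"));
        try {
            String[] threads = new String[2];
            CompletableFuture
                    // Like subscribeOn: the source runs on this executor.
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return "test";
                    }, subscribeExecutor)
                    // Like observeOn: the value is received on this executor.
                    .thenAcceptAsync(value -> threads[1] = Thread.currentThread().getName(),
                            observeExecutor)
                    .get(); // wait until the value has been delivered
            return threads;
        } finally {
            subscribeExecutor.shutdown();
            observeExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] t = produceAndObserve();
        System.out.println("produced on " + t[0] + ", observed on " + t[1]);
    }
}
```

The point of the sketch is that the producer and the consumer never choose a thread themselves; the executor passed in at each step decides, which is the role a Scheduler plays in RxJava.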
1. RxJava1 thread switching
  1. Scheduler: performs the scheduling
  2. Operator: the operator interface
  3. lift: the core operator
Example:

   Observable.
                        create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                if (!subscriber.isUnsubscribed()) {
                                    Log.d(TAG, "currentThread:" + Thread.currentThread());
                                    subscriber.onNext("test");
                                    subscriber.onCompleted();
                                }
                            }
                        }).
                        subscribeOn(Schedulers.newThread()).
                        observeOn(AndroidSchedulers.mainThread()).
                        subscribe(new Observer<String>() {
                            @Override
                            public void onCompleted() {}

                            @Override
                            public void onError(Throwable e) {}

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "onNext:" + s + "currentThread:" + Thread.currentThread());
                            }
                        });
Output:

06-12 17:00:13.846 6227-6495/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-12 17:00:13.856 6227-6227/com.haocai.rxjavademo D/kpioneer: onNext:testcurrentThread:Thread[main,5,main]
2. RxJava2 thread switching
  1. Scheduler: performs the scheduling
  2. AbstractObservableWithUpstream: abstract class

                   /*--------- no back pressure ---------*/
                Observable.
                        create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                if (!emitter.isDisposed()) {
                                    Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
                                    emitter.onNext("test");
                                    emitter.onComplete();
                                }
                            }
                        }).
                        subscribeOn(Schedulers.newThread()).
                        observeOn(AndroidSchedulers.mainThread()).
                        subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {}

                            @Override
                            public void onNext(String o) {
                                Log.d(TAG, "Observable onNext:" + o);
                                Log.d(TAG, "Observable currentThread:" + Thread.currentThread());

                            }

                            @Override
                            public void onError(Throwable e) {}

                            @Override
                            public void onComplete() {}
                        });

                /*--------- has back pressure ---------*/
                Flowable.create(new FlowableOnSubscribe<String>() {
                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                        if (!emitter.isCancelled()) {
                            Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
                            emitter.onNext("test");
                            emitter.onComplete();
                        }
                    }
                }, BackpressureStrategy.DROP).
                        subscribeOn(Schedulers.newThread()).
                        observeOn(AndroidSchedulers.mainThread()).
                        subscribe(new Subscriber<String>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                s.request(Long.MAX_VALUE);
                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "Flowable onNext:" + s);
                                Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
                            }

                            @Override
                            public void onError(Throwable t) {}

                            @Override
                            public void onComplete() {}
                        });

06-13 13:37:13.009 3063-3949/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable onNext:test
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[main,5,main]
06-13 13:37:13.019 3063-3950/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[RxNewThreadScheduler-2,5,main]
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable onNext:test
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[main,5,main]
3. Source code analysis of RxJava1 Scheduler
  1. Scheduler: abstract class
  2. Worker: the class that actually does the thread scheduling
  3. Action0: the action performed on the thread
  4. schedule: the method that actually schedules work on a thread; it takes an Action0 as its parameter

public abstract class Scheduler {

    public abstract Worker createWorker();

    /**
     * Sequential Scheduler for executing actions on a single thread or event loop.
     * <p>
     * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
     */
    public abstract static class Worker implements Subscription {

        /**
         * Schedules an Action for execution.
         *
         * @param action
         *            Action to schedule
         * @return a subscription to be able to prevent or cancel the execution of the action
         */
        public abstract Subscription schedule(Action0 action);


        public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);


        public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
            return SchedulePeriodicHelper.schedulePeriodically(this, action,
                    initialDelay, period, unit, null);
        }


        public long now() {
            return System.currentTimeMillis();
        }
    }

    public long now() {
        return System.currentTimeMillis();
    }


    @SuppressWarnings("unchecked")
    public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
        return (S) new SchedulerWhen(combine, this);
    }
}
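The contract in the listing above (a Scheduler hands out Workers, and a Worker runs the actions it is given) can be reduced to a few lines of plain JDK code. This is a sketch under assumptions, not RxJava's code: `MiniScheduler`, `MiniWorker`, `NewThreadMiniScheduler`, and `runOnWorker` are hypothetical names, `Runnable` stands in for `Action0`, and `Future` stands in for `Subscription`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MiniSchedulerSketch {
    /** Hypothetical stand-in for Scheduler: a factory of workers. */
    interface MiniScheduler {
        MiniWorker createWorker();
    }

    /** Hypothetical stand-in for Scheduler.Worker: runs actions one after another. */
    interface MiniWorker {
        Future<?> schedule(Runnable action); // Runnable plays the role of Action0

        void unsubscribe();

        boolean isUnsubscribed();
    }

    /** Every worker owns one fresh thread, loosely in the spirit of Schedulers.newThread(). */
    static final class NewThreadMiniScheduler implements MiniScheduler {
        @Override
        public MiniWorker createWorker() {
            final ExecutorService executor = Executors.newSingleThreadExecutor();
            return new MiniWorker() {
                private volatile boolean unsubscribed;

                @Override
                public Future<?> schedule(Runnable action) {
                    return executor.submit(action);
                }

                @Override
                public void unsubscribe() {
                    unsubscribed = true;
                    executor.shutdownNow(); // cancels outstanding work, frees the thread
                }

                @Override
                public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            };
        }
    }

    /** Schedules one action and returns the name of the thread it ran on. */
    static String runOnWorker(MiniScheduler scheduler) throws Exception {
        MiniWorker worker = scheduler.createWorker();
        try {
            final String[] name = new String[1];
            worker.schedule(() -> name[0] = Thread.currentThread().getName())
                    .get(5, TimeUnit.SECONDS);
            return name[0];
        } finally {
            worker.unsubscribe();
        }
    }
}
```

Calling `runOnWorker(new NewThreadMiniScheduler())` from the main thread returns the name of a pool thread rather than the caller's, which is all that "scheduling" means at this level.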
Thread scheduling process:
  1. Pass in a different Scheduler to use a different thread

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
    }

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }
  2. The Scheduler creates a Worker, which uses a real thread pool

The thread pool is created in NewThreadWorker:


public class NewThreadWorker extends Scheduler.Worker implements Subscription {
    private final ScheduledExecutorService executor;
    volatile boolean isUnsubscribed;
    /** The purge frequency in milliseconds. */
    private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
    /** Force the use of purge (true/false). */
    private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
    private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
    private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
    /** The purge frequency in milliseconds. */
    public static final int PURGE_FREQUENCY;
    private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
    private static final AtomicReference<ScheduledExecutorService> PURGE;
    /**
     * Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}.
     * Also, it works even for inheritance: {@link Method} of base class can be invoked on the instance of child class.
     */
    private static volatile Object cachedSetRemoveOnCancelPolicyMethod;

    /**
     * Possible value of {@link #cachedSetRemoveOnCancelPolicyMethod} which means that cancel policy is not supported.
     */
    private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();

    static {
        EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
        PURGE = new AtomicReference<ScheduledExecutorService>();
        PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);

        // Forces the use of purge even if setRemoveOnCancelPolicy is available
        final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);

        final int androidApiVersion = PlatformDependent.getAndroidApiVersion();

        // According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
        // setRemoveOnCancelPolicy available since Android API 21
        SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce
                && (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
    }
    /**
     * Registers the given executor service and starts the purge thread if not already started.
     * <p>{@code public} visibility reason: called from other package(s) within RxJava
     * @param service a scheduled thread pool executor instance
     */
    public static void registerExecutor(ScheduledThreadPoolExecutor service) {
        do {
            ScheduledExecutorService exec = PURGE.get();
            if (exec != null) {
                break;
            }
            exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
            if (PURGE.compareAndSet(null, exec)) {
                exec.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run(a) {
                        purgeExecutors();
                    }
                }, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);

                break;
            } else {
                exec.shutdownNow();
            }
        } while (true);

        EXECUTORS.putIfAbsent(service, service);
    }
    /**
     * Deregisters the executor service.
     * <p>{@code public} visibility reason: called from other package(s) within RxJava
     * @param service a scheduled thread pool executor instance
     */
    public static void deregisterExecutor(ScheduledExecutorService service) {
        EXECUTORS.remove(service);
    }

    /** Purges each registered executor and eagerly evicts shutdown executors. */
    static void purgeExecutors() {
        try {
            // This prevents map.keySet to compile to a Java 8+ KeySetView return type
            // and cause NoSuchMethodError on Java 6-7 runtimes.
            Map<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> map = EXECUTORS;
            Iterator<ScheduledThreadPoolExecutor> it = map.keySet().iterator();
            while (it.hasNext()) {
                ScheduledThreadPoolExecutor exec = it.next();
                if (!exec.isShutdown()) {
                    exec.purge();
                } else {
                    it.remove();
                }
            }
        } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            RxJavaHooks.onError(t);
        }
    }

    /**
     * Tries to enable the Java 7+ setRemoveOnCancelPolicy.
     * <p>{@code public} visibility reason: called from other package(s) within RxJava.
     * If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
     * be called to enable the backup option of purging the executors.
     * @param executor the executor to call setRemoveOnCancelPolicy if available.
     * @return true if the policy was successfully enabled
     */
    public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
        if (SHOULD_TRY_ENABLE_CANCEL_POLICY) { // NOPMD
            final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;

            Method methodToCall;

            if (isInstanceOfScheduledThreadPoolExecutor) {
                final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;

                if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
                    return false;
                }

                if (localSetRemoveOnCancelPolicyMethod == null) {
                    Method method = findSetRemoveOnCancelPolicyMethod(executor);

                    cachedSetRemoveOnCancelPolicyMethod = method != null
                            ? method
                            : SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;

                    methodToCall = method;
                } else {
                    methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
                }
            } else {
                methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
            }

            if (methodToCall != null) {
                try {
                    methodToCall.invoke(executor, true);
                    return true;
                } catch (InvocationTargetException e) {
                    RxJavaHooks.onError(e);
                } catch (IllegalAccessException e) {
                    RxJavaHooks.onError(e);
                } catch (IllegalArgumentException e) {
                    RxJavaHooks.onError(e);
                }
            }
        }

        return false;
    }

    /**
     * Tries to find {@code "setRemoveOnCancelPolicy(boolean)"} method in the class of passed executor.
     *
     * @param executor whose class will be used to search for required method.
     * @return {@code "setRemoveOnCancelPolicy(boolean)"} {@link Method}
     * or {@code null} if required {@link Method} was not found.
     */
    static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
        // The reason for the loop is to avoid NoSuchMethodException being thrown on JDK 6
        // which is more costly than looping through ~70 methods.
        for (final Method method : executor.getClass().getMethods()) {
            if (method.getName().equals("setRemoveOnCancelPolicy")) {
                final Class<?>[] parameterTypes = method.getParameterTypes();

                if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
                    return method;
                }
            }
        }

        return null;
    }

    /* package */
    public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
        boolean cancelSupported = tryEnableCancelPolicy(exec);
        if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
            registerExecutor((ScheduledThreadPoolExecutor)exec);
        }
        executor = exec;
    }

    @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed) {
            return Subscriptions.unsubscribed();
        }
        return scheduleActual(action, delayTime, unit);
    }

    /**
     * Schedules the given action by wrapping it into a ScheduledAction on the
     * underlying ExecutorService, returning the ScheduledAction.
     * @param action the action to wrap and schedule
     * @param delayTime the delay in execution
     * @param unit the time unit of the delay
     * @return the wrapper ScheduledAction
     */
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction, parent);
        parent.add(run);

        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction, parent);
        parent.add(run);

        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

    @Override
    public void unsubscribe() {
        isUnsubscribed = true;
        executor.shutdownNow();
        deregisterExecutor(executor);
    }

    @Override
    public boolean isUnsubscribed() {
        return isUnsubscribed;
    }
}
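The comment in the constructor above says that removing cancelled tasks from the executor avoids a memory leak. The effect can be observed with the JDK alone. The sketch below uses the real `ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy` and `purge` methods; the class and method names `RemoveOnCancelSketch`, `queuedAfterCancel`, and `queuedAfterPurge` are made up for illustration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RemoveOnCancelSketch {
    /** Schedules a far-future task, cancels it, and returns how many tasks are still queued. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?> f = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            f.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Policy off, but purge() is called afterwards, as the periodic purge thread would do. */
    static int queuedAfterPurge() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<?> f = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            f.cancel(false);
            executor.purge(); // drops cancelled futures from the work queue
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("policy off: " + queuedAfterCancel(false));
        System.out.println("policy on:  " + queuedAfterCancel(true));
        System.out.println("purged:     " + queuedAfterPurge());
    }
}
```

With the policy off, a cancelled task stays in the queue until its delay expires or someone purges it, which is why NewThreadWorker registers the executor for periodic purging whenever `tryEnableCancelPolicy` returns false.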
  3. Pass in the concrete operation as an Action0
  4. The schedule method carries out the scheduling

ScheduledAction.run() calls action.call() to perform the actual operation:


public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
    /** */
    private static final long serialVersionUID = -3962399486978279857L;
    final SubscriptionList cancel;
    final Action0 action;

    public ScheduledAction(Action0 action) {
        this.action = action;
        this.cancel = new SubscriptionList();
    }
    public ScheduledAction(Action0 action, CompositeSubscription parent) {
        this.action = action;
        this.cancel = new SubscriptionList(new Remover(this, parent));
    }
    public ScheduledAction(Action0 action, SubscriptionList parent) {
        this.action = action;
        this.cancel = new SubscriptionList(new Remover2(this, parent));
    }

    @Override
    public void run() {
        try {
            lazySet(Thread.currentThread());
            action.call();
        } catch (OnErrorNotImplementedException e) {
            signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
        } catch (Throwable e) {
            signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
        } finally {
            unsubscribe();
        }
    }

    void signalError(Throwable ie) {
        RxJavaHooks.onError(ie);
        Thread thread = Thread.currentThread();
        thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
    }

    @Override
    public boolean isUnsubscribed() {
        return cancel.isUnsubscribed();
    }

    @Override
    public void unsubscribe() {
        if (!cancel.isUnsubscribed()) {
            cancel.unsubscribe();
        }
    }

    /**
     * Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
     * if the underlying {@code action} completes or the this scheduled action is cancelled.
     *
     * @param s the Subscription to add
     */
    public void add(Subscription s) {
        cancel.add(s);
    }

    /**
     * Adds the given Future to the unsubscription composite in order to support
     * cancelling the underlying task in the executor framework.
     * @param f the future to add
     */
    public void add(final Future<?> f) {
        cancel.add(new FutureCompleter(f));
    }

    /**
     * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
     * cancelled or terminates, it can remove itself from this parent.
     *
     * @param parent
     *            the parent {@code CompositeSubscription} to add
     */
    public void addParent(CompositeSubscription parent) {
        cancel.add(new Remover(this, parent));
    }

    /**
     * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
     * cancelled or terminates, it can remove itself from this parent.
     *
     * @param parent
     *            the parent {@code CompositeSubscription} to add
     */
    public void addParent(SubscriptionList parent) {
        cancel.add(new Remover2(this, parent));
    }

    /**
     * Cancels the captured future if the caller of the call method
     * is not the same as the runner of the outer ScheduledAction to
     * prevent unnecessary self-interrupting if the unsubscription
     * happens from the same thread.
     */
    final class FutureCompleter implements Subscription {
        private final Future<?> f;

        FutureCompleter(Future<?> f) {
            this.f = f;
        }

        @Override
        public void unsubscribe() {
            if (ScheduledAction.this.get() != Thread.currentThread()) {
                f.cancel(true);
            } else {
                f.cancel(false);
            }
        }

        @Override
        public boolean isUnsubscribed() {
            return f.isCancelled();
        }
    }

    /** Remove a child subscription from a composite when unsubscribing. */
    static final class Remover extends AtomicBoolean implements Subscription {
        /** */
        private static final long serialVersionUID = 247232374289553518L;
        final ScheduledAction s;
        final CompositeSubscription parent;

        public Remover(ScheduledAction s, CompositeSubscription parent) {
            this.s = s;
            this.parent = parent;
        }

        @Override
        public boolean isUnsubscribed() {
            return s.isUnsubscribed();
        }

        @Override
        public void unsubscribe() {
            if (compareAndSet(false, true)) {
                parent.remove(s);
            }
        }
    }

    /** Remove a child subscription from a composite when unsubscribing. */
    static final class Remover2 extends AtomicBoolean implements Subscription {
        /** */
        private static final long serialVersionUID = 247232374289553518L;
        final ScheduledAction s;
        final SubscriptionList parent;

        public Remover2(ScheduledAction s, SubscriptionList parent) {
            this.s = s;
            this.parent = parent;
        }

        @Override
        public boolean isUnsubscribed() {
            return s.isUnsubscribed();
        }

        @Override
        public void unsubscribe() {
            if (compareAndSet(false, true)) {
                parent.remove(s);
            }
        }
    }
}
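The reason ScheduledAction extends `AtomicReference<Thread>` is easier to see in isolation: `run()` records the thread that executes the action, and FutureCompleter later compares that thread with the caller to decide whether cancelling may interrupt. The sketch below reproduces only that check. `CancellableActionSketch`, `CancellableAction`, `mayInterrupt`, and `demo` are hypothetical names, and `Runnable` again replaces `Action0`.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CancellableActionSketch {
    /** Remembers its runner thread, like ScheduledAction does via lazySet. */
    static final class CancellableAction extends AtomicReference<Thread> implements Runnable {
        private static final long serialVersionUID = 1L;
        private final Runnable action;

        CancellableAction(Runnable action) {
            this.action = action;
        }

        @Override
        public void run() {
            lazySet(Thread.currentThread()); // record who is running the action
            action.run();
        }

        /** True only when the caller is NOT the runner, so an interrupt would not hit the caller itself. */
        boolean mayInterrupt() {
            return get() != Thread.currentThread();
        }
    }

    /** Returns {answer seen from inside the action, answer seen from the calling thread}. */
    static boolean[] demo() throws InterruptedException {
        final boolean[] result = new boolean[2];
        final CancellableAction[] holder = new CancellableAction[1];
        holder[0] = new CancellableAction(() -> result[0] = holder[0].mayInterrupt());

        Thread runner = new Thread(holder[0], "runner");
        runner.start();
        runner.join();

        result[1] = holder[0].mayInterrupt(); // asked from the calling thread
        return result;
    }
}
```

Inside the action the answer is `false`, which corresponds to `f.cancel(false)`; from any other thread it is `true`, which corresponds to `f.cancel(true)`.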
RxJava1: Scheduler in RxAndroid

RxAndroid uses Handler and Looper to run actions on the main thread:


/** Android-specific Schedulers. */
public final class AndroidSchedulers {
    private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();

    private final Scheduler mainThreadScheduler;

    private static AndroidSchedulers getInstance() {
        for (;;) {
            AndroidSchedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new AndroidSchedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            }
        }
    }

    private AndroidSchedulers() {
        RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

        Scheduler main = hook.getMainThreadScheduler();
        if (main != null) {
            mainThreadScheduler = main;
        } else {
            mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
        }
    }

    /** A {@link Scheduler} which executes actions on the Android UI thread. */
    public static Scheduler mainThread() {
        return getInstance().mainThreadScheduler;
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new LooperScheduler(looper);
    }

    /**
     * Resets the current {@link AndroidSchedulers} instance.
     * This will re-init the cached schedulers on the next usage,
     * which can be useful in testing.
     */
    @Experimental
    public static void reset() {
        INSTANCE.set(null);
    }
}


class LooperScheduler extends Scheduler {
    private final Handler handler;

    LooperScheduler(Looper looper) {
        handler = new Handler(looper);
    }

    LooperScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    static class HandlerWorker extends Worker {
        private final Handler handler;
        private final RxAndroidSchedulersHook hook;
        private volatile boolean unsubscribed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
            this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
        }

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (unsubscribed) {
                return Subscriptions.unsubscribed();
            }

            action = hook.onSchedule(action);

            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.

            handler.sendMessageDelayed(message, unit.toMillis(delayTime));

            if (unsubscribed) {
                handler.removeCallbacks(scheduledAction);
                return Subscriptions.unsubscribed();
            }

            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
    }

    static final class ScheduledAction implements Runnable, Subscription {
        private final Action0 action;
        private final Handler handler;
        private volatile boolean unsubscribed;

        ScheduledAction(Action0 action, Handler handler) {
            this.action = action;
            this.handler = handler;
        }

        @Override
        public void run() {
            try {
                action.call();
            } catch (Throwable e) {
                // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
                IllegalStateException ie;
                if (e instanceof OnErrorNotImplementedException) {
                    ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
                } else {
                    ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
                }
                RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }
    }
}
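Handler and Looper only exist on Android, but the idea behind LooperScheduler (post a Runnable to a queue that one dedicated thread drains in order) can be shown with the JDK. The following is a rough analogy, not Android's implementation: `LoopSketch`, `MiniLooper`, `post`, `quit`, and the thread name `mini-main` are all invented here, and delayed messages and removal tokens are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LoopSketch {
    /** A single thread that runs posted tasks in order, standing in for Looper plus Handler. */
    static final class MiniLooper extends Thread {
        private static final Runnable QUIT = () -> { }; // sentinel that ends the loop
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        MiniLooper(String name) {
            super(name);
        }

        /** Roughly what Handler.post does: enqueue work for the loop thread. */
        void post(Runnable task) {
            queue.add(task);
        }

        void quit() {
            queue.add(QUIT);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks until work arrives
                    if (task == QUIT) {
                        return;
                    }
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Posts two tasks from the calling thread and returns what the loop thread logged. */
    static List<String> demo() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        MiniLooper looper = new MiniLooper("mini-main");
        looper.start();
        looper.post(() -> log.add("first on " + Thread.currentThread().getName()));
        looper.post(() -> log.add("second on " + Thread.currentThread().getName()));
        looper.quit();
        looper.join();
        return log;
    }
}
```

Whichever thread calls `post`, the task body runs on the loop thread, which is why `observeOn(AndroidSchedulers.mainThread())` delivers `onNext` on the main thread in the earlier examples.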

4. Source code analysis of RxJava2 Scheduler


public abstract class Scheduler {
  
    static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
    static {
        CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
                Long.getLong("rx2.scheduler.drift-tolerance", 15));
    }
   
    public static long clockDriftTolerance() {
        return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
    }

    
    @NonNull
    public abstract io.reactivex.Scheduler.Worker createWorker();
    
    public long now(@NonNull TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    
    public void start() {}

    public void shutdown() {}

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

 
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final io.reactivex.Scheduler.Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        io.reactivex.Scheduler.DisposeTask task = new io.reactivex.Scheduler.DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final io.reactivex.Scheduler.Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        io.reactivex.Scheduler.PeriodicDirectTask periodicTask = new io.reactivex.Scheduler.PeriodicDirectTask(decoratedRun, w);

        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }

        return periodicTask;
    }

  
    @SuppressWarnings("unchecked")
    @NonNull
    public <S extends io.reactivex.Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
        return (S) new SchedulerWhen(combine, this);
    }

   
    public abstract static class Worker implements Disposable {
        /**
         * Schedules a Runnable for execution without any time delay.
         *
         * <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
         *
         * @param run
         *            Runnable to schedule
         * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
         */
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

        /**
         * Schedules an Runnable for execution at some point in the future specified by a time delay
         * relative to the current time.
         * <p>
         * Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
         * as if the {@link #schedule(Runnable)} was called.
         *
         * @param run
         *            the Runnable to schedule
         * @param delay
         *            time to "wait" before executing the action; non-positive values indicate an non-delayed
         *            schedule
         * @param unit
         *            the time unit of {@code delayTime}
         * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
         */
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

 
        @NonNull
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            final SequentialDisposable first = new SequentialDisposable();

            final SequentialDisposable sd = new SequentialDisposable(first);

            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            final long periodInNanoseconds = unit.toNanos(period);
            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

            Disposable d = schedule(new io.reactivex.Scheduler.Worker.PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                    periodInNanoseconds), initialDelay, unit);

            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            first.replace(d);

            return sd;
        }

        /**
         * Returns the 'current time' of the Worker in the specified time unit.
         * @param unit the time unit
         * @return the 'current time'
         * @since 2.0
         */
        public long now(@NonNull TimeUnit unit) {
            return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Holds state and logic to calculate when the next delayed invocation
         * of this task has to happen (accounting for clock drifts).
         */
        final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
            @NonNull
            final Runnable decoratedRun;
            @NonNull
            final SequentialDisposable sd;
            final long periodInNanoseconds;
            long count;
            long lastNowNanoseconds;
            long startInNanoseconds;

            PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
                         long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
                this.decoratedRun = decoratedRun;
                this.sd = sd;
                this.periodInNanoseconds = periodInNanoseconds;
                lastNowNanoseconds = firstNowNanoseconds;
                startInNanoseconds = firstStartInNanoseconds;
            }

            @Override
            public void run() {
                decoratedRun.run();

                if (!sd.isDisposed()) {
                    long nextTick;

                    long nowNanoseconds = now(TimeUnit.NANOSECONDS);
                    // If the clock moved in a direction quite a bit, rebase the repetition period
                    if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                            || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                        nextTick = nowNanoseconds + periodInNanoseconds;
                        /*
                         * Shift the start point back by the drift as if the whole thing
                         * started count periods ago.
                         */
                        startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
                    } else {
                        nextTick = startInNanoseconds + (++count * periodInNanoseconds);
                    }
                    lastNowNanoseconds = nowNanoseconds;

                    long delay = nextTick - nowNanoseconds;
                    sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
                }
            }

            @Override
            public Runnable getWrappedRunnable() {
                return this.decoratedRun;
            }
        }
    }

    static final class PeriodicDirectTask
            implements Disposable, Runnable, SchedulerRunnableIntrospection {

        @NonNull
        final Runnable run;

        @NonNull
        final io.reactivex.Scheduler.Worker worker;

        volatile boolean disposed;

        PeriodicDirectTask(@NonNull Runnable run, @NonNull io.reactivex.Scheduler.Worker worker) {
            this.run = run;
            this.worker = worker;
        }

        @Override
        public void run() {
            if (!disposed) {
                try {
                    run.run();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    worker.dispose();
                    throw ExceptionHelper.wrapOrThrow(ex);
                }
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            worker.dispose();
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        @Override
        public Runnable getWrappedRunnable() {
            return run;
        }
    }

    static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

        @NonNull
        final Runnable decoratedRun;

        @NonNull
        final io.reactivex.Scheduler.Worker w;

        @Nullable
        Thread runner;

        DisposeTask(@NonNull Runnable decoratedRun, @NonNull io.reactivex.Scheduler.Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        @Override
        public Runnable getWrappedRunnable() {
            return this.decoratedRun;
        }
    }
}
  1. Passing in a different Scheduler selects a different thread (or thread pool)
  2. The Scheduler creates a Worker, and the Worker hands the task to a real thread pool
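
The clock-drift handling in PeriodicTask above is easier to follow with concrete numbers. The following is a minimal sketch of the same arithmetic in plain Java; the class name PeriodicTickSketch, the TOLERANCE value and the time values are assumptions made for illustration, and the times are plain ticks rather than the nanoseconds the library uses.

```java
final class PeriodicTickSketch {
    // Assumed tolerance for this sketch only; the library defines its own constant.
    static final long TOLERANCE = 100L;

    private final long period;
    private long count;
    private long lastNow;
    private long start;

    PeriodicTickSketch(long firstStart, long firstNow, long period) {
        this.start = firstStart;
        this.lastNow = firstNow;
        this.period = period;
    }

    /** Returns the delay until the next run, measured from {@code now}. */
    long nextDelay(long now) {
        long nextTick;
        if (now + TOLERANCE < lastNow || now >= lastNow + period + TOLERANCE) {
            // The clock moved noticeably: restart the cadence from "now"
            // and shift the start point as if count periods had already passed.
            nextTick = now + period;
            start = nextTick - period * (++count);
        } else {
            // Normal case: stay on the original grid start + k * period.
            nextTick = start + (++count * period);
        }
        lastNow = now;
        return nextTick - now;
    }

    long gridStart() {
        return start;
    }

    public static void main(String[] args) {
        PeriodicTickSketch task = new PeriodicTickSketch(0L, 0L, 1000L);
        System.out.println(task.nextDelay(30L));   // 970: next run stays on the grid at 1000
        System.out.println(task.nextDelay(1040L)); // 960: next run stays on the grid at 2000
        System.out.println(task.nextDelay(5000L)); // 1000: clock jumped, cadence is rebased
    }
}
```

In the normal case the delay shrinks by however long the task took, so runs stay aligned to the original grid; only a jump larger than the tolerance rebases the grid.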

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    /**
     * Schedules the given runnable on the underlying executor directly and
     * returns its future wrapped into a Disposable.
     * @param run the Runnable to execute in a delayed fashion
     * @param delayTime the delay amount
     * @param unit the delay time unit
     * @return the ScheduledRunnable instance
     */
    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            if (delayTime <= 0L) {
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    /**
     * Schedules the given runnable periodically on the underlying executor directly
     * and returns its future wrapped into a Disposable.
     * @param run the Runnable to execute in a periodic fashion
     * @param initialDelay the initial delay amount
     * @param period the repeat period amount
     * @param unit the time unit for both the initialDelay and period
     * @return the ScheduledRunnable instance
     */
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        if (period <= 0L) {

            InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
            try {
                Future<?> f;
                if (initialDelay <= 0L) {
                    f = executor.submit(periodicWrapper);
                } else {
                    f = executor.schedule(periodicWrapper, initialDelay, unit);
                }
                periodicWrapper.setFirst(f);
            } catch (RejectedExecutionException ex) {
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }

            return periodicWrapper;
        }
        ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
        try {
            Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    /**
     * Wraps the given runnable into a ScheduledRunnable and schedules it
     * on the underlying ScheduledExecutorService.
     * <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
     * false.
     * @param run the runnable instance
     * @param delayTime the time to delay the execution
     * @param unit the time unit
     * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
     * @return the ScheduledRunnable instance
     */
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

    @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

    /**
     * Shuts down the underlying executor in a non-interrupting fashion.
     */
    public void shutdown() {
        if (!disposed) {
            disposed = true;
            executor.shutdown();
        }
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}
  1. The concrete operation is passed in as a Runnable
  2. Scheduling is carried out by the schedule method
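
The core of the scheduling path above is small: submit immediately when the delay is not positive, otherwise schedule with the delay, and wrap the returned Future so the caller can cancel. A minimal sketch using only the JDK follows; DirectScheduleSketch and Cancelable are hypothetical names standing in for the library's own task and Disposable types.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DirectScheduleSketch {
    /** Hypothetical cancel handle; stands in for Disposable in this sketch. */
    interface Cancelable {
        void cancel();
        boolean isCancelled();
    }

    static Cancelable schedule(ScheduledExecutorService executor, Runnable task, long delay, TimeUnit unit) {
        try {
            final Future<?> f;
            if (delay <= 0L) {
                f = executor.submit(task);                // non-positive delay: run as soon as possible
            } else {
                f = executor.schedule(task, delay, unit); // positive delay: run later
            }
            return new Cancelable() {
                @Override public void cancel() { f.cancel(false); }
                @Override public boolean isCancelled() { return f.isCancelled(); }
            };
        } catch (RejectedExecutionException ex) {
            // The executor was shut down: report an already-cancelled handle.
            return new Cancelable() {
                @Override public void cancel() { }
                @Override public boolean isCancelled() { return true; }
            };
        }
    }

    /** Runs one immediate task, cancels one delayed task, and reports whether both behaved as expected. */
    static boolean demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch ran = new CountDownLatch(1);
            schedule(executor, ran::countDown, 0L, null);
            boolean executed = ran.await(2, TimeUnit.SECONDS);
            Cancelable delayed = schedule(executor, () -> { }, 10L, TimeUnit.SECONDS);
            delayed.cancel();
            return executed && delayed.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```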
RxJava2: Scheduler in RxAndroid

As in RxJava1, a Handler bound to the main Looper is used to run work on the main thread.


public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
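
The Handler/Looper pairing can be pictured as one thread draining one message queue: posting a Runnable from any thread makes it run on the looper's thread. The following is only a plain-Java analogy, not Android code; MiniLooper and runOnLooper are hypothetical names, and a BlockingQueue stands in for the message queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class MiniLooperSketch {
    /** Hypothetical stand-in for a Looper plus Handler: one thread draining one queue. */
    static final class MiniLooper {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private final Thread thread;

        MiniLooper(String name) {
            thread = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run(); // block until a message arrives, then run it
                    }
                } catch (InterruptedException e) {
                    // quit() interrupts the thread to end the loop
                }
            }, name);
            thread.setDaemon(true);
            thread.start();
        }

        void post(Runnable r) {
            queue.add(r);
        }

        void quit() {
            thread.interrupt();
        }
    }

    /** Posts one task and returns the name of the thread that executed it (null on failure). */
    static String runOnLooper(String name) {
        MiniLooper looper = new MiniLooper(name);
        CompletableFuture<String> seen = new CompletableFuture<>();
        looper.post(() -> seen.complete(Thread.currentThread().getName()));
        try {
            return seen.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return null;
        } finally {
            looper.quit();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnLooper("fake-main")); // the task runs on the looper thread
    }
}
```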

5. RxJava1 Scheduler thread transformation: a hand-written imitation

Main objects:
Switcher (thread switcher)
  1. Used for thread switching
  2. Has a createWorker method

/**
 * Created by Xionghu on 2018/6/14.
 */

public abstract class Switcher {
    public abstract Worker createWorker();

    public static abstract class Worker implements Calling {
        public abstract Calling switches(Action0 action0);
    }
}
Worker
  1. The class that actually performs the thread switch
  2. Switches threads through the switches method
NewThreadSwitcher
  1. A Switcher that switches to a new thread
  2. Implements the createWorker method

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Switcher for new threads
 */

public class NewThreadSwitcher extends Switcher {
    @Override
    public Worker createWorker() {
        return new NewThreadWorker();
    }
}
NewThreadWorker
  1. Holds a thread pool with a single thread
  2. Implements the switches method to perform the thread switch
  3. Wraps the actual operation in a Runnable and submits it to the thread pool

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Worker class for the new thread
 */

public class NewThreadWorker extends Switcher.Worker {

    // newScheduledThreadPool(1, ...) creates a pool with a core size of one thread;
    // the pool also supports delayed and periodic task execution.
    private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            return new Thread(runnable, " NewThreadWorker");
        }
    });

    private volatile boolean mIsUnCalled;

    @Override
    public void unCall() {
        mIsUnCalled = true;
    }

    @Override
    public boolean isUnCalled() {
        return mIsUnCalled;
    }

    @Override
    public Calling switches(Action0 action0) {
        SwitcherAction switcherAction = new SwitcherAction(action0);
        mExecutor.submit(switcherAction);
        return switcherAction;
    }

    private static class SwitcherAction implements Runnable, Calling {
        private final Action0 action0;
        private volatile boolean mIsUnCalled;

        public SwitcherAction(Action0 action0) {
            this.action0 = action0;
        }

        @Override
        public void unCall() {
            mIsUnCalled = true;
        }

        @Override
        public boolean isUnCalled() {
            return mIsUnCalled;
        }

        @Override
        public void run() {
            action0.call();
        }
    }
}
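
In the listing above, SwitcherAction records the unCall flag but run() does not consult it, so an action cancelled before its turn would still execute. One possible variation, shown here only as a sketch under that assumption, checks the flag before running; Action and CancelAwareAction are hypothetical names, with Action standing in for Action0.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CancelAwareSketch {
    /** Hypothetical stand-in for Action0. */
    interface Action {
        void call();
    }

    /** Runnable wrapper that skips the wrapped action once it has been cancelled. */
    static final class CancelAwareAction implements Runnable {
        private final Action action;
        private volatile boolean cancelled;

        CancelAwareAction(Action action) {
            this.action = action;
        }

        void cancel() {
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }

        @Override
        public void run() {
            if (!cancelled) {
                action.call();
            }
        }
    }

    /** Runs one live action and one cancelled action; returns how many actually executed. */
    static int demo() {
        AtomicInteger calls = new AtomicInteger();
        CancelAwareAction live = new CancelAwareAction(calls::incrementAndGet);
        CancelAwareAction dropped = new CancelAwareAction(calls::incrementAndGet);
        dropped.cancel();
        live.run();
        dropped.run();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1: only the action that was not cancelled ran
    }
}
```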
LooperSwitcher

Switches to the Looper of a given thread on Android.

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Looper Switcher for Android
 */

public class LooperSwitcher extends Switcher {

    private Handler mHandler;

    public LooperSwitcher(Looper looper) {
        mHandler = new Handler(looper);
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(mHandler);
    }
}
HandlerWorker

Sends the action to the specified Looper for execution

import android.os.Handler;
import android.os.Message;

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Worker
 */
public class HandlerWorker extends Switcher.Worker {

    private final Handler mHandler;

    private volatile boolean mIsUnCalled;

    public HandlerWorker(Handler mHandler) {
        this.mHandler = mHandler;
    }

    @Override
    public void unCall() {
        mIsUnCalled = true;
        mHandler.removeCallbacksAndMessages(this);
    }

    @Override
    public boolean isUnCalled() {
        return mIsUnCalled;
    }

    @Override
    public Calling switches(Action0 action0) {
        SwitcherAction switcherAction = new SwitcherAction(action0, mHandler);
        Message message = Message.obtain(mHandler, switcherAction);
        message.obj = this;
        mHandler.sendMessage(message);
        return switcherAction;
    }

    private static class SwitcherAction implements Runnable, Calling {
        private final Action0 action0;
        private final Handler handler;
        private volatile boolean mIsUnCalled;

        public SwitcherAction(Action0 action0, Handler handler) {
            this.action0 = action0;
            this.handler = handler;
        }

        @Override
        public void unCall() {
            mIsUnCalled = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isUnCalled() {
            return mIsUnCalled;
        }

        @Override
        public void run() {
            action0.call();
        }
    }
}

6. RxJava2 Scheduler thread transformation: a hand-written imitation

Switcher (thread switcher)
  1. Used for thread switching
  2. Has a createWorker method
  3. Also provides a switches method of its own (unlike the RxJava1 version)

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Abstract class for thread switching
 */

public abstract class Switcher {

    public abstract Worker createWorker();

    public Release switches(final Runnable runnable) {
        Worker worker = createWorker();
        worker.switches(new Runnable() {
            @Override
            public void run() {
                runnable.run();
            }
        });
        return worker;
    }

    public static abstract class Worker implements Release {
        public abstract Release switches(Runnable runnable);
    }
}
Worker
  1. The class that actually performs the thread switch
  2. Switches threads through the switches method
NewThreadSwitcher
  1. A Switcher that switches to a new thread
  2. Implements the createWorker method

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Switcher for new threads
 */

public class NewThreadSwitcher extends Switcher {
    @Override
    public Worker createWorker() {
        return new NewThreadWorker();
    }
}
NewThreadWorker
  1. Holds a thread pool with a single thread
  2. Implements the switches method to perform the thread switch
  3. Wraps the actual operation in a Runnable and submits it to the thread pool

import android.support.annotation.NonNull;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Worker class for the new thread
 */

public class NewThreadWorker extends Switcher.Worker {
    private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            return new Thread(runnable, "NewThreadWorker");
        }
    });

    private volatile boolean mIsReleased;

    @Override
    public boolean isReleased() {
        return mIsReleased;
    }

    @Override
    public void release() {
        mIsReleased = true;
    }

    @Override
    public Release switches(Runnable runnable) {
        SwitcherAction switcherAction = new SwitcherAction(runnable);
        mExecutor.submit((Callable<Object>) switcherAction);
        return switcherAction;
    }

    private static class SwitcherAction implements Runnable, Callable<Object>, Release {

        private final Runnable mRunnable;

        private volatile boolean mIsReleased;

        public SwitcherAction(Runnable mRunnable) {
            this.mRunnable = mRunnable;
        }

        @Override
        public boolean isReleased() {
            return mIsReleased;
        }

        @Override
        public void release() {
            mIsReleased = true;
        }

        @Override
        public void run() {
            mRunnable.run();
        }

        @Override
        public Object call() throws Exception {
            run();
            return null;
        }
    }
}
LooperSwitcher

Switch to a thread’s Looper in Android

HandlerWorker

Sends the action to the specified Looper for execution

7. Principle analysis of RxJava1 subscribeOn

  1. It is implemented through an OnSubscribe
  2. The Scheduler moves the emitting action onto a thread for execution

    public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;
    final boolean requestOn;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
        this.scheduler = scheduler;
        this.source = source;
        this.requestOn = requestOn;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);

        inner.schedule(parent);
    }

    static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {

        final Subscriber<? super T> actual;

        final boolean requestOn;

        final Worker worker;

        Observable<T> source;

        Thread t;

        SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
            this.actual = actual;
            this.requestOn = requestOn;
            this.worker = worker;
            this.source = source;
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable e) {
            try {
                actual.onError(e);
            } finally {
                worker.unsubscribe();
            }
        }

        @Override
        public void onCompleted() {
            try {
                actual.onCompleted();
            } finally {
                worker.unsubscribe();
            }
        }

        @Override
        public void call() {
            Observable<T> src = source;
            source = null;
            t = Thread.currentThread();
            src.unsafeSubscribe(this);
        }

        @Override
        public void setProducer(final Producer p) {
            actual.setProducer(new Producer() {
                @Override
                public void request(final long n) {
                    if (t == Thread.currentThread() || !requestOn) {
                        p.request(n);
                    } else {
                        worker.schedule(new Action0() {
                            @Override
                            public void call() {
                                p.request(n);
                            }
                        });
                    }
                }
            });
        }
    }
}

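A proxy is used: SubscribeOnSubscriber wraps the downstream Subscriber and forwards every event to it, while the subscription to the source happens on the Worker's thread.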
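
The proxy idea can be reduced to a few lines: subscribing to the wrapper schedules the real subscription on another thread and hands the upstream a forwarding sink. The sketch below uses only the JDK; Source, Sink and subscribeOn are hypothetical miniature types for illustration, and a plain Executor stands in for the Scheduler's Worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SubscribeOnSketch {
    /** Hypothetical miniature of a Subscriber. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Hypothetical miniature of an Observable. */
    interface Source<T> {
        void subscribe(Sink<T> sink);
    }

    /** Wraps upstream so that subscribing (and therefore emitting) happens on the executor's thread. */
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(new Sink<T>() {
            @Override public void onNext(T value) { downstream.onNext(value); } // proxy: forward as-is
            @Override public void onCompleted() { downstream.onCompleted(); }
        }));
    }

    /** Emits one value tagged with the emitting thread's name and returns what the sink received. */
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        CompletableFuture<String> result = new CompletableFuture<>();
        StringBuilder received = new StringBuilder();
        Source<String> source = sink -> {
            sink.onNext("test@" + Thread.currentThread().getName());
            sink.onCompleted();
        };
        subscribeOn(source, executor).subscribe(new Sink<String>() {
            @Override public void onNext(String value) { received.append(value); }
            @Override public void onCompleted() { result.complete(received.toString()); }
        });
        try {
            return result.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return null;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // test@sketch-worker
    }
}
```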

8. Analysis of RxJava2 subscribeOn principle

8.1.RxJava2(without back pressure) subscribeOn

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}
  1. Extends AbstractObservableWithUpstream
  2. Implements the subscribeActual method
  3. Uses the Scheduler to move the subscription, and with it the emitting action, onto a thread
8.2.RxJava2(with back pressure) subscribeOn

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.PASS_THROUGH)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @Experimental
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }

public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T, T> {

    final Scheduler scheduler;

    final boolean nonScheduledRequests;

    public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
        super(source);
        this.scheduler = scheduler;
        this.nonScheduledRequests = nonScheduledRequests;
    }

    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);

        w.schedule(sos);
    }

    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
            implements FlowableSubscriber<T>, Subscription, Runnable {

        private static final long serialVersionUID = 8094547886072529208L;

        final Subscriber<? super T> actual;

        final Scheduler.Worker worker;

        final AtomicReference<Subscription> s;

        final AtomicLong requested;

        final boolean nonScheduledRequests;

        Publisher<T> source;

        SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
            this.actual = actual;
            this.worker = worker;
            this.source = source;
            this.s = new AtomicReference<Subscription>();
            this.requested = new AtomicLong();
            this.nonScheduledRequests = !requestOn;
        }

        @Override
        public void run() {
            lazySet(Thread.currentThread());
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(this.s, s)) {
                long r = requested.getAndSet(0L);
                if (r != 0L) {
                    requestUpstream(r, s);
                }
            }
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
            worker.dispose();
        }

        @Override
        public void onComplete() {
            actual.onComplete();
            worker.dispose();
        }

        @Override
        public void request(final long n) {
            if (SubscriptionHelper.validate(n)) {
                Subscription s = this.s.get();
                if (s != null) {
                    requestUpstream(n, s);
                } else {
                    BackpressureHelper.add(requested, n);
                    s = this.s.get();
                    if (s != null) {
                        long r = requested.getAndSet(0L);
                        if (r != 0L) {
                            requestUpstream(r, s);
                        }
                    }
                }
            }
        }

        void requestUpstream(final long n, final Subscription s) {
            if (nonScheduledRequests || Thread.currentThread() == get()) {
                s.request(n);
            } else {
                worker.schedule(new Request(s, n));
            }
        }

        @Override
        public void cancel() {
            SubscriptionHelper.cancel(s);
            worker.dispose();
        }

        static final class Request implements Runnable {
            private final Subscription s;
            private final long n;

            Request(Subscription s, long n) {
                this.s = s;
                this.n = n;
            }

            @Override
            public void run() {
                s.request(n);
            }
        }
    }
}
  1. Extends AbstractFlowableWithUpstream
  2. Implements the subscribeActual method
  3. Uses the Scheduler to move the subscription, and with it the emitting action, onto a thread
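
The requestUpstream method in the listing decides where a request(n) call runs: directly when requests are not scheduled or when the caller is already on the subscribing thread, otherwise via the worker. A minimal sketch of just that decision follows; RequestRoutingSketch and Requester are hypothetical names, and a single-thread Executor stands in for the Worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

final class RequestRoutingSketch {
    /** Hypothetical stand-in for the request side of a Subscription. */
    interface Requester {
        void request(long n);
    }

    private final AtomicReference<Thread> subscribeThread = new AtomicReference<>();
    private final Executor worker;
    private final boolean nonScheduledRequests;

    RequestRoutingSketch(Executor worker, boolean nonScheduledRequests) {
        this.worker = worker;
        this.nonScheduledRequests = nonScheduledRequests;
    }

    /** Remembers the thread on which the upstream subscription ran. */
    void markSubscribeThread() {
        subscribeThread.set(Thread.currentThread());
    }

    void requestUpstream(long n, Requester upstream) {
        if (nonScheduledRequests || Thread.currentThread() == subscribeThread.get()) {
            upstream.request(n);                       // same thread or scheduling disabled: call directly
        } else {
            worker.execute(() -> upstream.request(n)); // otherwise hop to the worker thread
        }
    }

    /** Issues a request from the calling thread and returns the name of the thread that served it. */
    static String demo(boolean nonScheduledRequests) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        try {
            RequestRoutingSketch routing = new RequestRoutingSketch(executor, nonScheduledRequests);
            CompletableFuture.runAsync(routing::markSubscribeThread, executor).get(2, TimeUnit.SECONDS);
            CompletableFuture<String> where = new CompletableFuture<>();
            routing.requestUpstream(1L, n -> where.complete(Thread.currentThread().getName()));
            return where.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return null;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // request is re-routed to sketch-worker
        System.out.println(demo(true));  // request runs on the calling thread
    }
}
```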

9. RxJava1 subscribeOn: a hand-written imitation

    public final Caller<T> callOn(Switcher switcher) {
        return create(new OperatorCallOn<>(switcher, this));
    }

/**
 * Created by Xionghu on 2018/6/14.
 * Desc: OnCall for callOn
 */
public class OperatorCallOn<T> implements Caller.OnCall<T> {

    private final Switcher switcher;
    private final Caller<T> tCaller;

    public OperatorCallOn(Switcher switcher, Caller<T> tCaller) {
        this.switcher = switcher;
        this.tCaller = tCaller;
    }

    @Override
    public void call(final Receiver<T> tReceiver) {
        Switcher.Worker worker = switcher.createWorker();
        worker.switches(new Action0() {
            @Override
            public void call() {
                Receiver<T> tReceiver1 = new Receiver<T>() {
                    @Override
                    public void onCompleted() {
                        tReceiver.onCompleted();
                    }

                    @Override
                    public void onError(Throwable t) {
                        tReceiver.onError(t);
                    }

                    @Override
                    public void onReceive(T t) {
                        tReceiver.onReceive(t);
                    }
                };
                tCaller.call(tReceiver1);
            }
        });
    }
}
OnCall for callOn
  1. Holds the original Caller and the Switcher
  2. Creates a new Receiver that wraps the old one and passes it to the original Caller on the switched thread
run

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import com.haocai.mylibrary.rxJava1.Caller;
import com.haocai.mylibrary.rxJava1.NewThreadSwitcher;
import com.haocai.mylibrary.rxJava1.Receiver;
import com.haocai.rxjavademo.R;

import butterknife.ButterKnife;
import butterknife.OnClick;

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: RxJava1 subscribeOn imitation
 */
public class Lesson3_2Activity extends AppCompatActivity {

    public static final String TAG = "kpioneer";

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        Caller.
                create(new Caller.OnCall<String>() {
                    @Override
                    public void call(Receiver<String> stringReceiver) {
                        if (!stringReceiver.isUnCalled()) {
                            stringReceiver.onReceive("test");
                            stringReceiver.onCompleted();
                        }
                    }
                }).
                callOn(new NewThreadSwitcher()).
                call(new Receiver<String>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onReceive(String o) {
                        Log.d(TAG, "onReceive:" + o);
                        Log.d(TAG, "currentThread:" + Thread.currentThread());
                    }
                });
    }
}

06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]

10. RxJava2 subscribeOn: a hand-written imitation

10.1.RxJava2(without back pressure)

    public Caller<T> callOn(Switcher switcher) {
        return new CallerCallOn<>(this, switcher);
    }

/**
 * Created by Xionghu on 2018/6/15.
 */
public class CallerCallOn<T> extends CallerWithUpstream<T, T> {

    private Switcher mSwitcher;

    public CallerCallOn(Caller<T> source, Switcher mSwitcher) {
        super(source);
        this.mSwitcher = mSwitcher;
    }

    @Override
    protected void callActual(Callee<T> callee) {
        final CallOnCallee<T> tCallOnCallee = new CallOnCallee<>(callee);
        callee.onCall(tCallOnCallee);
        mSwitcher.switches(new Runnable() {
            @Override
            public void run() {
                source.call(tCallOnCallee);
            }
        });
    }

    private static final class CallOnCallee<T> implements Callee<T>, Release {
        private final Callee<T> callee;

        public CallOnCallee(Callee<T> callee) {
            this.callee = callee;
        }

        @Override
        public void onCall(Release release) {
        }

        @Override
        public void onReceive(T t) {
            callee.onReceive(t);
        }

        @Override
        public void onCompleted() {
            callee.onCompleted();
        }

        @Override
        public void onError(Throwable t) {
            callee.onError(t);
        }

        @Override
        public boolean isReleased() {
            return false;
        }

        @Override
        public void release() {
        }
    }
}
CallerCallOn
  1. Extends CallerWithUpstream
  2. Holds the upstream Caller and the Switcher
  3. Wraps the downstream Callee in a new Callee, then calls the upstream Caller on the Switcher's thread
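
The three points above can be sketched in plain Java without the Android or library classes. This is a minimal illustration only: Source, Sink, ThreadSwitcher, and the thread name "SketchWorker" are hypothetical stand-ins for Caller, Callee, and Switcher, not the article's actual types. The point it shows is that the subscription itself is what gets moved, so everything the upstream emits during that call arrives on the new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-ins; not the classes from the article's library.
final class CallOnSketch {

    /** Downstream consumer (plays the role of Callee). */
    interface Sink<T> {
        void onReceive(T value);
        void onCompleted();
    }

    /** Upstream producer (plays the role of Caller). */
    interface Source<T> {
        void call(Sink<T> sink);
    }

    /** Thread switcher (plays the role of Switcher). */
    interface ThreadSwitcher {
        void switches(Runnable task);
    }

    /** Returns a Source whose subscription, and therefore its emission, runs on the switcher's thread. */
    static <T> Source<T> callOn(Source<T> upstream, ThreadSwitcher switcher) {
        return sink -> switcher.switches(() -> upstream.call(sink));
    }

    /** Runs the chain once and returns the name of the thread that delivered the value. */
    static String emittingThreadName() {
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<String> source = sink -> {
            sink.onReceive("test");
            sink.onCompleted();
        };
        ThreadSwitcher newThread = task -> new Thread(task, "SketchWorker").start();

        callOn(source, newThread).call(new Sink<String>() {
            @Override
            public void onReceive(String value) {
                threadName.set(Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("received on: " + emittingThreadName()); // received on: SketchWorker
    }
}
```

Although call is invoked from the main thread, the value is delivered on SketchWorker, which matches the NewThreadWorker thread seen in the logs of this article.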
10.2 RxJava2 (with back pressure)


    public Telephoner<T> callOn(Switcher switcher) {
        return new TelephonerCallOn<>(this, switcher);
    }


import com.haocai.mylibrary.rxJava2.Switcher;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by Xionghu on 2018/6/15.
 */
public class TelephonerCallOn<T> extends TelephonerWithUpstream<T, T> {

    private final Switcher mSwitcher;

    public TelephonerCallOn(Telephoner<T> source, Switcher switcher) {
        super(source);
        mSwitcher = switcher;
    }

    @Override
    protected void callActual(Receiver<T> receiver) {
        final CallOnReceiver<T> tCallOnReceiver = new CallOnReceiver<>(receiver);
        receiver.onCall(tCallOnReceiver);
        // Subscribe to the upstream Telephoner on the Switcher's thread.
        mSwitcher.switches(new Runnable() {
            @Override
            public void run() {
                source.call(tCallOnReceiver);
            }
        });
    }

    // Wraps the downstream Receiver; the AtomicLong value is the outstanding request count.
    private static final class CallOnReceiver<T> extends AtomicLong implements Receiver<T>, Drop {

        private final Receiver<T> mReceiver;

        public CallOnReceiver(Receiver<T> receiver) {
            mReceiver = receiver;
        }

        @Override
        public void request(long n) {
            BackpressureHelper.add(this, n);
        }

        @Override
        public void drop() {
        }

        @Override
        public void onCall(Drop d) {
            mReceiver.onCall(d);
        }

        @Override
        public void onReceive(T t) {
            // Deliver only while the downstream still has outstanding requests.
            if (get() != 0) {
                mReceiver.onReceive(t);
                BackpressureHelper.produced(this, 1);
            }
        }

        @Override
        public void onError(Throwable t) {
            mReceiver.onError(t);
        }

        @Override
        public void onCompleted() {
            mReceiver.onCompleted();
        }
    }
}
TelephonerCallOn
  1. Extends TelephonerWithUpstream
  2. Holds the upstream Telephoner and the Switcher
  3. Wraps the downstream Receiver in a new Receiver, then calls the upstream Telephoner on the Switcher's thread
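
The back-pressure bookkeeping in CallOnReceiver relies on two helper calls, add and produced, whose bodies are not shown in this article. The sketch below is an assumption about how such helpers typically behave (demand is capped at Long.MAX_VALUE, which is treated as unbounded, and n is assumed to be positive); the class and method names are hypothetical and it is not the library's own implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of request accounting; assumes n > 0 in both helpers.
final class RequestAccountingSketch {

    /** Adds n to the outstanding demand, capping at Long.MAX_VALUE; returns the previous value. */
    static long add(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE; // overflow means "unbounded"
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Subtracts n delivered items, leaving an unbounded demand untouched; returns the new value. */
    static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long next = Math.max(0L, current - n);
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Offers each item downstream only while demand remains; items without demand are dropped. */
    static List<String> deliver(long demand, String... items) {
        AtomicLong requested = new AtomicLong();
        add(requested, demand);
        List<String> received = new ArrayList<>();
        for (String item : items) {
            if (requested.get() != 0) {
                received.add(item);
                produced(requested, 1);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(2, "a", "b", "c"));              // [a, b]
        System.out.println(deliver(Long.MAX_VALUE, "a", "b", "c")); // [a, b, c]
    }
}
```

With a demand of 2 the third item is dropped, while requesting Long.MAX_VALUE, as the demo Receiver in this article does in onCall, lets every item through.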
10.3 run


/**
 * Created by Xionghu on 2018/6/11.
 * Desc: RxJava2 subscribeOn copy
 */
public class Lesson3_3Activity extends AppCompatActivity {

    public static final String TAG = "kpioneer";

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        /*--------- no back pressure ---------*/
        Caller
                .create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("test");
                        callerEmitter.onCompleted();
                    }
                })
                .callOn(new NewThreadSwitcher())
                .call(new Callee<String>() {
                    @Override
                    public void onCall(Release release) {
                    }

                    @Override
                    public void onReceive(String string) {
                        Log.d(TAG, "no back pressure: onReceive:" + string);
                        Log.d(TAG, "no back pressure: currentThread:" + Thread.currentThread());
                    }

                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable t) {
                    }
                });

        /*--------- with back pressure ---------*/
        Telephoner
                .create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("test");
                        telephonerEmitter.onCompleted();
                    }
                })
                .callOn(new NewThreadSwitcher())
                .call(new Receiver<String>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onReceive(String s) {
                        Log.d(TAG, "with back pressure: onReceive:" + s);
                        Log.d(TAG, "with back pressure: currentThread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onCompleted() {
                    }
                });
    }
}

06-15 16:13:27.002 813-1150/com.haocai.rxjavademo D/kpioneer: no back pressure: onReceive:test
06-15 16:13:27.003 813-1150/com.haocai.rxjavademo D/kpioneer: no back pressure: currentThread:Thread[NewThreadWorker,5,main]
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: with back pressure: onReceive:test
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: with back pressure: currentThread:Thread[NewThreadWorker,5,main]

11. Mechanism analysis of RxJava1 observeOn


    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }


public final class OperatorObserveOn<T> implements Operator<T, T> {

    private final Scheduler scheduler;
    private final boolean delayError;
    private final int bufferSize;

    /**
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
     */
    public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
        this(scheduler, delayError, RxRingBuffer.SIZE);
    }

    /**
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
     * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
     */
    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    public static <T> Operator<T, T> rebatch(final int n) {
        return new Operator<T, T>() {
            @Override
            public Subscriber<? super T> call(Subscriber<? super T> child) {
                ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
                parent.init();
                return parent;
            }
        };
    }

    /** Observe through individual queue per observer. */
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final boolean delayError;
        final Queue<Object> queue;
        /** The emission threshold that should trigger a replenishing request. */
        final int limit;

        // the status of the current stream
        volatile boolean finished;

        final AtomicLong requested = new AtomicLong();

        final AtomicLong counter = new AtomicLong();

        /**
         * The single exception if not null, should be written before setting finished (release) and read after
         * reading finished (acquire).
         */
        Throwable error;

        /** Remembers how many elements have been emitted before the requests run out. */
        long emitted;

        // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
        // not prevent anything downstream from consuming, which will happen if the Subscription is chained
        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
            // this formula calculates the 75% of the bufferSize, rounded up to the next integer
            this.limit = calculatedSize - (calculatedSize >> 2);
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(calculatedSize);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
            }
            // signal that this is an async operator capable of receiving this many
            request(calculatedSize);
        }

        void init() {
            // don't want this code in the constructor because `this` can escape through the
            // setProducer call
            Subscriber<? super T> localChild = child;

            localChild.setProducer(new Producer() {

                @Override
                public void request(long n) {
                    if (n > 0L) {
                        BackpressureUtils.getAndAddRequest(requested, n);
                        schedule();
                    }
                }

            });
            localChild.add(recursiveScheduler);
            localChild.add(this);
        }

        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(NotificationLite.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            }
            error = e;
            finished = true;
            schedule();
        }

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }

        // only execute this from schedule()
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;

            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;

            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)

            for (;;) {
                long requestAmount = requested.get();

                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;

                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    localChild.onNext(NotificationLite.<T>getValue(v));

                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                    }
                }

                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }

                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
        }

        boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
            if (a.isUnsubscribed()) {
                q.clear();
                return true;
            }

            if (done) {
                if (delayError) {
                    if (isEmpty) {
                        Throwable e = error;
                        try {
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onCompleted();
                            }
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                    }
                } else {
                    Throwable e = error;
                    if (e != null) {
                        q.clear();
                        try {
                            a.onError(e);
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                        return true;
                    } else
                    if (isEmpty) {
                        try {
                            a.onCompleted();
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                        return true;
                    }
                }

            }

            return false;
        }
    }
}
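The observeOn Operator (OperatorObserveOn)
  1. It is an implementation of the Operator interface
  2. observeOn applies it to the Observable through lift
  3. Its call method returns an ObserveOnSubscriber that wraps the downstream Subscriber
The observeOn Subscriber (ObserveOnSubscriber)

When onNext, onCompleted, or onError is called, it queues the event and schedules delivery to the downstream Subscriber on the Scheduler's Worker thread.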
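
The queue-and-schedule step can be sketched in plain Java. This is a simplified illustration of the counter-guarded drain loop used by schedule() and call() above, not RxJava's implementation: it leaves out request accounting, error handling, and termination, and the names QueueDrain and "SketchObserver" are hypothetical. A single-thread ExecutorService stands in for the Scheduler.Worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the queue-drain pattern; not RxJava's ObserveOnSubscriber.
final class ObserveOnSketch {

    /** Queues values from any thread and delivers them, in order, on the worker thread. */
    static final class QueueDrain<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger counter = new AtomicInteger();
        private final ExecutorService worker;
        private final Consumer<T> downstream;

        QueueDrain(ExecutorService worker, Consumer<T> downstream) {
            this.worker = worker;
            this.downstream = downstream;
        }

        void onNext(T value) {
            queue.offer(value);
            schedule();
        }

        private void schedule() {
            // Only the caller that moves the counter from 0 to 1 submits a drain task.
            if (counter.getAndIncrement() == 0) {
                worker.execute(this::drain);
            }
        }

        private void drain() {
            int missed = 1;
            for (;;) {
                T value;
                while ((value = queue.poll()) != null) {
                    downstream.accept(value);
                }
                // Subtract the signals handled so far; non-zero means more arrived meanwhile.
                missed = counter.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    /** Emits 1..3 from the calling thread and records where each value was observed. */
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "SketchObserver"));
        List<String> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        QueueDrain<Integer> drain = new QueueDrain<>(worker, value -> {
            seen.add(value + "@" + Thread.currentThread().getName());
            done.countDown();
        });
        try {
            for (int i = 1; i <= 3; i++) {
                drain.onNext(i);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1@SketchObserver, 2@SketchObserver, 3@SketchObserver]
    }
}
```

The counter ensures that at most one drain task is active at a time, so values emitted on the calling thread are observed in order on the worker thread.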