Preface
Scheduler embodies a core idea of reactive programming: a change made through a Scheduler propagates downstream (change propagation).
Thread-switching components:
- Allow code to be executed on different threads
- subscribeOn – the thread used when subscribing
- observeOn – the thread on which events are received
- Scheduler – the component that actually performs the thread switch
1. RxJava1 thread switching
- Scheduler: performs the thread switch
- Operator: the operator interface
- lift: the core operator
Example
Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    Log.d(TAG, "currentThread:" + Thread.currentThread());
                    subscriber.onNext("test");
                    subscriber.onCompleted();
                }
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext:" + s + "currentThread:" + Thread.currentThread());
            }
        });
Output:
06-12 17:00:13.846 6227-6495/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-12 17:00:13.856 6227-6227/com.haocai.rxjavademo D/kpioneer: onNext:testcurrentThread:Thread[main,5,main]
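The thread hand-off visible in this log can be approximated in plain Java with two executors. This is only a sketch of the idea and not how RxJava implements it: the class `ThreadSwitchSketch`, the method `runPipeline`, and the thread names are hypothetical, and a single-threaded executor stands in for the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: one executor plays the role of subscribeOn (where the
// source runs), the other plays the role of observeOn (where the value arrives).
class ThreadSwitchSketch {

    /** Returns { name of the producing thread, name of the receiving thread }. */
    static String[] runPipeline() {
        ExecutorService producer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-producer"));
        ExecutorService consumer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        try {
            String[] threads = new String[2];
            CompletableFuture
                    // "subscribeOn": the emitting code runs on the producer thread.
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return "test";
                    }, producer)
                    // "observeOn": the value is delivered on the consumer thread.
                    .thenAcceptAsync(
                            value -> threads[1] = Thread.currentThread().getName(), consumer)
                    .join(); // block until both stages have finished
            return threads;
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runPipeline();
        System.out.println("produced on " + threads[0] + ", received on " + threads[1]);
    }
}
```

As in the log above, the value is produced on one thread and received on another; only the executor passed to each stage decides which.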
2. RxJava2 thread switching
- Scheduler: performs the thread switch
- AbstractObservableWithUpstream: the abstract base class
/*--------- without backpressure ---------*/
Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                if (!emitter.isDisposed()) {
                    Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
                    emitter.onNext("test");
                    emitter.onComplete();
                }
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String o) {
                Log.d(TAG, "Observable onNext:" + o);
                Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {}
        });

/*--------- with backpressure ---------*/
Flowable
        .create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                if (!emitter.isCancelled()) {
                    Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
                    emitter.onNext("test");
                    emitter.onComplete();
                }
            }
        }, BackpressureStrategy.DROP)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "Flowable onNext:" + s);
                Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onComplete() {}
        });
06-13 13:37:13.009 3063-3949/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable onNext:test
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[main,5,main]
06-13 13:37:13.019 3063-3950/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[RxNewThreadScheduler-2,5,main]
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable onNext:test
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[main,5,main]
3. Source code analysis of the RxJava1 Scheduler
- Scheduler: an abstract class
- Worker: the class that actually performs thread scheduling
- Action0: the action executed on the target thread
- schedule: the method that actually schedules work; its input parameter is an Action0
public abstract class Scheduler {
public abstract Worker createWorker();
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
*/
public abstract static class Worker implements Subscription {
/**
* Schedules an Action for execution.
*
* @param action
* Action to schedule
* @return a subscription to be able to prevent or cancel the execution of the action
*/
public abstract Subscription schedule(Action0 action);
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
return SchedulePeriodicHelper.schedulePeriodically(this, action,
initialDelay, period, unit, null);
}
public long now() {
return System.currentTimeMillis();
}
}
public long now() {
return System.currentTimeMillis();
}
@SuppressWarnings("unchecked")
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
}
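The relationship between Scheduler, Worker, and the scheduled action can be modelled in a few lines of plain Java. The sketch below is a simplified stand-in and not RxJava's code: `SchedulerSketch`, its nested `Worker`, and the thread name are hypothetical, and a `Runnable` takes the place of `Action0`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified model of the Scheduler -> Worker -> schedule(action) chain.
class SchedulerSketch {

    /** Stands in for Scheduler.Worker: owns a thread and runs actions on it. */
    static final class Worker {
        private final ExecutorService executor;

        Worker(String threadName) {
            executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        }

        /** Stands in for schedule(Action0): hands the action to the worker's thread. */
        Future<?> schedule(Runnable action) {
            return executor.submit(action);
        }

        /** Stands in for unsubscribe(): cancels outstanding work and frees the thread. */
        void unsubscribe() {
            executor.shutdownNow();
        }
    }

    /** Stands in for Scheduler.createWorker(). */
    static Worker createWorker() {
        return new Worker("sketch-worker");
    }

    /** Schedules one action and reports the name of the thread that ran it. */
    static String runOnWorker() {
        Worker worker = createWorker();
        String[] ranOn = new String[1];
        try {
            worker.schedule(() -> ranOn[0] = Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.unsubscribe();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("action ran on " + runOnWorker());
    }
}
```

The caller never touches a thread directly: it asks for a worker, hands it an action, and releases the worker when done.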
Thread scheduling process:
- Passing in a different Scheduler selects a different thread
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
- The Scheduler creates a Worker, which uses a real thread pool
The thread pool is created in NewThreadWorker:
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
volatile boolean isUnsubscribed;
/** The purge frequency in milliseconds. */
private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
private static final AtomicReference<ScheduledExecutorService> PURGE;
/**
* Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}.
* Also, it works even for inheritance: {@link Method} of base class can be invoked on the instance of child class.
*/
private static volatile Object cachedSetRemoveOnCancelPolicyMethod;
/**
* Possible value of {@link #cachedSetRemoveOnCancelPolicyMethod} which means that cancel policy is not supported.
*/
private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
PURGE = new AtomicReference<ScheduledExecutorService>();
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);
// Forces the use of purge even if setRemoveOnCancelPolicy is available
final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);
final int androidApiVersion = PlatformDependent.getAndroidApiVersion();
// According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
// setRemoveOnCancelPolicy available since Android API 21
SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce && (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
}
/**
* Registers the given executor service and starts the purge thread if not already started.
* <p>{@code public} visibility reason: called from other package(s) within RxJava
* @param service a scheduled thread pool executor instance
*/
public static void registerExecutor(ScheduledThreadPoolExecutor service) {
do {
ScheduledExecutorService exec = PURGE.get();
if (exec != null) {
break;
}
exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
if (PURGE.compareAndSet(null, exec)) {
exec.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
purgeExecutors();
}
}, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
break;
} else {
exec.shutdownNow();
}
} while (true);
EXECUTORS.putIfAbsent(service, service);
}
/**
* Deregisters the executor service.
* <p>{@code public} visibility reason: called from other package(s) within RxJava
* @param service a scheduled thread pool executor instance
*/
public static void deregisterExecutor(ScheduledExecutorService service) {
EXECUTORS.remove(service);
}
/** Purges each registered executor and eagerly evicts shutdown executors. */
static void purgeExecutors() {
try {
// This prevents map.keySet to compile to a Java 8+ KeySetView return type
// and cause NoSuchMethodError on Java 6-7 runtimes.
Map<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> map = EXECUTORS;
Iterator<ScheduledThreadPoolExecutor> it = map.keySet().iterator();
while (it.hasNext()) {
ScheduledThreadPoolExecutor exec = it.next();
if (!exec.isShutdown()) {
exec.purge();
} else {
it.remove();
}
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaHooks.onError(t);
}
}
/**
* Tries to enable the Java 7+ setRemoveOnCancelPolicy.
* <p>{@code public} visibility reason: called from other package(s) within RxJava.
* If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
* be called to enable the backup option of purging the executors.
* @param executor the executor to call setRemoveOnCancelPolicy if available.
* @return true if the policy was successfully enabled
*/
public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
if (SHOULD_TRY_ENABLE_CANCEL_POLICY) { // NOPMD
final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;
Method methodToCall;
if (isInstanceOfScheduledThreadPoolExecutor) {
final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;
if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
return false;
}
if (localSetRemoveOnCancelPolicyMethod == null) {
Method method = findSetRemoveOnCancelPolicyMethod(executor);
cachedSetRemoveOnCancelPolicyMethod = method != null
? method
: SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;
methodToCall = method;
} else {
methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
}
} else {
methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
}
if (methodToCall != null) {
try {
methodToCall.invoke(executor, true);
return true;
} catch (InvocationTargetException e) {
RxJavaHooks.onError(e);
} catch (IllegalAccessException e) {
RxJavaHooks.onError(e);
} catch (IllegalArgumentException e) {
RxJavaHooks.onError(e);
}
}
}
return false;
}
/**
* Tries to find {@code "setRemoveOnCancelPolicy(boolean)"} method in the class of passed executor.
*
* @param executor whose class will be used to search for required method.
* @return {@code "setRemoveOnCancelPolicy(boolean)"} {@link Method}
* or {@code null} if required {@link Method} was not found.
*/
static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
// The reason for the loop is to avoid NoSuchMethodException being thrown on JDK 6
// which is more costly than looping through ~70 methods.
for (final Method method : executor.getClass().getMethods()) {
if (method.getName().equals("setRemoveOnCancelPolicy")) {
final Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
return method;
}
}
}
return null;
}
/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
/**
* Schedules the given action by wrapping it into a ScheduledAction on the
* underlying ExecutorService, returning the ScheduledAction.
* @param action the action to wrap and schedule
* @param delayTime the delay in execution
* @param unit the time unit of the delay
* @return the wrapper ScheduledAction
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();
deregisterExecutor(executor);
}
@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
}
- The action to run is passed in as an Action0
- Scheduling is carried out through the schedule method
Inside ScheduledAction, action.call() performs the actual operation:
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final SubscriptionList cancel;
final Action0 action;
public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new SubscriptionList();
}
public ScheduledAction(Action0 action, CompositeSubscription parent) {
this.action = action;
this.cancel = new SubscriptionList(new Remover(this, parent));
}
public ScheduledAction(Action0 action, SubscriptionList parent) {
this.action = action;
this.cancel = new SubscriptionList(new Remover2(this, parent));
}
@Override
public void run() {
try {
lazySet(Thread.currentThread());
action.call();
} catch (OnErrorNotImplementedException e) {
signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
} catch (Throwable e) {
signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
} finally {
unsubscribe();
}
}
void signalError(Throwable ie) {
RxJavaHooks.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
@Override
public boolean isUnsubscribed() {
return cancel.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (!cancel.isUnsubscribed()) {
cancel.unsubscribe();
}
}
/**
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
* if the underlying {@code action} completes or the this scheduled action is cancelled.
*
* @param s the Subscription to add
*/
public void add(Subscription s) {
cancel.add(s);
}
/**
* Adds the given Future to the unsubscription composite in order to support
* cancelling the underlying task in the executor framework.
* @param f the future to add
*/
public void add(final Future<?> f) {
cancel.add(new FutureCompleter(f));
}
/**
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
* cancelled or terminates, it can remove itself from this parent.
*
* @param parent
* the parent {@code CompositeSubscription} to add
*/
public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
}
/**
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
* cancelled or terminates, it can remove itself from this parent.
*
* @param parent
* the parent {@code CompositeSubscription} to add
*/
public void addParent(SubscriptionList parent) {
cancel.add(new Remover2(this, parent));
}
/**
* Cancels the captured future if the caller of the call method
* is not the same as the runner of the outer ScheduledAction to
* prevent unnecessary self-interrupting if the unsubscription
* happens from the same thread.
*/
final class FutureCompleter implements Subscription {
private final Future<?> f;
FutureCompleter(Future<?> f) {
this.f = f;
}
@Override
public void unsubscribe() {
if (ScheduledAction.this.get() != Thread.currentThread()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}
/** Remove a child subscription from a composite when unsubscribing. */
static final class Remover extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final ScheduledAction s;
final CompositeSubscription parent;
public Remover(ScheduledAction s, CompositeSubscription parent) {
this.s = s;
this.parent = parent;
}
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
}
/** Remove a child subscription from a composite when unsubscribing. */
static final class Remover2 extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final ScheduledAction s;
final SubscriptionList parent;
public Remover2(ScheduledAction s, SubscriptionList parent) {
this.s = s;
this.parent = parent;
}
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
}
}
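The rule in FutureCompleter (interrupt on cancel only when the caller is not the thread running the action) can be isolated in a small plain-Java sketch. Everything named here (`CancelSketch`, `TrackedTask`, `shouldInterrupt`, `demo`) is hypothetical and only mirrors the idea; it is not taken from RxJava.

```java
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "do not interrupt yourself" rule applied on cancellation.
class CancelSketch {

    /** Remembers which thread runs the task, as ScheduledAction does with lazySet. */
    static final class TrackedTask implements Runnable {
        private final AtomicReference<Thread> runner = new AtomicReference<>();
        private Runnable body = () -> { };

        void setBody(Runnable body) {
            this.body = body;
        }

        @Override
        public void run() {
            runner.lazySet(Thread.currentThread());
            body.run();
        }

        /** True when the caller is not the running thread, so interrupting is safe. */
        boolean shouldInterrupt() {
            return runner.get() != Thread.currentThread();
        }

        /** Cancels the future, interrupting only if called from another thread. */
        void cancel(Future<?> future) {
            future.cancel(shouldInterrupt());
        }
    }

    /** Returns { answer seen from inside the task, answer seen from the caller }. */
    static boolean[] demo() {
        TrackedTask task = new TrackedTask();
        boolean[] answers = new boolean[2];
        task.setBody(() -> answers[0] = task.shouldInterrupt());
        Thread runner = new Thread(task, "sketch-runner");
        runner.start();
        try {
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        answers[1] = task.shouldInterrupt(); // asked from the calling thread
        return answers;
    }

    public static void main(String[] args) {
        boolean[] answers = demo();
        System.out.println("inside task: " + answers[0] + ", from caller: " + answers[1]);
    }
}
```

Asked from inside the running task the answer is "do not interrupt"; asked from any other thread it is "interrupt", which matches the two branches of `FutureCompleter.unsubscribe()`.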
RxJava1: the Scheduler in RxAndroid
Handler and Looper are used to run actions on the main thread:
/** Android-specific Schedulers. */
public final class AndroidSchedulers {
private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();
private final Scheduler mainThreadScheduler;
private static AndroidSchedulers getInstance() {
for (;;) {
AndroidSchedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new AndroidSchedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}
private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new LooperScheduler(looper);
}
/**
* Resets the current {@link AndroidSchedulers} instance.
* This will re-init the cached schedulers on the next usage,
* which can be useful in testing.
*/
@Experimental
public static void reset() {
INSTANCE.set(null);
}
}
class LooperScheduler extends Scheduler {
private final Handler handler;
LooperScheduler(Looper looper) {
handler = new Handler(looper);
}
LooperScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
static class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;
HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}
@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}
static final class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;
ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}
@Override
public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
}
}
4. Source code analysis of RxJava2 Scheduler
public abstract class Scheduler {
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
static {
CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx2.scheduler.drift-tolerance", 15));
}
public static long clockDriftTolerance() {
return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
}
@NonNull
public abstract io.reactivex.Scheduler.Worker createWorker();
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public void start() {}
public void shutdown() {}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final io.reactivex.Scheduler.Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
io.reactivex.Scheduler.DisposeTask task = new io.reactivex.Scheduler.DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final io.reactivex.Scheduler.Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
io.reactivex.Scheduler.PeriodicDirectTask periodicTask = new io.reactivex.Scheduler.PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}
@SuppressWarnings("unchecked")
@NonNull
public <S extends io.reactivex.Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
public abstract static class Worker implements Disposable {
/**
* Schedules a Runnable for execution without any time delay.
*
* <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
*
* @param run
* Runnable to schedule
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
/**
* Schedules an Runnable for execution at some point in the future specified by a time delay
* relative to the current time.
* <p>
* Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
* as if the {@link #schedule(Runnable)} was called.
*
* @param run
* the Runnable to schedule
* @param delay
* time to "wait" before executing the action; non-positive values indicate an non-delayed
* schedule
* @param unit
* the time unit of {@code delayTime}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new io.reactivex.Scheduler.Worker.PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
/**
* Returns the 'current time' of the Worker in the specified time unit.
* @param unit the time unit
* @return the 'current time'
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* Holds state and logic to calculate when the next delayed invocation
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
this.decoratedRun = decoratedRun;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
lastNowNanoseconds = firstNowNanoseconds;
startInNanoseconds = firstStartInNanoseconds;
}
@Override
public void run() {
decoratedRun.run();
if (!sd.isDisposed()) {
long nextTick;
long nowNanoseconds = now(TimeUnit.NANOSECONDS);
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
} else {
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
}
lastNowNanoseconds = nowNanoseconds;
long delay = nextTick - nowNanoseconds;
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
static final class PeriodicDirectTask
implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable run;
@NonNull
final io.reactivex.Scheduler.Worker worker;
volatile boolean disposed;
PeriodicDirectTask(@NonNull Runnable run, @NonNull io.reactivex.Scheduler.Worker worker) {
this.run = run;
this.worker = worker;
}
@Override
public void run() {
if (!disposed) {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw ExceptionHelper.wrapOrThrow(ex);
}
}
}
@Override
public void dispose() {
disposed = true;
worker.dispose();
}
@Override
public boolean isDisposed() {
return disposed;
}
@Override
public Runnable getWrappedRunnable() {
return run;
}
}
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final io.reactivex.Scheduler.Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull io.reactivex.Scheduler.Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
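The clock-drift handling in `PeriodicTask.run()` is easier to follow as a pure function. The sketch below restates that branch without any state; the class `PeriodicTickSketch`, the method `nextTick`, and the parameter names are hypothetical, and the numbers in `main` are made up for illustration.

```java
// Hypothetical, stateless restatement of the next-tick rule in PeriodicTask.run().
class PeriodicTickSketch {

    /**
     * Computes when the next run should happen. All values are in nanoseconds.
     *
     * @param now       current time
     * @param lastNow   time observed at the previous run
     * @param start     time the periodic schedule notionally started
     * @param nextCount number of the upcoming run (the source's ++count)
     * @param period    repetition period
     * @param tolerance allowed clock drift
     */
    static long nextTick(long now, long lastNow, long start,
                         long nextCount, long period, long tolerance) {
        boolean clockJumpedBack = now + tolerance < lastNow;
        boolean clockJumpedAhead = now >= lastNow + period + tolerance;
        if (clockJumpedBack || clockJumpedAhead) {
            // The clock moved too far: rebase and run one period from now.
            return now + period;
        }
        // Normal case: stay on the fixed grid start + n * period.
        return start + nextCount * period;
    }

    public static void main(String[] args) {
        // Small made-up numbers: period 100, tolerance 1000.
        System.out.println(nextTick(105, 0, 100, 1, 100, 1000));   // on the grid
        System.out.println(nextTick(5000, 0, 100, 1, 100, 1000));  // clock jumped ahead
        System.out.println(nextTick(0, 2000, 100, 1, 100, 1000));  // clock jumped back
    }
}
```

In the source the tolerance defaults to 15 minutes (`rx2.scheduler.drift-tolerance`), so rebasing only happens after large clock changes; ordinary scheduling jitter keeps the task on its original grid.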
- Passing in a different Scheduler selects a different thread
- The Scheduler creates a Worker, which uses a real thread pool
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
/**
* Schedules the given runnable on the underlying executor directly and
* returns its future wrapped into a Disposable.
* @param run the Runnable to execute in a delayed fashion
* @param delayTime the delay amount
* @param unit the delay time unit
* @return the ScheduledRunnable instance
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/**
* Schedules the given runnable periodically on the underlying executor directly
* and returns its future wrapped into a Disposable.
* @param run the Runnable to execute in a periodic fashion
* @param initialDelay the initial delay amount
* @param period the repeat period amount
* @param unit the time unit for both the initialDelay and period
* @return the ScheduledRunnable instance
*/
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {
InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
try {
Future<?> f;
if (initialDelay <= 0L) {
f = executor.submit(periodicWrapper);
} else {
f = executor.schedule(periodicWrapper, initialDelay, unit);
}
periodicWrapper.setFirst(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/**
* Wraps the given runnable into a ScheduledRunnable and schedules it
* on the underlying ScheduledExecutorService.
* <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
* false.
* @param run the runnable instance
* @param delayTime the time to delay the execution
* @param unit the time unit
* @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
* @return the ScheduledRunnable instance
*/
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
/**
* Shuts down the underlying executor in a non-interrupting fashion.
*/
public void shutdown() {
if (!disposed) {
disposed = true;
executor.shutdown();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
}
- The concrete operation is passed in as a Runnable
- Scheduling is carried out by the schedule method, which hands the Runnable to the underlying executor
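The two points above can be illustrated with a small plain-Java sketch. It is not the RxJava source: the class name SingleThreadWorkerSketch, the thread name "SketchWorker" and the choice to return null instead of an EmptyDisposable are assumptions made only to keep the example self-contained.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a worker backed by one thread. */
final class SingleThreadWorkerSketch {
    // One scheduled thread; the thread name is an assumption for the demo.
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1, r -> new Thread(r, "SketchWorker"));
    private volatile boolean disposed;

    /** Hands the Runnable to the executor; returns null if disposed or rejected. */
    Future<?> schedule(Runnable run, long delay, TimeUnit unit) {
        if (disposed) {
            return null;
        }
        try {
            if (delay <= 0L) {
                return executor.submit(run);
            }
            return executor.schedule(run, delay, unit);
        } catch (RejectedExecutionException ex) {
            return null;
        }
    }

    /** Stops the worker and interrupts running tasks. */
    void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

    /** Runs one task and reports the name of the thread that executed it. */
    static String demo() {
        SingleThreadWorkerSketch worker = new SingleThreadWorkerSketch();
        String[] name = new String[1];
        try {
            worker.schedule(() -> name[0] = Thread.currentThread().getName(),
                    0L, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.dispose();
        }
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "SketchWorker"
    }
}
```

The real NewThreadWorker additionally wraps the task in a ScheduledRunnable so that the caller gets a Disposable back; the sketch leaves that part out.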
RxJava2: Scheduler in RxAndroid
Similar to RxJava1, Handler and Looper are used to implement execution on the main thread
public final class AndroidSchedulers {
    private static final class MainHolder {
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override
                public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }
    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}
final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private volatile boolean disposed;
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }
        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;
        private volatile boolean disposed;
        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }
        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
5. Re-implementing the RxJava1 Scheduler thread transformation
Main objects:
Switcher (thread switcher)
- Used for thread switching
- Has a createWorker method
/**
 * Created by Xionghu on 2018/6/14.
 */
public abstract class Switcher {
    public abstract Worker createWorker();
    public static abstract class Worker implements Calling {
        public abstract Calling switches(Action0 action0);
    }
}
Worker
- The class that actually performs the thread switch
- Performs the switch through the switches method
NewThreadSwitcher
- A Switcher that switches to a new thread
- Implements the createWorker method
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Switcher for new threads
 */
public class NewThreadSwitcher extends Switcher {
    @Override
    public Worker createWorker() {
        return new NewThreadWorker();
    }
}
NewThreadWorker
- Holds a thread pool with a single thread
- Implements the switches method to switch threads
- Wraps the actual operation in a Runnable and submits it to the thread pool
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: new thread work class
 */
public class NewThreadWorker extends Switcher.Worker {
    // newScheduledThreadPool(1, ...): creates a pool with one core thread that also supports delayed and periodic tasks.
    private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            return new Thread(runnable, " NewThreadWorker");
        }
    });
    private volatile boolean mIsUnCalled;
    @Override
    public void unCall() {
        mIsUnCalled = true;
    }
    @Override
    public boolean isUnCalled() {
        return mIsUnCalled;
    }
    @Override
    public Calling switches(Action0 action0) {
        // Wrap the action in a Runnable and hand it to the single worker thread.
        SwitcherAction switcherAction = new SwitcherAction(action0);
        mExecutor.submit(switcherAction);
        return switcherAction;
    }
    private static class SwitcherAction implements Runnable, Calling {
        private final Action0 action0;
        private volatile boolean mIsUnCalled;
        public SwitcherAction(Action0 action0) {
            this.action0 = action0;
        }
        @Override
        public void unCall() {
            mIsUnCalled = true;
        }
        @Override
        public boolean isUnCalled() {
            return mIsUnCalled;
        }
        @Override
        public void run() {
            action0.call();
        }
    }
}
LooperSwitcher
Switches to the Looper of a given thread on Android
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Looper Switcher for Android
 */
public class LooperSwitcher extends Switcher {
    private Handler mHandler;
    public LooperSwitcher(Looper looper) {
        mHandler = new Handler(looper);
    }
    @Override
    public Worker createWorker() {
        return new HandlerWorker(mHandler);
    }
}
HandlerWorker
Sends the action to the specified Looper for execution
import android.os.Handler;
import android.os.Message;
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Worker
 */
public class HandlerWorker extends Switcher.Worker {
    private final Handler mHandler;
    private volatile boolean mIsUnCalled;
    public HandlerWorker(Handler mHandler) {
        this.mHandler = mHandler;
    }
    @Override
    public void unCall() {
        mIsUnCalled = true;
        mHandler.removeCallbacksAndMessages(this);
    }
    @Override
    public boolean isUnCalled() {
        return mIsUnCalled;
    }
    @Override
    public Calling switches(Action0 action0) {
        SwitcherAction switcherAction = new SwitcherAction(action0, mHandler);
        Message message = Message.obtain(mHandler, switcherAction);
        message.obj = this; // token used by unCall() to remove this worker's messages
        mHandler.sendMessage(message);
        return switcherAction;
    }
    private static class SwitcherAction implements Runnable, Calling {
        private final Action0 action0;
        private final Handler handler;
        private volatile boolean mIsUnCalled;
        public SwitcherAction(Action0 action0, Handler handler) {
            this.action0 = action0;
            this.handler = handler;
        }
        @Override
        public void unCall() {
            mIsUnCalled = true;
            handler.removeCallbacks(this);
        }
        @Override
        public boolean isUnCalled() {
            return mIsUnCalled;
        }
        @Override
        public void run() {
            action0.call();
        }
    }
}
6. Re-implementing the RxJava2 Scheduler thread transformation
Switcher (thread switcher)
- Used for thread switching
- Has a createWorker method
- Also has a switches method of its own (unlike the RxJava1 version)
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Abstract class for thread switching
 */
public abstract class Switcher {
    public abstract Worker createWorker();
    public Release switches(final Runnable runnable) {
        Worker worker = createWorker();
        worker.switches(new Runnable() {
            @Override
            public void run() {
                runnable.run();
            }
        });
        return worker;
    }
    public static abstract class Worker implements Release {
        public abstract Release switches(Runnable runnable);
    }
}
Worker
- The class that actually performs the thread switch
- Performs the switch through the switches method
NewThreadSwitcher
- A Switcher that switches to a new thread
- Implements the createWorker method
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: Switcher for new threads
 */
public class NewThreadSwitcher extends Switcher {
    @Override
    public Worker createWorker() {
        return new NewThreadWorker();
    }
}
NewThreadWorker
- Holds a thread pool with a single thread
- Implements the switches method to switch threads
- Wraps the actual operation in a Runnable and submits it to the thread pool
import android.support.annotation.NonNull;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: new thread work class
 */
public class NewThreadWorker extends Switcher.Worker {
    private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            return new Thread(runnable, "NewThreadWorker");
        }
    });
    private volatile boolean mIsReleased;
    @Override
    public boolean isReleased() {
        return mIsReleased;
    }
    @Override
    public void release() {
        mIsReleased = true;
    }
    @Override
    public Release switches(Runnable runnable) {
        SwitcherAction switcherAction = new SwitcherAction(runnable);
        mExecutor.submit((Callable<Object>) switcherAction);
        return switcherAction;
    }
    private static class SwitcherAction implements Runnable, Callable<Object>, Release {
        private final Runnable mRunnable;
        private volatile boolean mIsReleased;
        public SwitcherAction(Runnable mRunnable) {
            this.mRunnable = mRunnable;
        }
        @Override
        public boolean isReleased() {
            return mIsReleased;
        }
        @Override
        public void release() {
            mIsReleased = true;
        }
        @Override
        public void run() {
            mRunnable.run();
        }
        @Override
        public Object call() throws Exception {
            run();
            return null;
        }
    }
}
LooperSwitcher
Switch to a thread’s Looper in Android
HandlerWorker
Sends the action to the specified Looper for execution
7. How RxJava1 subscribeOn works
- Implemented through OnSubscribe
- The Scheduler is used to run the emitting action on the target thread
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    final Scheduler scheduler;
    final Observable<T> source;
    final boolean requestOn;
    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
        this.scheduler = scheduler;
        this.source = source;
        this.requestOn = requestOn;
    }
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);
        inner.schedule(parent);
    }
    static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> actual;
        final boolean requestOn;
        final Worker worker;
        Observable<T> source;
        Thread t;
        SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
            this.actual = actual;
            this.requestOn = requestOn;
            this.worker = worker;
            this.source = source;
        }
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            try {
                actual.onError(e);
            } finally {
                worker.unsubscribe();
            }
        }
        @Override
        public void onCompleted() {
            try {
                actual.onCompleted();
            } finally {
                worker.unsubscribe();
            }
        }
        @Override
        public void call() {
            Observable<T> src = source;
            source = null;
            t = Thread.currentThread();
            src.unsafeSubscribe(this);
        }
        @Override
        public void setProducer(final Producer p) {
            actual.setProducer(new Producer() {
                @Override
                public void request(final long n) {
                    if (t == Thread.currentThread() || !requestOn) {
                        p.request(n);
                    } else {
                        worker.schedule(new Action0() {
                            @Override
                            public void call() {
                                p.request(n);
                            }
                        });
                    }
                }
            });
        }
    }
}
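A proxy mechanism is used: SubscribeOnSubscriber wraps the downstream Subscriber, and the subscription to the source is performed inside the Worker, so the source emits on the Worker's thread.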
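The idea can be reduced to a few lines of plain Java. This is only a sketch under simplifying assumptions: Source is a hypothetical one-method interface standing in for Observable, an ExecutorService stands in for the Scheduler, and unsubscription and back pressure are left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class SubscribeOnSketch {
    /** Hypothetical minimal source: emits to the consumer when subscribed. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Returns a proxy source whose subscription runs on the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** Subscribes through the proxy and reports which thread emitted. */
    static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "SketchScheduler"));
        AtomicReference<String> seen = new AtomicReference<>();
        Source<String> source =
                downstream -> downstream.accept("test@" + Thread.currentThread().getName());
        subscribeOn(source, executor).subscribe(seen::set);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "test@SketchScheduler"
    }
}
```

Because the whole subscribe call is moved to the executor, everything the source does inside it, including emitting, happens on that thread.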
8. How RxJava2 subscribeOn works
8.1 RxJava2 (without back pressure) subscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
        final AtomicReference<Disposable> s;
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
        @Override
        public void onComplete() {
            actual.onComplete();
        }
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}
- Extends AbstractObservableWithUpstream
- Implements the subscribeActual method
- The Scheduler is used to run the emitting action on the target thread
8.2 RxJava2 (with back pressure) subscribeOn
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean nonScheduledRequests;
    public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
        super(source);
        this.scheduler = scheduler;
        this.nonScheduledRequests = nonScheduledRequests;
    }
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);
        w.schedule(sos);
    }
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
            implements FlowableSubscriber<T>, Subscription, Runnable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Subscriber<? super T> actual;
        final Scheduler.Worker worker;
        final AtomicReference<Subscription> s;
        final AtomicLong requested;
        final boolean nonScheduledRequests;
        Publisher<T> source;
        SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
            this.actual = actual;
            this.worker = worker;
            this.source = source;
            this.s = new AtomicReference<Subscription>();
            this.requested = new AtomicLong();
            this.nonScheduledRequests = !requestOn;
        }
        @Override
        public void run() {
            lazySet(Thread.currentThread());
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }
        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(this.s, s)) {
                long r = requested.getAndSet(0L);
                if (r != 0L) {
                    requestUpstream(r, s);
                }
            }
        }
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
            worker.dispose();
        }
        @Override
        public void onComplete() {
            actual.onComplete();
            worker.dispose();
        }
        @Override
        public void request(final long n) {
            if (SubscriptionHelper.validate(n)) {
                Subscription s = this.s.get();
                if (s != null) {
                    requestUpstream(n, s);
                } else {
                    BackpressureHelper.add(requested, n);
                    s = this.s.get();
                    if (s != null) {
                        long r = requested.getAndSet(0L);
                        if (r != 0L) {
                            requestUpstream(r, s);
                        }
                    }
                }
            }
        }
        void requestUpstream(final long n, final Subscription s) {
            if (nonScheduledRequests || Thread.currentThread() == get()) {
                s.request(n);
            } else {
                worker.schedule(new Request(s, n));
            }
        }
        @Override
        public void cancel() {
            SubscriptionHelper.cancel(s);
            worker.dispose();
        }
        static final class Request implements Runnable {
            private final Subscription s;
            private final long n;
            Request(Subscription s, long n) {
                this.s = s;
                this.n = n;
            }
            @Override
            public void run() {
                s.request(n);
            }
        }
    }
}
- Extends AbstractFlowableWithUpstream
- Implements the subscribeActual method
- The Scheduler is used to run the emitting action on the target thread
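One detail of the back-pressure variant is worth isolating: because the upstream subscription only arrives after the Worker has run, requests issued before that moment are accumulated and forwarded later. The sketch below shows just that bookkeeping in plain Java. The class DeferredRequestSketch and the use of LongConsumer as a stand-in for the upstream Subscription are assumptions for illustration, and the thread-routing part (requestOn) is deliberately omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;

/** Hypothetical sketch: requests made before the upstream exists are buffered. */
final class DeferredRequestSketch {
    private final AtomicReference<LongConsumer> upstream = new AtomicReference<>();
    private final AtomicLong pending = new AtomicLong();

    /** Called by the downstream; forwards directly once the upstream is known. */
    void request(long n) {
        LongConsumer up = upstream.get();
        if (up != null) {
            up.accept(n);
            return;
        }
        addCapped(pending, n);
        // Re-check: the upstream may have arrived while we were adding.
        up = upstream.get();
        if (up != null) {
            long r = pending.getAndSet(0L);
            if (r != 0L) {
                up.accept(r);
            }
        }
    }

    /** Called once when the upstream arrives; drains the buffered amount. */
    void onSubscribe(LongConsumer up) {
        if (upstream.compareAndSet(null, up)) {
            long r = pending.getAndSet(0L);
            if (r != 0L) {
                up.accept(r);
            }
        }
    }

    /** Adds n to total, capping at Long.MAX_VALUE on overflow. */
    private static void addCapped(AtomicLong total, long n) {
        for (;;) {
            long current = total.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;
            }
            if (total.compareAndSet(current, next)) {
                return;
            }
        }
    }

    /** Two early requests are merged into one; a later one passes through. */
    static List<Long> demo() {
        List<Long> forwarded = new ArrayList<>();
        DeferredRequestSketch sketch = new DeferredRequestSketch();
        sketch.request(3);
        sketch.request(4);
        sketch.onSubscribe(n -> forwarded.add(n));
        sketch.request(2);
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[7, 2]"
    }
}
```

The getAndSet(0L) in both paths is what keeps a buffered amount from being forwarded twice when request and onSubscribe race.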
9. Re-implementing RxJava1 subscribeOn
public final Caller<T> callOn(Switcher switcher) {
return create(new OperatorCallOn<>(switcher, this));
}
/**
 * Created by Xionghu on 2018/6/14.
 * Desc: OnCall
 */
public class OperatorCallOn<T> implements Caller.OnCall<T> {
    private final Switcher switcher;
    private final Caller<T> tCaller;
    public OperatorCallOn(Switcher switcher, Caller<T> tCaller) {
        this.switcher = switcher;
        this.tCaller = tCaller;
    }
    @Override
    public void call(final Receiver<T> tReceiver) {
        Switcher.Worker worker = switcher.createWorker();
        worker.switches(new Action0() {
            @Override
            public void call() {
                // Proxy Receiver that forwards every event to the original one.
                Receiver<T> tReceiver1 = new Receiver<T>() {
                    @Override
                    public void onCompleted() {
                        tReceiver.onCompleted();
                    }
                    @Override
                    public void onError(Throwable t) {
                        tReceiver.onError(t);
                    }
                    @Override
                    public void onReceive(T t) {
                        tReceiver.onReceive(t);
                    }
                };
                tCaller.call(tReceiver1);
            }
        });
    }
}
OnCall used by callOn
- Holds the original Caller and the Switcher
- Creates a new Receiver that wraps the old one and runs the call on the new thread
Run
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.haocai.mylibrary.rxJava1.Caller;
import com.haocai.mylibrary.rxJava1.NewThreadSwitcher;
import com.haocai.mylibrary.rxJava1.Receiver;
import com.haocai.rxjavademo.R;
import butterknife.ButterKnife;
import butterknife.OnClick;
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: RxJava1 subscribeOn re-implementation
 */
public class Lesson3_2Activity extends AppCompatActivity {
    public static final String TAG = "kpioneer";
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }
    @OnClick(R.id.testDo)
    public void onViewClicked() {
        Caller.create(new Caller.OnCall<String>() {
            @Override
            public void call(Receiver<String> stringReceiver) {
                if (!stringReceiver.isUnCalled()) {
                    stringReceiver.onReceive("test");
                    stringReceiver.onCompleted();
                }
            }
        })
        .callOn(new NewThreadSwitcher())
        .call(new Receiver<String>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onReceive(String o) {
                Log.d(TAG, "onReceive:" + o);
                Log.d(TAG, "currentThread:" + Thread.currentThread());
            }
        });
    }
}
Output
06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
10. Re-implementing RxJava2 subscribeOn
10.1 RxJava2 (without back pressure)
public Caller<T> callOn(Switcher switcher) {
return new CallerCallOn<>(this, switcher);
}
/**
 * Created by Xionghu on 2018/6/15.
 */
public class CallerCallOn<T> extends CallerWithUpstream<T, T> {
    private Switcher mSwitcher;
    public CallerCallOn(Caller<T> source, Switcher mSwitcher) {
        super(source);
        this.mSwitcher = mSwitcher;
    }
    @Override
    protected void callActual(Callee<T> callee) {
        final CallOnCallee<T> tCallOnCallee = new CallOnCallee<>(callee);
        callee.onCall(tCallOnCallee);
        // Run the upstream call on the Switcher's thread.
        mSwitcher.switches(new Runnable() {
            @Override
            public void run() {
                source.call(tCallOnCallee);
            }
        });
    }
    private static final class CallOnCallee<T> implements Callee<T>, Release {
        private final Callee<T> callee;
        public CallOnCallee(Callee<T> callee) {
            this.callee = callee;
        }
        @Override
        public void onCall(Release release) {
        }
        @Override
        public void onReceive(T t) {
            callee.onReceive(t);
        }
        @Override
        public void onCompleted() {
            callee.onCompleted();
        }
        @Override
        public void onError(Throwable t) {
            callee.onError(t);
        }
        @Override
        public boolean isReleased() {
            return false;
        }
        @Override
        public void release() {
        }
    }
}
CallerCallOn
- Extends CallerWithUpstream
- Holds the original Caller and the Switcher
- Creates a new Callee that wraps the old one and runs the call on the new thread
10.2 RxJava2 (with back pressure)
public Telephoner<T> callOn(Switcher switcher) {
return new TelephonerCallOn<>(this, switcher);
}
import com.haocai.mylibrary.rxJava2.Switcher;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Created by Xionghu on 2018/6/15.
 */
public class TelephonerCallOn<T> extends TelephonerWithUpstream<T, T> {
    private final Switcher mSwitcher;
    public TelephonerCallOn(Telephoner<T> source, Switcher switcher) {
        super(source);
        mSwitcher = switcher;
    }
    @Override
    protected void callActual(Receiver<T> receiver) {
        final CallOnReceiver<T> tCallOnReceiver = new CallOnReceiver<>(receiver);
        receiver.onCall(tCallOnReceiver);
        // Run the upstream call on the Switcher's thread.
        mSwitcher.switches(new Runnable() {
            @Override
            public void run() {
                source.call(tCallOnReceiver);
            }
        });
    }
    private static final class CallOnReceiver<T> extends AtomicLong implements Receiver<T>, Drop {
        private final Receiver<T> mReceiver;
        public CallOnReceiver(Receiver<T> receiver) {
            mReceiver = receiver;
        }
        @Override
        public void request(long n) {
            BackpressureHelper.add(this, n);
        }
        @Override
        public void drop() {
        }
        @Override
        public void onCall(Drop d) {
            mReceiver.onCall(d);
        }
        @Override
        public void onReceive(T t) {
            // Deliver only while there is outstanding demand.
            if (get() != 0) {
                mReceiver.onReceive(t);
                BackpressureHelper.produced(this, 1);
            }
        }
        @Override
        public void onError(Throwable t) {
            mReceiver.onError(t);
        }
        @Override
        public void onCompleted() {
            mReceiver.onCompleted();
        }
    }
}
TelephonerCallOn
- Extends TelephonerWithUpstream
- Holds the original Telephoner and the Switcher
- Creates a new Receiver that wraps the old one and runs the call on the new thread
10.3 Run
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: RxJava2 subscribeOn re-implementation
 */
public class Lesson3_3Activity extends AppCompatActivity {
    public static final String TAG = "kpioneer";
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }
    @OnClick(R.id.testDo)
    public void onViewClicked() {
        /*--------- no back pressure ---------*/
        Caller.create(new CallerOnCall<String>() {
            @Override
            public void call(CallerEmitter<String> callerEmitter) {
                callerEmitter.onReceive("test");
                callerEmitter.onCompleted();
            }
        })
        .callOn(new NewThreadSwitcher())
        .call(new Callee<String>() {
            @Override
            public void onCall(Release release) {
            }
            @Override
            public void onReceive(String string) {
                Log.d(TAG, "no back pressure: onReceive:" + string);
                Log.d(TAG, "no back pressure: currentThread:" + Thread.currentThread());
            }
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable t) {
            }
        });
        /*--------- with back pressure ---------*/
        Telephoner.create(new TelephonerOnCall<String>() {
            @Override
            public void call(TelephonerEmitter<String> telephonerEmitter) {
                telephonerEmitter.onReceive("test");
                telephonerEmitter.onCompleted();
            }
        })
        .callOn(new NewThreadSwitcher())
        .call(new Receiver<String>() {
            @Override
            public void onCall(Drop d) {
                d.request(Long.MAX_VALUE);
            }
            @Override
            public void onReceive(String s) {
                Log.d(TAG, "with back pressure: onReceive:" + s);
                Log.d(TAG, "with back pressure: currentThread:" + Thread.currentThread());
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onCompleted() {
            }
        });
    }
}
Output
06-15 16:13:27.002 813-1150/com.haocai.rxjavademo D/kpioneer: no back pressure: onReceive:test
06-15 16:13:27.003 813-1150/com.haocai.rxjavademo D/kpioneer: no back pressure: currentThread:Thread[NewThreadWorker,5,main]
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: with back pressure: onReceive:test
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: with back pressure: currentThread:Thread[NewThreadWorker,5,main]
11. How RxJava1 observeOn works
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
public static <T> Operator<T, T> rebatch(final int n) {
return new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
parent.init();
return parent;
}
};
}
/** Observe through individual queue per observer. */
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final boolean delayError;
final Queue<Object> queue;
/** The emission threshold that should trigger a replenishing request. */
final int limit;
// the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
/** Remembers how many elements have been emitted before the requests run out. */
long emitted;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(NotificationLite.<T>getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}
}
return false;
}
}
}
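Operator for observeOn
- OperatorObserveOn is the Operator behind observeOn
- The Operator is applied to the Observable through lift
- The Operator returns a Subscriber dedicated to observeOn
Subscriber for observeOn
When onNext or another callback is invoked, the event is put into a queue and a drain task is scheduled on the Worker, so the downstream receives it on the Worker's thread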
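The schedule() and call() pair in ObserveOnSubscriber follows a queue-drain pattern: producers enqueue and bump a counter, and only the caller that moves the counter from 0 to 1 schedules the drain task. A reduced plain-Java sketch of that pattern is given below. DrainLoopSketch is a hypothetical class, an Executor stands in for the Worker, and request accounting, error handling and termination are all omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch of the queue-drain pattern; not the RxJava implementation. */
final class DrainLoopSketch<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger counter = new AtomicInteger();
    private final Executor executor;
    private final Consumer<T> downstream;

    DrainLoopSketch(Executor executor, Consumer<T> downstream) {
        this.executor = executor;
        this.downstream = downstream;
    }

    /** Called on the producer thread: enqueue, then make sure a drain is running. */
    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    private void schedule() {
        // Only the 0 -> 1 transition submits the drain task.
        if (counter.getAndIncrement() == 0) {
            executor.execute(this);
        }
    }

    /** Runs on the executor thread and delivers queued values in order. */
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T value;
            while ((value = queue.poll()) != null) {
                downstream.accept(value);
            }
            // Subtract the work we accounted for; loop again if more arrived meanwhile.
            missed = counter.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    /** Emits 1..3 from the calling thread and records where they were received. */
    static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "SketchObserver"));
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        DrainLoopSketch<Integer> observer = new DrainLoopSketch<>(executor,
                v -> received.add(v + "@" + Thread.currentThread().getName()));
        for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter guarantees that at most one drain runs at a time, which is why the downstream sees events serially even when several producers call onNext concurrently.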