RxJava can be thought of as a concrete implementation of the Observer pattern. Since it is the realization of the Observer pattern, it has observers, observables and subscribes. Let’s look at some of these implementations. The version of RxJava referenced here is as follows
implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.1.1'
implementation "IO. Reactivex. Rxjava2: rxjava: 2.2.9." "
Copy the code
The first step is to create an Observer, which processes the specific logic after an event is generated. There are two main methods: Observer and Subscriber Observer: Create an Observer by directly new an Observer
/** * Private observer <String>createObserverByObserver() {
returnNew Observer<String>() {@override public void subscribe (Disposable d) { Here we can do some initialization work // we can call d.dispose(); To cancel the event observation log.e ("TAG"."onSubscribe ByObserver");
}
@Override
public void onNext(String o) {
Log.e("TAG"."onNext ByObserver:" + o);
}
@Override
public void onError(Throwable e) {
Log.e("TAG"."onError ByObserver");
}
@Override
public void onComplete() {
Log.e("TAG"."onComplete ByObserver"); }}; }Copy the code
Here, onSubscribe is equivalent to start in RxJava1, and the subscription can be cancelled by D, and the observer can also be created by Subscriber
/** * Create observer by subscriber */ private subscriber <String>createObserverBySubscriber() {
returnNew Subscriber<String>() {@override public void onSubscribe(Subscription s) {// onNext will not be called if request is not called // The value is usually passed in integer. MAX, because if it is an array, events larger than n will not be processed, and the onComplete method will not fire s.complete (4); Log.e("TAG"."onSubscribe BySubscriber");
}
@Override
public void onNext(String string) {
Log.e("TAG"."onNext BySubscriber:" + string);
}
@Override
public void onError(Throwable t) {
Log.e("TAG"."onError BySubscriber");
}
@Override
public void onComplete() {
Log.e("TAG"."onComplete BySubscriber"); }}; }Copy the code
This differs from the Observer approach in that it calls the Request method manually, passing in the number of times it needs to be executed. You can also cancel this subscription by calling S.canel (). However, when the way of.subscribe connects the Observer with the observed, the Observer created by the two methods is different from the Observer created by the Observer. It’s observed by the Observer. The create/Observer. Just/Observer. FromArray methods such as creation
/ / Private Observable<String>createObservableByCreate() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("The test 1 bycreate");
emitter.onNext("The test 2 bycreate"); emitter.onComplete(); }}); } private Observable<String>createObservableByJust() {
return Observable.just("The test 1 byjust"."The test 2 byjust"); } /** * Private Observable<String>createObservableByFromArray() {
String[] tests = {"The test 1 fromarray"."The test 2 fromarray"};
return Observable.fromArray(tests);
}
Copy the code
However, the observer created by Subscriber can only inherit or realize Publisher through Flowable and so on. Next, we will focus on Subscriber
Flowable.fromArray(1, 2, 3, 4, 5, 6).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(createObserverBySubscriber());
Copy the code
The results are as follows
It can be seen that we set the number of times to be 4 in the request method, but we have 6 observables, so we only execute onNext, not onComplete. If we change the above to (1,2,3), the result is
Now that simple subscriptions have been introduced, there are still operators that are useful in RxJava that can greatly simplify the processing logic of your code. Create operators like create/just/fromArray, interval (an Observable that emits integer sequences at fixed intervals) are timers, and range (which sends a range of data) are alternatives to for loops. Timer (creates a delayed-start observed). There are also transformation operators. For example, common maps (convert the original object into a new object), flatMap (convert the original object into an Observable), groupBy (group), buffer (convert the original Observable into a new Observable, The new Observable emits an array at a time. Filter operators, such as filter (filter by a certain condition), distinct (remove weight), SKIP (skip), and take(take only the first few). Combined operators, startWith (emits data before the source Observable emits, which is specified in with), Merge (emits multiple Observables into one Observable, He may interleave the data emitted by the merged Observables), concat (he also merges multiple Observables, but he will reflect the data in strict order. If the first One fails to transmit, the next one will definitely not transmit data), Zip (merges two or more Observables and transforms them according to the specified function, eventually emitting a new value). Auxiliary operators subscribeOn (an Observable thread) and observeOn (an observer thread).
Create and subscribe source code analysis: The create method is used to create Observable and New Observer
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("The test 1 bycreate");
emitter.onNext("The test 2 bycreate");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {}});Copy the code
First, let’s look at the create method
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source."source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code
You create an ObservableCreate object and pass the incoming object to the class
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if(! isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) {if(! tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) {if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if(! isDisposed()) { try { observer.onError(t); } finally { dispose(); }return true;
}
return false;
}
@Override
public void onComplete() {
if(! isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public voidsetDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString()); }}... }Copy the code
As you can see, so far we’ve just assigned the source in the class, so we’re going to subscribe
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); . }}Copy the code
The most important one is the subscribeActual method, which is found to be the abstract method of the abstract Observable class after clicking on it. So its concrete implementation should be in its implementation class. It can be carefully found that when we call Observable. In fact, ObservableCreate is called internally, and this class inherits Observable, so the Subscribe method of the Observable created by create must call the subscribeActual method in this class internally. Let’s go back and look at this method in the class above. It starts by creating a CreateEmitter object from this observer, which is a static inner class and it implements ObservableEmitter and Disposable. We then call the Observer onSubscribe method, whose argument is strongly converted to its parent, Disposable, and then call the Subscribe method of the source, as mentioned above, which is the method passed in to create, And the subclass CreateEmitter. When we’re actually using it, we need to manually call onNext, onComplete in our Subscribe method, and of course we’re calling CreateEmitter onNext, onComplete, etc. As you can see, onNext, onComplete or onError of the Observer is called, and dispose method is called to remove Observable
RxJava2 conversion process: such as map, which is the same as the above subscription, but before the subscribe added a map
map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return "prefix:"+s; }})Copy the code
First let’s take a look at this map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Copy the code
You can see that you’re actually creating an ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode ! = NONE) { downstream.onNext(null);return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
returnt ! = null ? ObjectHelper.<U>requireNonNull(mapper.apply(t),"The mapper function returned a null value.") : null; }}}Copy the code
As you can see from the previous analysis, when you call the Subscribe method, you actually call the subscribeActual method in the newly generated class, which is the ObservableMap method, and you can see that he also calls the observed subscribe method. Similarly, when the observer calls onNext and onComplete in the Subscribe method it calls the corresponding method in the Mapper, As you can see, onNext calls observer onNext and processes the original value using mapper’s apply method. What you can see here is that you recreate an Observer, then use the created observer as an associate, and finally call the associated methods in the original Observer.
Thread switching is often used when we use it, for example
(Observable).subscribeOn(Schedulers.computation()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer)
It can be seen that the thread of Observable is Schedulers.computation, and the thread of observer is the main thread of Android. Let’s take a look at the implementation of the observed thread first
public final class Schedulers { @NonNull static final Scheduler COMPUTATION; static { ... COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); . }... static final class ComputationTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception {returnComputationHolder.DEFAULT; }}... static final class ComputationHolder { static final Scheduler DEFAULT = new ComputationScheduler(); }... }Copy the code
You can see that its thread switch is done specifically by the ComputationScheduler class, which we’ll talk about later, followed by subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copy the code
ObservableSubscribeOn ObservableSubscribeOn ObservableSubscribeOn ObservableSubscribeOn Observable and incoming Scheduler
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() { source.subscribe(parent); }}}Copy the code
This class is responsible for the distribution of Observable. You can see subscribeActual and onNext in its subclasses. The logic is the same as before. The scheduler. ScheduleDirect method is called in subscribeActual. The parameter of this method can be seen from the above as a runnable, and its run method executes the subscribed subscribe method. The scheduleDirect method is in the ComputationScheduler class that handles the Observable thread
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.scheduleDirect(run, delay, unit);
}
Copy the code
The scheduleDirect method of the thread pool is executed
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); try { Future<? > f;if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
returnEmptyDisposable.INSTANCE; }}Copy the code
It’s a Thread pool submit or schedule, which switches the Observable to the background thread. In the same way as transformation, we create a new observer in the middle of distribution, process the logic in this new Observer, and then call the related methods of the original Observer. We continue to analyze observeOn below, in the same way we look at defined above AndroidSchedulers. MianThread
public final class AndroidSchedulers {
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
returnMainHolder.DEFAULT; }}); private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()),false); }... }Copy the code
The HandlerScheduler class is responsible for switching threads, and its argument is the handler of the main thread, so it’s safe to assume that thread switching should be handler.sendMessage. It calls the following
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Copy the code
Then look at
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else{ Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}... }Copy the code
If you look at the subscribeActual method here, the Scheduler is AndroidScheduler so it calls its createWorker method
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}...
}
Copy the code
The class that actually distributes events above is OnbserverOnOnserver, such as its onNext method
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }Copy the code
The Schedule method is called
void schedule() {
if(getAndIncrement() == 0) { worker.schedule(this); }}Copy the code
This is the method in the HandlerScheduler
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }Copy the code
The handler is used to send messages to the main thread, and the Observer thread switch has been analyzed.