Example:
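import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// The source emits 1 and 2, then completes
val source = object : ObservableOnSubscribe<Int> {
    override fun subscribe(emitter: ObservableEmitter<Int>) {
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onComplete()
    }
}
// The observer receives whatever the source emits
val observer = object : Observer<Int> {
    override fun onComplete() {}
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: Int) {}
    override fun onError(e: Throwable) {}
}
Observable.create(source).subscribe(observer)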
Analysis
Entry point: Observable.create(source)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
create takes an ObservableOnSubscribe object, an interface that declares only one method:
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
Next, look at the onAssembly method in RxJavaPlugins:
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
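The pass-through behavior of onAssembly can be illustrated with a small stand-alone model. This is only a sketch: SketchSource, SketchPlugins, and assemblyHook are hypothetical names invented for the example and are not part of RxJava. The one assumption it makes is the one visible in the code above: a static hook that returns the source unchanged when it is null.

```java
import java.util.function.Function;

// Hypothetical stand-in for an Observable; not an RxJava class.
final class SketchSource {
    final String name;
    SketchSource(String name) { this.name = name; }
}

// Hypothetical stand-in for RxJavaPlugins, reduced to one hook.
final class SketchPlugins {
    // Null by default: no hook installed.
    static volatile Function<SketchSource, SketchSource> assemblyHook;

    static SketchSource onAssembly(SketchSource source) {
        Function<SketchSource, SketchSource> f = assemblyHook;
        if (f != null) {
            return f.apply(source); // a hook may wrap or replace the source
        }
        return source; // pass-through when no hook is set
    }
}

final class HookDemo {
    public static void main(String[] args) {
        SketchSource s = new SketchSource("create");
        System.out.println(SketchPlugins.onAssembly(s).name); // prints "create"
        SketchPlugins.assemblyHook = src -> new SketchSource("traced:" + src.name);
        System.out.println(SketchPlugins.onAssembly(s).name); // prints "traced:create"
    }
}
```

With no hook installed, the object passed in is the object returned, which is why create can be read as simply returning the new ObservableCreate.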
onObservableAssembly is a static field that stays null unless a hook is set explicitly, so create simply returns the new ObservableCreate(source). In other words, this step builds an ObservableCreate object that holds the ObservableOnSubscribe object. The next call is subscribe(observer) on that ObservableCreate object:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        ...
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}
This step triggers the subscribeActual method of the ObservableCreate object. Next, look inside the ObservableCreate class:
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
// CreateEmitter implements both ObservableEmitter and Disposable
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    ...
}
subscribeActual creates a CreateEmitter that wraps the Observer, then calls observer.onSubscribe(parent), which hands the observer a Disposable (CreateEmitter implements Disposable). Next, source.subscribe(parent) calls back the subscribe method of the ObservableOnSubscribe object and passes in parent (the CreateEmitter). Inside that method, emitter.onNext(1) starts emitting data through CreateEmitter.onNext:
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
observer.onNext(t) invokes the observer's onNext method, which completes the chain from the source to the observer.
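The two checks in onNext (reject null, drop values after disposal) can be tried out in isolation. The following is a minimal sketch, not RxJava's implementation: MiniObserver and MiniEmitter are hypothetical names, and an AtomicBoolean stands in for the AtomicReference<Disposable> state that CreateEmitter really uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, reduced observer: only the callbacks this sketch needs.
interface MiniObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
}

// Hypothetical, reduced emitter modeled on the onNext logic shown above.
final class MiniEmitter<T> {
    private final MiniObserver<? super T> observer;
    private final AtomicBoolean disposed = new AtomicBoolean();

    MiniEmitter(MiniObserver<? super T> observer) { this.observer = observer; }

    void onNext(T value) {
        if (value == null) {
            onError(new NullPointerException("onNext called with null"));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(value); // forward only while still subscribed
        }
    }

    void onError(Throwable error) {
        // Simplification: deliver the error at most once, then mark disposed.
        if (disposed.compareAndSet(false, true)) {
            observer.onError(error);
        }
    }

    void dispose() { disposed.set(true); }
    boolean isDisposed() { return disposed.get(); }
}

final class EmitterDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniEmitter<Integer> emitter = new MiniEmitter<>(new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("next:" + value); }
            @Override public void onError(Throwable error) { log.add("error"); }
        });
        emitter.onNext(1);
        emitter.dispose();
        emitter.onNext(2); // dropped: the emitter is already disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1]
    }
}
```

The value emitted after dispose() never reaches the observer, which is the behavior the isDisposed() check in CreateEmitter.onNext provides.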
Process review
Observable.create(source).subscribe(observer) is a two-step process.
Step 1: Observable.create(source) creates an ObservableCreate object that holds a reference to the ObservableOnSubscribe object.
Step 2: calling subscribe(observer) on the ObservableCreate object is what actually triggers the subscription. It invokes subscribeActual in the ObservableCreate object, which creates a CreateEmitter object, then runs the subscribe method of the ObservableOnSubscribe object with that CreateEmitter as its argument, and emission begins with emitter.onNext(1). CreateEmitter.onNext in turn calls the Observer's onNext method, which completes the subscription chain.
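The two steps can be reproduced with a miniature model that shows nothing is emitted until subscribe is called. This is a sketch under stated assumptions: SketchObservable, SketchOnSubscribe, SketchEmitter, and SketchObserver are hypothetical names that mirror the roles of Observable, ObservableOnSubscribe, ObservableEmitter, and Observer. Error handling, disposal, and plugin hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature interfaces; not RxJava's real types.
interface SketchEmitter<T> { void onNext(T value); void onComplete(); }
interface SketchOnSubscribe<T> { void subscribe(SketchEmitter<T> emitter); }
interface SketchObserver<T> { void onNext(T value); void onComplete(); }

final class SketchObservable<T> {
    private final SketchOnSubscribe<T> source;

    private SketchObservable(SketchOnSubscribe<T> source) { this.source = source; }

    // Step 1: only stores the source; the source does not run yet.
    static <T> SketchObservable<T> create(SketchOnSubscribe<T> source) {
        return new SketchObservable<>(source);
    }

    // Step 2: wraps the observer in a fresh emitter and runs the source.
    void subscribe(SketchObserver<? super T> observer) {
        SketchEmitter<T> emitter = new SketchEmitter<T>() {
            @Override public void onNext(T value) { observer.onNext(value); }
            @Override public void onComplete() { observer.onComplete(); }
        };
        source.subscribe(emitter);
    }
}

final class TwoStepDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchObservable<Integer> observable = SketchObservable.create(emitter -> {
            log.add("source runs");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        });
        log.add("created"); // logged first: create did not run the source
        observable.subscribe(new SketchObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("next:" + value); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints [created, source runs, next:1, next:2, complete]
    }
}
```

"created" appears in the log before "source runs", which matches the review above: create only wires objects together, and the subscribe call is what starts the emission.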