An Observable can emit any number of onNext events, followed by at most one terminal event: either onError or onComplete.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NotNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NotNull Disposable d) {
        System.out.println("onSubscribe");
    }

    @Override
    public void onNext(@NotNull Integer integer) {
        System.out.println("onNext: " + integer);
    }

    @Override
    public void onError(@NotNull Throwable e) {
        System.out.println("onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});
Start with the Observable class and the interfaces it builds on:
public abstract class Observable<T> implements ObservableSource<T>
public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}
public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}
These are the Observer callbacks that an Observable invokes as it emits events.
Calling create wraps the given ObservableOnSubscribe in an ObservableCreate object:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...
}
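The call order in subscribeActual can be tried out without RxJava on the classpath. The following is only a minimal sketch: MiniObserver, MiniOnSubscribe and SubscribeOrderSketch are hypothetical stand-ins invented for illustration, and the observer doubles as the emitter, whereas the real code wraps it in a CreateEmitter. It shows that onSubscribe arrives before any emission and that an exception thrown by the source is delivered as onError.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for RxJava's types; none of these are library classes.
public class SubscribeOrderSketch {

    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Plays the role of ObservableOnSubscribe: user code that pushes events downstream.
    interface MiniOnSubscribe<T> {
        void subscribe(MiniObserver<T> emitter) throws Exception;
    }

    // Mirrors the shape of subscribeActual. For brevity the observer doubles as the
    // emitter here; the real code wraps it in a CreateEmitter first.
    static <T> void subscribeActual(MiniOnSubscribe<T> source, MiniObserver<T> observer) {
        observer.onSubscribe();           // the observer is notified before any emission
        try {
            source.subscribe(observer);   // the user's emission code runs synchronously
        } catch (Throwable ex) {
            observer.onError(ex);         // a thrown exception is delivered as onError
        }
    }

    // Runs a source that emits one value and then throws; returns the observed calls.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObserver<Integer> observer = new MiniObserver<Integer>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(Integer value) { log.add("onNext: " + value); }
            @Override public void onError(Throwable e) { log.add("onError: " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        MiniOnSubscribe<Integer> source = emitter -> {
            emitter.onNext(1);
            throw new IllegalStateException("boom");
        };
        subscribeActual(source, observer);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints onSubscribe, then onNext: 1, then onError: boom. onComplete never appears, because the source threw before reaching it.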
Execution reaches source.subscribe(parent), which calls the subscribe method we passed to create:
public void subscribe(@NotNull ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
}
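CreateEmitter's onNext rejects null values; otherwise it forwards the value to the downstream Observer, unless the emitter has already been disposed: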
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
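Once onComplete is called, the emitter notifies the Observer and then disposes itself. The event stream ends, and later onNext or onComplete calls are ignored because of the isDisposed() check.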
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}
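The effect of the isDisposed() guard can be imitated in plain Java. This is a rough sketch under stated assumptions: Sink, GuardedEmitter and TerminalEventSketch are hypothetical names, disposal is reduced to an AtomicBoolean flag, and late errors are simply dropped, which is a simplification and not a claim about the library's behavior. It illustrates that events sent after a terminal event never reach the downstream observer, and that a null value is turned into an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified emitter; it imitates the checks shown above
// but is not RxJava's CreateEmitter.
public class TerminalEventSketch {

    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    static final class GuardedEmitter<T> implements Sink<T> {
        private final Sink<T> downstream;
        private final AtomicBoolean disposed = new AtomicBoolean(false);

        GuardedEmitter(Sink<T> downstream) { this.downstream = downstream; }

        boolean isDisposed() { return disposed.get(); }

        @Override public void onNext(T value) {
            if (value == null) {             // null is reported as an error, not delivered
                onError(new NullPointerException("onNext called with null"));
                return;
            }
            if (!isDisposed()) {
                downstream.onNext(value);
            }
        }

        @Override public void onError(Throwable e) {
            if (!isDisposed()) {             // simplification: late errors are dropped
                try { downstream.onError(e); } finally { disposed.set(true); }
            }
        }

        @Override public void onComplete() {
            if (!isDisposed()) {
                try { downstream.onComplete(); } finally { disposed.set(true); }
            }
        }
    }

    // A downstream sink that records every call it receives.
    static Sink<Integer> logging(List<String> log) {
        return new Sink<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext: " + value); }
            @Override public void onError(Throwable e) { log.add("onError: " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        GuardedEmitter<Integer> emitter = new GuardedEmitter<>(logging(log));
        emitter.onNext(1);
        emitter.onComplete();
        emitter.onNext(2);      // dropped: the emitter is already disposed
        emitter.onComplete();   // dropped as well
        return log;
    }

    static List<String> runWithNull() {
        List<String> log = new ArrayList<>();
        GuardedEmitter<Integer> emitter = new GuardedEmitter<>(logging(log));
        emitter.onNext(null);   // becomes an onError, which also disposes the emitter
        emitter.onNext(3);      // dropped
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(runWithNull());
    }
}
```

In this sketch run() records only onNext: 1 and onComplete, and runWithNull() records a single onError entry.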
This walkthrough is based on the RxJava 2.2.4 source.