An Observable can send multiple events, or it can send onError, which calls onComplete

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NotNull ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NotNull Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(@NotNull Integer integer) { System.out.println("onNext: " + integer); } @Override public void onError(@NotNull Throwable e) { System.out.println("onError: " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); }});Copy the code

Look at the observables

public abstract class Observable<T> implements ObservableSource<T>
Copy the code
public interface ObservableSource<T> {

    void subscribe(@NonNull Observer<? super T> observer);
}
Copy the code
public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}
Copy the code

You can see the Observer methods that the Observable is interested in.

Call Create to create an ObservableCreate object.

  public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
Copy the code
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code

Perform to the source. The subscribe (parent); And then we came to ours

 public void subscribe(@NotNull ObservableEmitter<Integer> emitter) throws Exception {
     emitter.onNext(1);
     emitter.onNext(2);
     emitter.onComplete();
  }
Copy the code

The Observer of CreateEmitter calls onNext

 public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
Copy the code

Once onComplete is called, the event is disposed. The stream of events ends.

public void onComplete() { if (! isDisposed()) { try { observer.onComplete(); } finally { dispose(); }}}Copy the code

Based on the source 2.2.4 version