First, handwritten RxJava

1. Chain logic is simple

RxJava creates like an astronaut moonshot

1. The astronaut enters the rocket to count down, OnSubscribe: the successful subscription will start to send 2. The astronauts countdown to the launch and subscribe. 4. The astronaut enters space orbit, onComplete: the receiving event is completeCopy the code

Code example (RxJava usage example):

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(Observer<Integer> observableEmitter) { Log.i(TAG, "2. The astronauts subscribe to the countdown to launch. observableEmitter.onNext(123); observableEmitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe() { Log.i(TAG, "1. The astronaut enters the rocket to count down, OnSubscribe: the successful subscription will start sending "); } @Override public void onNext(Integer item) { Log.i(TAG, "3. Astronaut rocket separation, onNext: receive "); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "4. Astronauts enter orbit and land on the moon, onComplete: receive event completed "); }});Copy the code

2. Implementation principle of RxJava

Define, subscribe, launch

ObservableOnSubscribe Source Returns an Observable

Subscribe: subscribe after the call to start the subscription,

Launch: The Subscribe Observer of ObservableOnSubscribe launches.


Here’s how to implement the three operators: Create, Just, and Map.

2.1 the Create


How do you implement this perfect chain-structure create implementation?

It gives all control to the object Observable.


Observable 代码实现:

public class Observable<T> { ObservableOnSubscribe source; private Observable(ObservableOnSubscribe<T> source) { this.source = source; } public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new Observable<T>(source) ; } public void subscribe(Observer<T> observer) { observer.onSubscribe(); source.subscribe( observer); }}Copy the code

Note that generics are used to increase Observable extensibility


2.2 just realize

The just operator is embedded with transmitters.

Example:

Observable.just("abc").subscribe(new Observer<String>() { @Override public void onSubscribe() { Log.i(TAG, "onSubscribe "); } @Override public void onNext(String item) { Log.i(TAG, "onNext "+item); } @Override public void onError(Throwable e) { Log.i(TAG, "onError "); } @Override public void onComplete() { Log.i(TAG, "onComplete "); }});Copy the code

We can see that the Subscribe Observer that launches ObservableOnSubscribe is missing because the just method is used to define the launch.

Examples of handwritten code:

public static <T> Observable<T> just(final T... t) { ObservableOnSubscribe onSubscribe= new ObservableOnSubscribe<T>() { @Override public void subscribe(Observer<T> observableEmitter) { for (T t1:t) { observableEmitter.onNext(t1); } observableEmitter.onComplete(); }}; return new Observable<T>(onSubscribe) ; }Copy the code

2.3 map implementation

Implementation principle:

The map operator is all about transformation, that is, how to pass in Function apply, transform the transmitted content and call onNext to receive it.

Solution: Introduce ObservableMap and MapObserver to take over control


Map use sample code:

Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(Observer<String> ObservableEmitter) {log. I (TAG, "The astronaut ends the countdown to launch, subscribe: launch "); observableEmitter.onNext("123123213"); observableEmitter.onComplete(); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Throwable { Log.i(TAG, "apply"); int a = s.length(); Object object= new Object() ; return a; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe() { Log.i(TAG, "onSubscribe"); } @Override public void onNext(Integer item) { Log.i(TAG, "onNext "+item); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i(TAG, "onComplete"); }});Copy the code

Handwritten example code: map function definition

/ /? Super writable |? Extends public <R> Observable<R> map(Function<? super T, ? extends R> function){ // ObservableMap map = new ObservableMap(source,function); ObservableMap<T, R> observableMap = new ObservableMap(source, function); ObservableMap (observableMap); // observableMap (observableMap); ObservableMap (observableMap); observableMap (observableMap); }Copy the code

Handwritten example code: ObservableMap and MapObserver definitions

public class ObservableMap<T,R> implements ObservableOnSubscribe<R> { ObservableOnSubscribe<T> source; Function<T,R> function; Observer<R> observableEmitter; public ObservableMap(ObservableOnSubscribe source, Function<T,R> function) { this.source = source; this.function = function; } @Override public void subscribe(Observer<R> observableEmitter) { this.observableEmitter = observableEmitter; // source.subscribe(observableEmitter); // The next layer Observer should not be handed over. MapObserver<T> MapObserver = new observableEmitter (ObservableSource, function); source.subscribe(mapObserver); } class MapObserver<T> implements Observer<T>{ObservableOnSubscribe source; Function<T,R> function; Observer<R> observableEmitter; public MapObserver(Observer<R> observableEmitter, ObservableOnSubscribe source, Function<T, R> function) { this.observableEmitter = observableEmitter; this.function =function; this.source=source; } @Override public void onSubscribe() { observableEmitter.onSubscribe(); } @Override public void onNext(T item) { try { R r= function.apply(item); observableEmitter.onNext(r); } catch (Throwable throwable) { throwable.printStackTrace(); observableEmitter.onError(throwable); } } @Override public void onError(Throwable e) { observableEmitter.onError(e); } @Override public void onComplete() { observableEmitter.onComplete(); }}}Copy the code

The related generic wildcard