What is RxJava

RxJava is an implementation of ReactiveX on the JVM that uses observable sequences to write asynchronous and event-based programs. It extends the observer pattern to support data/event sequences and adds operators that allow you to combine sequences declaratively, while abstracting concerns about low-level threading, synchronization, thread safety and concurrent data structures.

What is the observer pattern

The observer pattern, sometimes also called the publish/subscribe pattern, is a behavioral design pattern. It defines a one-to-many dependency in which a single subject object can be observed by multiple observer objects at the same time. When the state of the subject changes, every registered observer is notified and runs its own handling logic.

Roles in the observer pattern

  • Abstract Subject: Keeps references to any number of observer objects in a collection and provides an interface to add, remove, and notify observers.
  • Abstract Observer: An interface or abstract class for observers. It defines an update method through which an observer refreshes itself when notified that the subject has changed.
  • Concrete Subject: Holds the state that concrete observers care about and notifies all registered observers when that internal state changes.
  • Concrete Observer: Implements the update method defined by the Abstract Observer so that its own state is updated when it is notified of a change in the subject.

Implementing the observer pattern (example: Weibo fans following celebrities)

1. Create the abstract subject (Subject):

public interface Star {
    /** Add a fan. */
    void addFan(Fan fan);

    /** Remove a fan. */
    void removeFan(Fan fan);

    /** Share an update with all fans. */
    void notifyFan(String message);
}

2. Create the abstract observer (Observer):

public interface Fan {
    /** Receive an update from a star. */
    void update(String message);
}

3. Create the concrete subject (Concrete Subject), a specific star:

import java.util.ArrayList;
import java.util.List;

public class AStar implements Star {

    private final List<Fan> fanList;

    public AStar() {
        fanList = new ArrayList<Fan>();
    }

    @Override
    public void addFan(Fan fan) {
        fanList.add(fan);
    }

    @Override
    public void removeFan(Fan fan) {
        fanList.remove(fan);
    }

    @Override
    public void notifyFan(String message) {
        // Forward the message to every registered fan.
        for (Fan fan : fanList) {
            fan.update("AStar has posted: " + message);
        }
    }
}

4. Create the concrete observer (Concrete Observer), a specific fan:

import android.util.Log;

public class AFan implements Fan {

    private static final String TAG = "AFan";

    private final String fanName;

    public AFan(String fanName) {
        this.fanName = fanName;
    }

    @Override
    public void update(String message) {
        // android.util.Log.d takes a tag and a message.
        Log.d(TAG, fanName + " received: " + message);
    }
}
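
To see the four roles working together, the following self-contained sketch repeats the interfaces in compact form and wires two fans to a star. It is only an illustration: RecordingFan and WeiboDemo are hypothetical names that do not appear in the original, and the fan stores its messages in a list instead of calling Log.d so that the example runs on a plain JVM.

```java
import java.util.ArrayList;
import java.util.List;

// Abstract subject: manages fans and pushes notifications to them.
interface Star {
    void addFan(Fan fan);
    void removeFan(Fan fan);
    void notifyFan(String message);
}

// Abstract observer: receives updates from a star.
interface Fan {
    void update(String message);
}

// Concrete subject.
class AStar implements Star {
    private final List<Fan> fanList = new ArrayList<>();

    @Override public void addFan(Fan fan) { fanList.add(fan); }
    @Override public void removeFan(Fan fan) { fanList.remove(fan); }

    @Override
    public void notifyFan(String message) {
        // Iterate over a copy so a fan may unfollow while being notified.
        for (Fan fan : new ArrayList<>(fanList)) {
            fan.update(message);
        }
    }
}

// Concrete observer (hypothetical): remembers what it was told.
class RecordingFan implements Fan {
    final String fanName;
    final List<String> received = new ArrayList<>();

    RecordingFan(String fanName) { this.fanName = fanName; }

    @Override
    public void update(String message) {
        received.add(fanName + " got: " + message);
    }
}

class WeiboDemo {
    public static void main(String[] args) {
        AStar star = new AStar();
        RecordingFan alice = new RecordingFan("alice");
        RecordingFan bob = new RecordingFan("bob");
        star.addFan(alice);
        star.addFan(bob);
        star.notifyFan("new post");     // both fans are notified
        star.removeFan(bob);
        star.notifyFan("second post");  // only alice is still registered
        System.out.println(alice.received);
        System.out.println(bob.received);
    }
}
```

After the second notification, alice has received both messages while bob, who was removed in between, has received only the first one.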

The observer pattern in RxJava

RxJava has three basic elements: Observable (the observed subject), Observer (the observer), and subscribe (the act of connecting the two).

Abstract subject in RxJava

Observable is an abstract class that implements the ObservableSource interface.

public abstract class Observable<T> implements ObservableSource<T> {
......
}

ObservableSource declares subscribe(), which attaches an observer to the source, so ObservableSource plays the role of the abstract subject.

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

Abstract observer in RxJava

The parameter type of ObservableSource.subscribe() is Observer, so the Observer interface is the abstract observer.

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
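
The Javadoc above describes a protocol: onSubscribe comes first, then zero or more onNext calls, then at most one of onError or onComplete. The sketch below acts this out with stand-in types. SimpleDisposable, SimpleObserver, ListSource, RecordingObserver and ProtocolDemo are hypothetical names that only mirror the shape of the RxJava interfaces; they are not RxJava classes, and the code is single-threaded for brevity. Treating a null item as an error is an assumption modelled on RxJava 2 rejecting null values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for RxJava's Disposable (simplified).
interface SimpleDisposable {
    void dispose();
    boolean isDisposed();
}

// Stand-in for RxJava's Observer with the same four callbacks.
interface SimpleObserver<T> {
    void onSubscribe(SimpleDisposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

// Hypothetical source that emits the items of a list and follows the protocol.
class ListSource<T> {
    private final List<T> items;

    ListSource(List<T> items) { this.items = items; }

    void subscribe(SimpleObserver<? super T> observer) {
        final boolean[] disposed = {false};   // single-threaded sketch only
        SimpleDisposable d = new SimpleDisposable() {
            @Override public void dispose() { disposed[0] = true; }
            @Override public boolean isDisposed() { return disposed[0]; }
        };
        observer.onSubscribe(d);               // always the first signal
        for (T item : items) {
            if (d.isDisposed()) {
                return;                        // cancelled: no further signals
            }
            if (item == null) {
                // Terminal signal: nothing may follow onError.
                observer.onError(new NullPointerException("null item"));
                return;
            }
            observer.onNext(item);             // zero or more times
        }
        if (!d.isDisposed()) {
            observer.onComplete();             // terminal, never after onError
        }
    }
}

// Records every signal; disposes after disposeAfter items (0 means never).
class RecordingObserver implements SimpleObserver<String> {
    final List<String> events = new ArrayList<>();
    private final int disposeAfter;
    private int seen;
    private SimpleDisposable disposable;

    RecordingObserver(int disposeAfter) { this.disposeAfter = disposeAfter; }

    @Override public void onSubscribe(SimpleDisposable d) {
        disposable = d;
        events.add("onSubscribe");
    }
    @Override public void onNext(String s) {
        events.add("onNext:" + s);
        if (++seen == disposeAfter) {
            disposable.dispose();
        }
    }
    @Override public void onError(Throwable e) { events.add("onError:" + e.getMessage()); }
    @Override public void onComplete() { events.add("onComplete"); }
}

class ProtocolDemo {
    public static void main(String[] args) {
        ListSource<String> source = new ListSource<>(Arrays.asList("a", "b", "c"));
        RecordingObserver full = new RecordingObserver(0);
        RecordingObserver cancelled = new RecordingObserver(2);
        source.subscribe(full);
        source.subscribe(cancelled);
        System.out.println(full.events);
        System.out.println(cancelled.events);
    }
}
```

The first observer sees every item followed by onComplete. The second disposes after two items and therefore receives neither the third item nor a terminal signal.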

Concrete subject in RxJava

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
    }
});

/**
 * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
 * @param <T> the element type
 * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
 * @return the new Observable instance
 * @see ObservableOnSubscribe
 * @see ObservableEmitter
 * @see Cancellable
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

Observable.create() returns an ObservableCreate instance, which is the concrete subject.

Concrete observer in RxJava
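
As a rough illustration of what create() sets up, the sketch below stores the user callback and, when an observer subscribes, hands that callback an emitter that forwards signals to the observer. MiniObserver, MiniEmitter, MiniOnSubscribe, MiniObservable and CreateDemo are hypothetical stand-ins, not RxJava classes. The real ObservableCreate also deals with disposal, plugin hooks and more careful error routing, all of which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Observer, reduced to the three data callbacks.
interface MiniObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

// Stand-in for ObservableEmitter: what the user code pushes values into.
interface MiniEmitter<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

// Stand-in for ObservableOnSubscribe: the user callback passed to create().
interface MiniOnSubscribe<T> {
    void subscribe(MiniEmitter<T> emitter) throws Exception;
}

// Stand-in for Observable and ObservableCreate, merged into one class.
final class MiniObservable<T> {
    private final MiniOnSubscribe<T> source;

    private MiniObservable(MiniOnSubscribe<T> source) { this.source = source; }

    static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new MiniObservable<>(source);
    }

    void subscribe(final MiniObserver<? super T> observer) {
        // The emitter forwards signals and drops anything after a terminal event.
        MiniEmitter<T> emitter = new MiniEmitter<T>() {
            private boolean done;
            @Override public void onNext(T t) {
                if (!done) observer.onNext(t);
            }
            @Override public void onError(Throwable e) {
                if (!done) { done = true; observer.onError(e); }
            }
            @Override public void onComplete() {
                if (!done) { done = true; observer.onComplete(); }
            }
        };
        try {
            // The callback runs only now, once per subscriber (a "cold" source).
            source.subscribe(emitter);
        } catch (Exception e) {
            emitter.onError(e);
        }
    }
}

class CreateDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> observable = MiniObservable.create(emitter -> {
            log.add("callback started");
            emitter.onNext("Hello");
            emitter.onComplete();
            emitter.onNext("dropped");   // after the terminal event, so ignored
        });
        log.add("created");              // creating does not run the callback
        observable.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String s) { log.add("onNext:" + s); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The log starts with "created" and only then shows "callback started", which is the point of the sketch: the callback passed to create() does nothing until an observer subscribes.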

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
};

The object that implements the Observer interface is the concrete observer.

Subscription in RxJava

observable.subscribe(observer);

The subscribe() method of Observable is implemented as follows:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
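
The subscribe() method above follows the template method pattern: the final method validates its argument and then delegates to the abstract subscribeActual(), which each concrete Observable implements. The sketch below shows only that structure. Sink, BaseSource, JustSource and SubscribeDemo are hypothetical stand-ins rather than RxJava classes, and the plugin hooks and error handling of the real method are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Stand-in for Observer, reduced to two callbacks.
interface Sink<T> {
    void onNext(T t);
    void onComplete();
}

// Stand-in for Observable: subscribe() is final, subscribeActual() is the hook.
abstract class BaseSource<T> {
    public final void subscribe(Sink<? super T> sink) {
        Objects.requireNonNull(sink, "observer is null");
        subscribeActual(sink);
    }

    // Each concrete source decides how to deliver its items.
    protected abstract void subscribeActual(Sink<? super T> sink);
}

// A concrete source that emits one value and then completes.
final class JustSource<T> extends BaseSource<T> {
    private final T value;

    JustSource(T value) { this.value = value; }

    @Override
    protected void subscribeActual(Sink<? super T> sink) {
        sink.onNext(value);
        sink.onComplete();
    }
}

class SubscribeDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        BaseSource<String> source = new JustSource<>("Hello");
        source.subscribe(new Sink<String>() {
            @Override public void onNext(String s) { log.add("onNext:" + s); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because subscribe() is final, every subclass gets the same null check for free and only has to supply the emission logic.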