1. What is RxJava?

1.1 What is reactive programming?

Reactive programming is a programming model built on the concept of asynchronous data flows. A data flow is like a river: it can be observed, filtered, and manipulated.

1.2 The design principles of reactive programming are:

Keep data immutable. Share nothing. Blocking is harmful.

1.3 The solution for Java: RxJava

RxJava (Reactive Extensions for Java) is the implementation of this model for Java. Reactive programming is also available on other platforms, such as iOS development and .NET.


        // Traditional approach: load the files on a hand-written worker thread
        new Thread() {
            @Override
            public void run() {
                super.run();
                for (File folder : folders) {
                    File[] files = folder.listFiles();
                    for (File file : files) {
                        if (file.getName().endsWith(".png")) {
                            final Bitmap bitmap = getBitmapFromFile(file);
                            // Switch to the UI thread to update the view
                            runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    imageCollectorView.addImage(bitmap);
                                }
                            });
                        }
                    }
                }
            }
        }.start();

The RxJava version:


        File[] folders = new File[10];
        Observable.from(folders)
        // traverse each folder
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File file) {
                return Observable.from(file.listFiles());
            }
        })
        // filter
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                // condition: keep only .png files
                return file.getName().endsWith(".png");
            }
        })
        // load the image
        .map(new Func1<File, Bitmap>() {
            @Override
            public Bitmap call(File file) {
                return getBitmapFromFile(file);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // update the UI
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }
        });

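Here flatMap receives the array of folders and does the equivalent of a hand-written nested loop; the items then pass through the chain one after another, like a queue. Compared with the thread-based version, two simple advantages stand out:

1: You don't need to manage threads yourself.
2: You don't need to worry about how to switch back to the UI thread to update it.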
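The claim that flatMap stands in for a hand-written nested loop can be checked without RxJava. The following is a minimal sketch using the JDK's own Stream.flatMap, not RxJava's operator; the class name FlatMapSketch, the string file names, and the "bitmap:" prefix (a stand-in for getBitmapFromFile) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {

    // Hand-written version: one loop per nesting level.
    public static List<String> pngNamesWithLoops(List<List<String>> folders) {
        List<String> result = new ArrayList<>();
        for (List<String> folder : folders) {
            for (String name : folder) {
                if (name.endsWith(".png")) {
                    // "bitmap:" is a hypothetical stand-in for decoding the image
                    result.add("bitmap:" + name);
                }
            }
        }
        return result;
    }

    // Pipeline version: flatMap replaces the inner loop.
    public static List<String> pngNamesWithFlatMap(List<List<String>> folders) {
        return folders.stream()
                .flatMap(List::stream)                  // folder -> its files
                .filter(name -> name.endsWith(".png"))  // keep only .png files
                .map(name -> "bitmap:" + name)          // stand-in for loading
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> folders = Arrays.asList(
                Arrays.asList("a.png", "notes.txt"),
                Arrays.asList("b.png", "c.jpg", "d.png"));
        System.out.println(pngNamesWithLoops(folders));
        System.out.println(pngNamesWithFlatMap(folders));
    }
}
```

Both methods return the same list. Unlike the RxJava chain, this sketch runs entirely on the calling thread; the thread switching done by subscribeOn and observeOn has no counterpart here.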

2. RxJava overall architecture design


RxJava has four main roles:

Observable: the observed
Observer: the observer
Subscriber: the subscriber
Subjects

Observable and Subjects are the two "producing" entities, and Observer and Subscriber are the two "consuming" entities.

Hot Observables and cold Observables: from the perspective of emission, there are two kinds of Observables, hot and cold. A "hot" Observable typically starts emitting data as soon as it is created, so any observer that subscribes later may start receiving data from somewhere in the middle of the sequence (some data is missed). A "cold" Observable waits until an observer subscribes before emitting data, so the observer is guaranteed to receive the entire sequence.

Hot (active) scenario: the container currently holds only one observer, and the Observable is sending three messages to all observers. Because a hot Observable sends messages as soon as it is created, suppose the second message has just been sent and a second observer is suddenly added: that second observer does not receive the messages that were already sent.

Cold (passive) scenario: the container currently holds only one observer. Because a cold Observable waits for an observer to subscribe after it is created, all messages are sent to that observer (subscriber) as soon as it subscribes.
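The difference between hot and cold sources can be illustrated with a minimal plain-Java sketch. ColdSource and HotSource are hypothetical classes written for this illustration and are not part of RxJava; the sketch only models who receives which message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class HotColdSketch {

    /** Cold: nothing is sent until someone subscribes; every subscriber gets the full sequence. */
    public static final class ColdSource {
        private final List<String> items;

        public ColdSource(List<String> items) {
            this.items = items;
        }

        public void subscribe(Consumer<String> observer) {
            for (String item : items) {
                observer.accept(item);
            }
        }
    }

    /** Hot: emits whether or not anyone listens; only observers registered at that moment see an item. */
    public static final class HotSource {
        private final List<Consumer<String>> observers = new ArrayList<>();

        public void subscribe(Consumer<String> observer) {
            observers.add(observer);
        }

        public void emit(String item) {
            for (Consumer<String> observer : observers) {
                observer.accept(item);
            }
        }
    }

    public static void main(String[] args) {
        HotSource hot = new HotSource();
        List<String> first = new ArrayList<>();
        List<String> late = new ArrayList<>();
        hot.subscribe(first::add);
        hot.emit("m1");
        hot.emit("m2");
        hot.subscribe(late::add); // joins after two messages were already sent
        hot.emit("m3");
        System.out.println("hot:  first=" + first + " late=" + late);

        ColdSource cold = new ColdSource(Arrays.asList("m1", "m2", "m3"));
        List<String> coldObserver = new ArrayList<>();
        cold.subscribe(coldObserver::add);
        System.out.println("cold: observer=" + coldObserver);
    }
}
```

With the hot source, the late observer receives only m3, while the observer of the cold source receives m1, m2, and m3 regardless of when it subscribes.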

3. RxJava basic API

First case: How do I create Observables?

Subscribe


    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for the discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

public class SafeSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    boolean done;

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    /**
     * Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
     * <p>
     * The {@code Observable} will not call this method if it calls {@link #onError}.
     */
    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                // we handle here instead of another method so we don't add stacks to the frame
                // which can prevent it from being able to handle StackOverflow
                Exceptions.throwIfFatal(e);
                RxJavaHooks.onError(e);
                throw new OnCompletedFailedException(e.getMessage(), e);
            } finally { // NOPMD
                try {
                    // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
                    // and we throw an UnsubscribeFailureException.
                    unsubscribe();
                } catch (Throwable e) {
                    RxJavaHooks.onError(e);
                    throw new UnsubscribeFailedException(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Notifies the Subscriber that the {@code Observable} has experienced an error condition.
     * <p>
     * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onCompleted}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    @Override
    public void onError(Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwIfFatal(e);
        if (!done) {
            done = true;
            _onError(e);
        }
    }

    /**
     * Provides the Subscriber with a new item to observe.
     * <p>
     * The {@code Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    @Override
    public void onNext(T t) {
        try {
            if (!done) {
                actual.onNext(t);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }

    /**
     * The logic for {@code onError} without the {@code isFinished} check so it can be called from within
     * {@code onCompleted}.
     *
     * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
     */
    @SuppressWarnings("deprecation")
    protected void _onError(Throwable e) { // NOPMD
        RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
        try {
            actual.onError(e);
        } catch (OnErrorNotImplementedException e2) { // NOPMD
            /*
             * onError isn't implemented so throw
             *
             * https://github.com/ReactiveX/RxJava/issues/198
             *
             * Rx Design Guidelines 5.2
             *
             * "when calling the Subscribe method that only has an onNext argument, the OnError behavior
             * will be to rethrow the exception on the thread that the message comes out from the observable
             * sequence. The OnCompleted behavior in this case is to do nothing."
             */
            try {
                unsubscribe();
            } catch (Throwable unsubscribeException) {
                RxJavaHooks.onError(unsubscribeException);
                throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD
            }
            throw e2;
        } catch (Throwable e2) {
            /*
             * throw since the Rx contract is broken if onError failed
             *
             * https://github.com/ReactiveX/RxJava/issues/198
             */
            RxJavaHooks.onError(e2);
            try {
                unsubscribe();
            } catch (Throwable unsubscribeException) {
                RxJavaHooks.onError(unsubscribeException);
                throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
            }

            throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
        }
        // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
        try {
            unsubscribe();
        } catch (Throwable unsubscribeException) {
            RxJavaHooks.onError(unsubscribeException);
            throw new OnErrorFailedException(unsubscribeException);
        }
    }

    /**
     * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
     *
     * @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
     */
    public Subscriber<? super T> getActual() {
        return actual;
    }
}

A Subscriber is itself an Observer: the Subscriber class implements the Observer interface.
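The central idea of SafeSubscriber in the listing above is the done flag: once onCompleted or onError has been delivered, no further event reaches the wrapped observer. The sketch below isolates only that guard in plain Java. SimpleObserver and GuardedObserver are hypothetical names; the sketch deliberately leaves out the exception handling, hooks, and unsubscribe calls that the real class performs.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeObserverSketch {

    /** Hypothetical stand-in for the three Observer callbacks. */
    public interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Wraps an observer and drops every event that arrives after a terminal event. */
    public static final class GuardedObserver<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> actual;
        private boolean done;

        public GuardedObserver(SimpleObserver<T> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T item) {
            if (!done) {
                actual.onNext(item);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (!done) {
                done = true;
                actual.onError(e);
            }
        }

        @Override
        public void onCompleted() {
            if (!done) {
                done = true;
                actual.onCompleted();
            }
        }
    }

    /** Feeds a misbehaving event sequence through the guard and records what gets through. */
    public static List<String> demo() {
        final List<String> log = new ArrayList<>();
        SimpleObserver<String> guarded = new GuardedObserver<>(new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        guarded.onNext("a");
        guarded.onCompleted();
        guarded.onNext("b");                            // dropped: already completed
        guarded.onError(new RuntimeException("late"));  // dropped
        guarded.onCompleted();                          // dropped
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next:a, completed]
    }
}
```

The flag is a plain boolean here, so this sketch assumes events arrive on a single thread.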

Source code analysis of basic RxJava usage

Analysis of the Observable creation principle:

Step 1: Call the Observable.create() method.
Step 2: Pass in the subscription listener, Observable.OnSubscribe, which is notified when an observer subscribes.
Step 3: Inside Observable.create(), the Observable is created with new Observable(hook.onCreate(f)).
Step 4: The Observable constructor stores the subscription listener.

Analysis of the observer subscription principle:

Step 1: Register an observer by calling subscribe(new Observer()) on the Observable.

Step 2: In the Observable class, the subscribe(Observer) method registers the observer:

    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }

Step 3: That method calls subscribe(Subscriber) in the Observable class:

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

Step 4: This in turn calls the static method Observable.subscribe(subscriber, this).

Step 5: Inside Observable.subscribe, the stored subscription callback is invoked: RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
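The creation and subscription steps can be modelled in a few lines of plain Java. This is a simplified sketch, not RxJava's implementation: MiniObservable, MiniOnSubscribe, and MiniObserver are hypothetical names, and hooks, SafeSubscriber wrapping, and error handling are all left out. It shows only that create() stores the callback and that subscribe() is what finally invokes it.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniObservableSketch {

    /** Hypothetical, simplified observer: only onNext and onCompleted. */
    public interface MiniObserver<T> {
        void onNext(T item);
        void onCompleted();
    }

    /** Callback that is invoked once for every subscription. */
    public interface MiniOnSubscribe<T> {
        void call(MiniObserver<? super T> observer);
    }

    public static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            // The constructor only stores the callback; nothing runs yet.
            this.onSubscribe = onSubscribe;
        }

        public static <T> MiniObservable<T> create(MiniOnSubscribe<T> f) {
            return new MiniObservable<>(f);
        }

        public void subscribe(MiniObserver<? super T> observer) {
            if (observer == null) {
                throw new NullPointerException("observer is null");
            }
            // Subscribing is what triggers the stored callback.
            onSubscribe.call(observer);
        }
    }

    /** Records the order of events to show that emission starts at subscribe(). */
    public static List<String> demo() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.<String>create(observer -> {
            log.add("called back");
            for (int i = 0; i < 3; i++) {
                observer.onNext("item " + i);
            }
            observer.onCompleted();
        });
        log.add("created"); // the callback has not run at this point
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the recorded log, "created" appears before "called back", which matches the description above: the OnSubscribe callback runs when an observer subscribes, not when the Observable is created.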


    private Observable<String> observableString;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple2);

        // Create the Observable (the observed)
        // and configure its callback interface, OnSubscribe.
        // Why configure it?
        // It listens for subscriptions: as soon as an observer subscribes, this interface is called back
        observableString = Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> observer) {
                        Log.i("main", "Called back.");
                        // Make the request here:
                        // this method is where we do the work
                        // and communicate the results to the observer
                        for (int i = 0; i < 5; i++) {
                            observer.onNext("Data item " + i);
                        }
                        // Request finished:
                        // signal completion once all data has been delivered
                        observer.onCompleted();
                    }
                });
    }

    public void click(View v) {
        // Observer subscription
        // Callback principle:
        // Core code:
        // hook.onSubscribeStart(observable,
        // observable.onSubscribe).call(subscriber);
        observableString.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("main", "---onCompleted---");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Oh,no! Something wrong happened!");
            }

            @Override
            public void onNext(String item) {
                // Receive the data
                Log.i("main", "Observer receives data: " + item);
            }
        });
    }

Results output

08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: Called back.
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: Observer receives data: Data item 0
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: Observer receives data: Data item 1
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: Observer receives data: Data item 2
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: Observer receives data: Data item 3
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: Observer receives data: Data item 4
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: ---onCompleted---

Inside observableString.subscribe(...), the statement RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber) runs, which invokes the call method of the OnSubscribe that was passed to create().

Another way: emitting automatically with from()


    private Observable<String> observableString;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple2);

        List<String> items = new ArrayList<String>();
        items.add("Kpioneer");
        items.add("Xpioneer");
        items.add("haocai");
        items.add("Huhu");
        // The framework itself provides an API for this:
        // from() sends the whole sequence as soon as an observer subscribes.
        // Internally, the framework calls the create method
        // and uses the iterator pattern:
        // the OnSubscribeFromIterable class is dedicated to traversing collections
        // the OnSubscribeFromArray class is dedicated to traversing arrays
        observableString = Observable.from(items);
    }

    public void click(View v) {
        observableString.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("main", "---onCompleted---");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Oh,no! Something wrong happened!");
            }

            @Override
            public void onNext(String item) {
                // Receive the data
                Log.i("main", "Observer receives data: " + item);
            }
        });
    }

Results output

08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: Observer receives data: Kpioneer
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: Observer receives data: Xpioneer
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: Observer receives data: haocai
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: Observer receives data: Huhu
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: ---onCompleted---

/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.internal.operators;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;

/**
 * Converts an {@code Iterable} sequence into an {@code Observable}.
 * <p>
 * You can convert any object that supports the Iterable interface into an Observable that emits each item in
 * the object, with the {@code toObservable} operation.
 * @param <T> the value type of the items
 */
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        if (iterable == null) {
            throw new NullPointerException("iterable must not be null");
        }
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        Iterator<? extends T> it;
        boolean b;

        try {
            it = is.iterator();

            b = it.hasNext();
        } catch (Throwable ex) {
            Exceptions.throwOrReport(ex, o);
            return;
        }

        if (!o.isUnsubscribed()) {
            if (!b) {
                o.onCompleted();
            } else {
                o.setProducer(new IterableProducer<T>(o, it));
            }
        }
    }

    static final class IterableProducer<T> extends AtomicLong implements Producer {
        /** */
        private static final long serialVersionUID = -8730475647105475802L;
        // Concrete observer
        private final Subscriber<? super T> o;
       // Specific data
        private final Iterator<? extends T> it;

        IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
            this.o = o;
            this.it = it;
        }

        @Override
        public void request(long n) {
            if (get() == Long.MAX_VALUE) {
                // already started with fast-path
                return;
            }
            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
                fastPath();
            } else
            if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
                slowPath(n);
            }
        }

        void slowPath(long n) {
            // backpressure is requested
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;

            long r = n;
            long e = 0;

            for (;;) {
                while (e != r) {
                    if (o.isUnsubscribed()) {
                        return;
                    }

                    T value;

                    try {
                        value = it.next();
                    } catch (Throwable ex) {
                        Exceptions.throwOrReport(ex, o);
                        return;
                    }

                    o.onNext(value);

                    if (o.isUnsubscribed()) {
                        return;
                    }

                    boolean b;

                    try {
                        b = it.hasNext();
                    } catch (Throwable ex) {
                        Exceptions.throwOrReport(ex, o);
                        return;
                    }

                    if (!b) {
                        if (!o.isUnsubscribed()) {
                            o.onCompleted();
                        }
                        return;
                    }

                    e++;
                }

                r = get();
                if (e == r) {
                    r = BackpressureUtils.produced(this, e);
                    if (r == 0L) {
                        break;
                    }
                    e = 0L;
                }
            }
        }

        void fastPath() {
            // fast-path without backpressure
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;

            for (;;) {
                if (o.isUnsubscribed()) {
                    return;
                }

                T value;

                try {
                    value = it.next();
                } catch (Throwable ex) {
                    Exceptions.throwOrReport(ex, o);
                    return;
                }

                o.onNext(value);

                if (o.isUnsubscribed()) {
                    return;
                }

                boolean b;

                try {
                    b  = it.hasNext();
                } catch (Throwable ex) {
                    Exceptions.throwOrReport(ex, o);
                    return;
                }

                if (!b) {
                    if (!o.isUnsubscribed()) {
                        o.onCompleted();
                    }
                    return;
                }
            }
        }
    }

}
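The request(n) logic in IterableProducer keeps a counter of outstanding demand and lets only the caller that raises it from zero start emitting. The following plain-Java sketch shows that idea in isolation. IteratorProducer is a hypothetical class, not RxJava code; it assumes single-threaded use, skips the unsubscribe checks and the Long.MAX_VALUE fast path, and does not cap the counter against overflow the way BackpressureUtils does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class RequestSketch {

    /** Emits items from an iterator, but never more than the consumer has requested. */
    public static final class IteratorProducer<T> {
        private final AtomicLong requested = new AtomicLong();
        private final Iterator<T> it;
        private final Consumer<T> onNext;
        private final Runnable onCompleted;
        private boolean done;

        public IteratorProducer(Iterator<T> it, Consumer<T> onNext, Runnable onCompleted) {
            this.it = it;
            this.onNext = onNext;
            this.onCompleted = onCompleted;
        }

        public void request(long n) {
            if (n <= 0 || done) {
                return;
            }
            // Only the call that moves the counter away from 0 starts emitting;
            // a request made while emission is in progress just adds demand.
            if (requested.getAndAdd(n) == 0L) {
                drain();
            }
        }

        private void drain() {
            do {
                if (!it.hasNext()) { // empty source
                    complete();
                    return;
                }
                onNext.accept(it.next());
                if (!it.hasNext()) { // complete right after the last item
                    complete();
                    return;
                }
            } while (requested.decrementAndGet() > 0);
        }

        private void complete() {
            done = true;
            onCompleted.run();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        IteratorProducer<Integer> producer = new IteratorProducer<>(
                Arrays.asList(1, 2, 3, 4, 5).iterator(),
                value -> log.add("next:" + value),
                () -> log.add("completed"));
        producer.request(2);
        System.out.println(log); // two items so far
        producer.request(2);
        producer.request(10);
        System.out.println(log); // all five items, then completion
    }
}
```

After request(2) only two items have been delivered; the remaining items arrive as further demand is requested, and completion is signalled immediately after the last item, as in the slowPath method above.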