1. What is RxJava?
1.1 What is Responsive programming?
Is a programming model based on the concept of asynchronous data flow (asynchronous data flow programming) Data flow -> rivers (observed, filtered, manipulated)
1.2 The design principles of responsive programming are:
Keeping data immutable without sharing blocking is harmful
1.3 Provides a solution in our Java – RxJava? \
RxJava: Reactive Extensions Java was born. There is responsive programming in iOS development in Net.
// Load the file
// new Thread() {
// @Override
// public void run() {
// super.run();
// for (File folder : folders) {
// File[] files = folder.listFiles();
// for (File file : files) {
// if (file.getName().endsWith(".png")) {
// final Bitmap bitmap = getBitmapFromFile(file);
// // Update the UI thread
// runOnUiThread(new Runnable() {
// @Override
// public void run() {
// imageCollectorView.addImage(bitmap);
/ /}
/ /});
/ /}
/ /}
/ /}
/ /}
// }.start();
Copy the code
RxJava writing
File[] folders = new File[10];
Observable.from(folders)
/ / convenience
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
returnObservable.from(file.listFiles()); }})/ / filter
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
/ / conditions
return file.getName().endsWith(".png"); }})// Load the image
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
/ / update the UI
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); }});Copy the code
File array flatMap: the equivalent of manually nesting loop queue data structure you will find the following simple advantages: 1: you don’t need to worry about threads 2: you don’t need to worry about how to update the UI thread, how to call
2.RxJava overall architecture design?
Observable Observable Observable Observable Observable Observable Observable Observable Observable Observable The observer subscribes to Subjects: Observable and Subjects are two "producing" entities, and Observer and Subscrible are two "consuming" entities hot Observables and cold Observables. From the perspective of emission, there are two different Observables: Hot and cold. A "hot" Observable typically starts emitting data as soon as it's created. So all subsequent observers that subscribe to it may start receiving data from somewhere in the middle of the sequence (some data is missed). A "cold" Observable waits until an observer subscribes to it before transmitting data, so the observer can be sure to receive the entire data sequence. Hot and Cold Hot: Active scenario: there is currently only one observer in the container, sent to all observers3Because the hot Observables send messages as soon as they are created, suppose I send a second message now, and then all of a sudden I add an observer, and the second observer doesn't get the message. Cold: Passive scenario: The container currently has only1Since the cold Observables will wait for an observer to subscribe once they are created, I immediately send all messages to that observer (subscriber).Copy the code
3.RxJava Basic API?
First case: How do I create Observables?
Subscribe
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */
}
// new Subscriber so onStart it
subscriber.onStart();
/ * * See https://github.com/ReactiveX/RxJava/issues/216 for the discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */
// if not already wrapped
if(! (subscriberinstanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD}}returnSubscriptions.unsubscribed(); }}public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
boolean done;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
/**
* Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
@Override
public void onCompleted(a) {
if(! done) { done =true;
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
RxJavaHooks.onError(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally { // NOPMD
try {
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
// and we throw an UnsubscribeFailureException.
unsubscribe();
} catch (Throwable e) {
RxJavaHooks.onError(e);
throw newUnsubscribeFailedException(e.getMessage(), e); }}}}/**
* Notifies the Subscriber that the {@code Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if(! done) { done =true; _onError(e); }}/**
* Provides the Subscriber with a new item to observe.
* <p>
* The {@code Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this); }}/**
* The logic for {@code onError} without the {@code isFinished} check so it can be called from within
* {@code onCompleted}.
*
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
*/
@SuppressWarnings("deprecation")
protected void _onError(Throwable e) { // NOPMD
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
try {
actual.onError(e);
} catch (OnErrorNotImplementedException e2) { // NOPMD
/ * * onError isn 't implemented so throw https://github.com/ReactiveX/RxJava/issues/198 * * * * Rx Design Guidelines 5.2 * * "when calling the Subscribe method that only has an onNext argument, the OnError behavior * will be to rethrow the exception on the thread that the message comes out from the observable * sequence. The OnCompleted behavior in this case is to do nothing." */
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.".new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD
}
throw e2;
} catch (Throwable e2) {
/* * throw since the Rx contract is broken if onError failed * * https://github.com/ReactiveX/RxJava/issues/198 */
RxJavaHooks.onError(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.".new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError".new CompositeException(Arrays.asList(e, e2)));
}
// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw newOnErrorFailedException(unsubscribeException); }}/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
* @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
*/
public Subscriber<? super T> getActual() {
returnactual; }}Copy the code
Subscriber is actually an Observer
RxJava basically uses source code analysis
Analysis of Observable creation principle: Step 1: call observable. create() method step 2: Add observer subscription to listen to Observable.OnSubscrible Step 3: Create Observable new Observable (hook. OnCreate (f)) in Observable. Step 4: Store the observer subscription listener in the Observable constructor
Subscribe (New Observer()); subscribe(New Observer())
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(newObserverSubscriber<T>(observer)); } method to register an observerCopy the code
Step 3: Call it in the Observable class
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
Copy the code
Observable.subscribe(subscriber, this); Step 5: In observables. The subscribe method calls monitored observer subscription callback interface RxJavaHooks. OnObservableStart (observables, observable.onSubscribe).call(subscriber);
private Observable<String> observableString;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
// Create an observed
// Configure the callback interface --OnSubscribe
// Why do you want to configure it?
// Listen to the observer subscription, once an observer subscribed, immediately callback change interface
observableString = Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
Log.i("main"."Come back.");
// Access request
// So there are some things we can do in this method
// To communicate with the observer
for (int i = 0; i < 5; i++) {
observer.onNext("The first" + i + "A number.");
}
// Access complete
// When our data transfer is completeobserver.onCompleted(); }}); }Copy the code
public void click(View v) {
// Observer subscription
// Callback principle:
// Core code:
// hook.onSubscribeStart(observable,
// observable.onSubscribe).call(subscriber);
observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted(a) {
Log.i("main"."---onCompleted---");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String item) {
// Accept data
Log.i("main"."Observer receives data:"+ item); }}); }Copy the code
Results output
08-02 09:53:45. 057, 16613-16613 / com haocai. Architect. Rxjava I/main: Back to 08-02 09:53:45. 057, 16613-16613 / com. Haocai. Architect. Rxjava I/main: Observers received data: data zeroth 08-02 09:53:45. 057, 16613-16613 / com. Haocai. Architect. Rxjava I/main: Observers received data: data first 08-02 09:53:45. 057, 16613-16613 / com. Haocai. Architect. Rxjava I/main: Observers received data: the second data 08-02 09:53:45. 057, 16613-16613 / com. Haocai. Architect. Rxjava I/main: Observers received data: data third 08-02 09:53:45. 057, 16613-16613 / com. Haocai. Architect. Rxjava I/main: Observers received data: the data of the fourth 08-02 09:53:45. 057, 16613-16613 / com. Haocai. Architect. Rxjava I/main: onCompleted - - -Copy the code
ObservableString. In the subscribe RxJavaHooks. OnObservableStart (observables, observables. OnSubscribe). The call (the subscriber); Calling the Call method
Another way to send automatically
private Observable<String> observableString;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
List<String> items = new ArrayList<String>();
items.add("Kpioneer");
items.add("Xpioneer");
items.add("haocai");
items.add("Huhu");
// The framework itself provides such an API
// from: Once you have an observer registered, send the message sequence immediately
// Internal implementation of the framework
// The framework calls the create method internally
// Iterator mode
// The OnSubscribeFromIterable class is dedicated to iterating through collections
// The OnSubscribeFromArray class is dedicated to traversing groups of numbers
observableString = Observable.from(items);
}
Copy the code
public void click(View v) {
observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted(a) {
Log.i("main"."---onCompleted---");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String item) {
// Accept data
Log.i("main"."Observer receives data:"+ item); }}); }Copy the code
Results output
08-02 14:38:14. 517, 32289-32289 / com haocai. Architect. Rxjava I/main: Observers received data: Kpioneer 08-02 14:38:14. 517, 32289-32289 / com. Haocai. Architect. Rxjava I/main: Observers received data: Xpioneer 08-02 14:38:14. 517, 32289-32289 / com. Haocai. Architect. Rxjava I/main: Observers received data: haocai 08-02 14:38:14. 517, 32289-32289 / com. Haocai. Architect. Rxjava I/main: Observers received data: Huhu 08-02 14:38:14. 517, 32289-32289 / com. Haocai. Architect. Rxjava I/main: onCompleted - - -Copy the code
Copyright 2014 Netflix, Inc. ** Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package rx.internal.operators;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;
/**
* Converts an {@code Iterable} sequence into an {@codeObservable}. * <p> * ! [](http://upload-images.jianshu.io/upload_images/1824809-fa9342290145e00e.png? imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) * <p> * You can convert any object that supports the Iterable interface into an Observable that emits each item in * the object, with the {@code toObservable} operation.
* @param <T> the value type of the items
*/
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
final Iterable<? extends T> is;
public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
if (iterable == null) {
throw new NullPointerException("iterable must not be null");
}
this.is = iterable;
}
@Override
public void call(final Subscriber<? super T> o) {
Iterator<? extends T> it;
boolean b;
try {
it = is.iterator();
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if(! o.isUnsubscribed()) {if(! b) { o.onCompleted(); }else {
o.setProducer(newIterableProducer<T>(o, it)); }}}static final class IterableProducer<T> extends AtomicLong implements Producer {
/ * * * /
private static final long serialVersionUID = -8730475647105475802L;
// Concrete observer
private final Subscriber<? super T> o;
// Specific data
private final Iterator<? extends T> it;
IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
this.o = o;
this.it = it;
}
@Override
public void request(long n) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
fastPath();
} else
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) { slowPath(n); }}void slowPath(long n) {
// backpressure is requested
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;
long r = n;
long e = 0;
for (;;) {
while(e ! = r) {if (o.isUnsubscribed()) {
return;
}
T value;
try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
o.onNext(value);
if (o.isUnsubscribed()) {
return;
}
boolean b;
try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if(! b) {if(! o.isUnsubscribed()) { o.onCompleted(); }return;
}
e++;
}
r = get();
if (e == r) {
r = BackpressureUtils.produced(this, e);
if (r == 0L) {
break;
}
e = 0L; }}}void fastPath(a) {
// fast-path without backpressure
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;
for (;;) {
if (o.isUnsubscribed()) {
return;
}
T value;
try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
o.onNext(value);
if (o.isUnsubscribed()) {
return;
}
boolean b;
try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if(! b) {if(! o.isUnsubscribed()) { o.onCompleted(); }return;
}
}
}
}
}
Copy the code