This article was first published on the WeChat official account Practice. Feel free to follow it and get in touch.
RxJava is the open-source Java implementation of ReactiveX, a library for composing asynchronous and event-based programs using observable sequences. RxJava centers on asynchronous programming, chained calls, and event sequences.
- Adding RxJava
- Concepts
- Basic implementation
- The just operator
- The from operators
- The defer operator
- The empty operator
- The never operator
- The timer operator
- The interval operator
- The range operator
- Conclusion
Adding RxJava
implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
Concepts
The key concepts in RxJava are the Observer, the Observable (the observed), and the event sequence. The event sequence is controlled entirely by the Observable. For the Observable to notify the Observer when needed, a subscription relationship must be established between the two. Once it is established, the Observer is notified as soon as the Observable changes.
An Observer in RxJava 2 has four callback methods:
- onSubscribe: called when the subscription is established; it receives a Disposable that can be used to unsubscribe
- onNext: called each time the Observable sends an event, to receive that event
- onError: called when the event sequence fails; no further events may be sent afterwards
- onComplete: called when the event sequence has finished; the Observable may keep sending events afterwards, but the Observer no longer receives them
Note:
- The Observable must not send further events after onError, while it may keep sending them after onComplete. Either way, the Observer receives nothing more;
- onError and onComplete are mutually exclusive. Calling onError after onComplete crashes, but calling onComplete after onError does not, because nothing may be sent after onError;
- Of the four callbacks, onSubscribe is invoked as soon as the subscription is established, whereas onNext, onError, and onComplete are invoked only when the Observable triggers them. This is easy to misunderstand.
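The rule that nothing reaches the Observer after a terminal event can be sketched in plain Java. This is only an illustration under stated assumptions: SimpleObserver and GuardedEmitter are hypothetical names invented here, not RxJava classes, and the sketch silently drops a late onError, whereas the notes above say RxJava 2 crashes in that case.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalEventSketch {

    /** Hypothetical, simplified stand-in for an observer (onSubscribe omitted). */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onComplete();
    }

    /** Hypothetical emitter that forwards events only until the first terminal event. */
    static final class GuardedEmitter<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated;

        GuardedEmitter(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            if (!terminated) {
                downstream.onNext(item);
            }
        }

        void onError(Throwable e) {
            if (!terminated) {
                terminated = true;
                downstream.onError(e);
            }
        }

        void onComplete() {
            if (!terminated) {
                terminated = true;
                downstream.onComplete();
            }
        }
    }

    /** Sends Event1, Event2, onComplete, Event3 and returns what the observer saw. */
    static List<String> run() {
        final List<String> received = new ArrayList<>();
        GuardedEmitter<String> emitter = new GuardedEmitter<>(new SimpleObserver<String>() {
            @Override public void onNext(String item) { received.add("onNext:" + item); }
            @Override public void onError(Throwable e) { received.add("onError"); }
            @Override public void onComplete() { received.add("onComplete"); }
        });
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3"); // dropped: the sequence has already terminated
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The single terminated flag is what makes onError and onComplete mutually exclusive in this sketch: whichever arrives first wins, and everything after it is ignored.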
Basic implementation
- Create an Observer. The Observer decides what to do when an event occurs.
// Observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        // Called once the subscription is established; d can be used to unsubscribe
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(String s) {
        // Called each time the Observable sends an event
        Log.i(TAG, "onNext--->" + s);
    }
    @Override
    public void onError(Throwable e) {
        // Called when the event sequence fails
        Log.i(TAG, "onError--->");
    }
    @Override
    public void onComplete() {
        // Called when the event sequence has finished
        Log.i(TAG, "onComplete--->");
    }
};
- Create an Observable, which decides when events are triggered and what they are:
// Observable (the observed)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        // Sent after onComplete, so the Observer never receives it
        emitter.onNext("Event3");
    }
});
- Establish the subscription between the Observer and the Observable:
// Subscribe the Observer to the Observable
observable.subscribe(observer);
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
Because onComplete is called after Event2 is sent, the Observer never receives Event3, which is sent afterwards.
The code above can also be written as a single chain, with the same result:
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The code above uses Observable's create method to create an Observable and send events. The official marble diagram of the create operator illustrates this flow.
Many other static methods can also create an Observable. The common ones are described below.
The just operator
The just operator creates an Observable that sends the specified events. It accepts at most 10 events. Compared with create, it simplifies the process somewhat. Its overloads are:
public static <T> Observable<T> just(T item)
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
Here’s a simple use of the just operator:
// Simple use of the just operator
Observable.just("Event1", "Event2", "Event3")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->");
            }
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
Take a look at the official schematic of the just operator. Here is a schematic of the four events just sends:
The from operators
The from operators create Observables from arrays, Iterables, and asynchronous tasks. They fall into the following groups:
// Array
public static <T> Observable<T> fromArray(T... items)
// Collection
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
// Asynchronous task
public static <T> Observable<T> fromFuture(Future<? extends T> future)
// Asynchronous task + timeout
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
// Asynchronous task + timeout + thread scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
// Asynchronous task + thread scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
// Publisher from Reactive Streams; used much like the create operator, with the publisher (the observed) deciding when events are sent
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
fromArray/fromIterable
Here’s how to use fromArray:
// Simple use of the fromArray operator
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
Take a look at fromArray’s official schematic below:
Here's how to use fromIterable:
// Simple use of the fromIterable operator
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
Take a look at fromIterable’s official sketch as follows:
The output of the code above is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
fromCallable
Callable lives in the java.util.concurrent package. It is similar to Runnable but returns a value. Unless the thread is switched, the event from fromCallable is emitted on the subscribing thread (the main thread here). Keep the following in mind when using fromCallable:
- For time-consuming tasks, use subscribeOn to switch the subscribing thread;
- To receive the value emitted by the Observable on the main thread, switch back with observeOn;
- To avoid problems such as memory leaks, unsubscribe in the corresponding onDestroy method.
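The thread hand-off described in the first two points can be pictured with plain java.util.concurrent. The sketch below is not RxJava code and does not show how subscribeOn or observeOn are implemented. It only assumes two single-thread executors, named "worker" and "main-like" for illustration, to show the Callable running on one thread while its result is consumed on another.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadHandOffSketch {

    /** Returns {thread that ran the task, thread that received the result, the result}. */
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        final String[] seen = new String[3];
        final CountDownLatch done = new CountDownLatch(1);

        final Callable<String> task = () -> {
            seen[0] = Thread.currentThread().getName(); // where the work runs
            return "call";
        };

        worker.execute(() -> {
            try {
                final String value = task.call();
                // Hand the result over to the other thread for consumption
                mainLike.execute(() -> {
                    seen[1] = Thread.currentThread().getName();
                    seen[2] = value;
                    done.countDown();
                });
            } catch (Exception e) {
                done.countDown();
            }
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the result");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            mainLike.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println(seen[2] + " computed on " + seen[0] + ", received on " + seen[1]);
    }
}
```

In an Android app the "main-like" executor would correspond to the UI thread, which is why slow work belongs on the other side of the hand-off.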
Here’s a simple way to use fromCallable:
// Simple use of the fromCallable operator
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        // Other operations...
        return "call";
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s + Thread.currentThread());
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The output is as follows:
onSubscribe--->
onNext--->call
onComplete--->
Take a look at the official fromCallable diagram below:
fromFuture
fromFuture has four overloads that take the asynchronous task plus an optional timeout and thread scheduler. The Future interface lives in the java.util.concurrent package. For an asynchronous task run from a Runnable or Callable, it lets you check whether the task has finished, obtain its result, and cancel it. A Runnable or Callable is executed by a thread of its own, so the task behind fromFuture runs off the main thread. fromFuture blocks while it waits for the Future's result, so remember to switch the subscribing thread with subscribeOn when the task is time-consuming. The following uses FutureTask as an example to show how to use fromFuture.
Create a Callable to execute the asynchronous task:
// Asynchronous task
private class MCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Log.i(TAG, "Task execution started -->");
        Thread.sleep(5000);
        Log.i(TAG, "Task execution finished -->");
        return "MCallable";
    }
}
Then create a FutureTask:
// Create the FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
Next, run the FutureTask created above on a Thread:
// Execute the FutureTask
new Thread(mFutureTask).start();
Finally, use fromFuture to create the corresponding Observable and subscribe to it:
// fromFuture
Observable.fromFuture(mFutureTask)
        .subscribeOn(Schedulers.io()) // Switch the subscribing thread
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s + Thread.currentThread());
            }
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->" + e);
            }
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });
The result of the above code is as follows:
Task execution started -->
onSubscribe--->
Task execution finished -->
onNext--->MCallable
onComplete--->
The official marble diagram for the fromFuture overload that takes only a Future parameter is as follows:
The asynchronous task above takes 5 seconds. The following uses a fromFuture overload to specify a 4-second timeout:
// Specify a timeout of 4 seconds
Observable.fromFuture(mFutureTask, 4, TimeUnit.SECONDS, Schedulers.io())
// ...
Because the task cannot finish within 4 seconds, the Observer's onError is called. The output is as follows:
Task execution started -->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
Task execution finished -->
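The TimeoutException in that log is the one java.util.concurrent raises when a timed wait on a Future runs out. The sketch below reproduces the behaviour with a bare FutureTask and no RxJava at all. The class and method names are made up for illustration, and the durations are shortened from seconds to milliseconds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutSketch {

    /** Returns the task result, or "timeout" if it is not ready within waitMillis. */
    static String await(final long taskMillis, long waitMillis) {
        Callable<String> slow = () -> {
            Thread.sleep(taskMillis); // stands in for the time-consuming work
            return "MCallable";
        };
        FutureTask<String> task = new FutureTask<>(slow);
        Thread worker = new Thread(task);
        worker.setDaemon(true); // do not keep the JVM alive for an abandoned task
        worker.start();
        try {
            return task.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(await(500, 100));  // the wait is shorter than the task
        System.out.println(await(50, 2000));  // the wait is longer than the task
    }
}
```

Timing out only ends the wait. The task itself keeps running, which is consistent with the log above, where the task still reports that it finished after onError.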
One advantage of Future is that the asynchronous task can be cancelled at any time:
// Cancel the asynchronous task
public void cancelTask(View view) {
    if (mFutureTask.isDone()) {
        Log.i(TAG, "Task finished -->");
    } else {
        Log.i(TAG, "Task in progress -->");
        boolean cancel = mFutureTask.cancel(true);
        Log.i(TAG, "Task cancelled --cancel->" + cancel);
        Log.i(TAG, "Task cancelled --isCancelled->" + mFutureTask.isCancelled());
    }
}
The following is the result of canceling a task:
Task execution started -->
onSubscribe--->
Task in progress -->
Task cancelled --cancel->true
Task cancelled --isCancelled->true
onError--->java.util.concurrent.CancellationException
This cancels the asynchronous task while it is running. This part is more about Java's Future than about RxJava.
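The cancellation behaviour can be checked with FutureTask alone. The following is a minimal sketch with hypothetical class and method names. It cancels a sleeping task and reports what cancel, isCancelled, and get do afterwards.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureCancelSketch {

    /** Cancels a task and returns "cancelResult,isCancelled,outcomeOfGet". */
    static String run() {
        Callable<String> slow = () -> {
            Thread.sleep(5000); // cancel(true) interrupts this sleep if it has begun
            return "MCallable";
        };
        FutureTask<String> task = new FutureTask<>(slow);
        Thread worker = new Thread(task);
        worker.setDaemon(true);
        worker.start();

        boolean cancelled = task.cancel(true);
        String outcome;
        try {
            outcome = task.get();
        } catch (CancellationException e) {
            // get() on a cancelled Future throws instead of returning a value
            outcome = "CancellationException";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return cancelled + "," + task.isCancelled() + "," + outcome;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The CancellationException thrown by get is the same exception type that appears in the onError line of the log above.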
The defer operator
An Observable created with defer is only actually created, and its events sent, at subscription time. Here's how the defer operator is used:
// defer
// "defer" is a String member variable, so it can be reassigned after the factory below refers to it
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just(defer);
    }
});
defer = "new";
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The result of the above code is as follows:
onSubscribe--->
onNext--->new
onComplete--->
The factory passed to defer runs only at subscription time, so the Observable is built from the latest value and onNext receives "new" rather than "old". The official marble diagram of the defer operator illustrates this.
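The difference between reading a value immediately and reading it at subscription time is the same as the difference between copying a field and wrapping it in a Supplier. The sketch below is plain Java used as an analogy, not RxJava. The field and class names are chosen only for illustration.

```java
import java.util.function.Supplier;

public class DeferSketch {

    private String value = "old";

    /** Returns {eagerly captured value, lazily read value}. */
    static String[] run() {
        final DeferSketch holder = new DeferSketch();

        String eager = holder.value;                    // read right now
        Supplier<String> deferred = () -> holder.value; // read only when get() is called

        holder.value = "new";

        return new String[] { eager, deferred.get() };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("eager=" + result[0] + ", deferred=" + result[1]);
    }
}
```

Here the eager copy keeps "old" while the deferred read sees "new", mirroring why the defer example logs the updated value.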
The empty operator
The empty operator can be used to create an Observable that terminates normally without any data, as follows:
// empty
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->" + o);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The output of the above code is as follows:
onSubscribe--->
onComplete--->
To help you understand the empty operator, take a look at some official illustrations of the empty operator:
The never operator
The never operator allows you to create an Observable that does not generate any data and does not terminate.
// never
Observable.never().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->" + o);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The output of the above code is as follows:
onSubscribe--->
To help you understand the use of the never operator, take a look at some official schematics of the never operator:
The timer operator
The timer operator creates an Observable that, after a delay, sends the fixed value 0. A thread scheduler can also be specified.
// Delay
public static Observable<Long> timer(long delay, TimeUnit unit)
// Delay + thread scheduler
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
Here is how the timer is used:
// timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(Long s) {
        Log.i(TAG, "onNext--->" + s);
        Log.i(TAG, "Current thread -->" + Thread.currentThread().getName());
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The result of the above code is as follows:
onSubscribe--->
// The data is received after 3 seconds
onNext--->0
Current thread -->RxCachedThreadScheduler-1
onComplete--->
To understand the timer operator, take a look at its official marble diagram. Here is the diagram for a timer with a specified delay and thread scheduler:
The interval operator
The interval operator allows you to create an Observable that sends integer values at fixed intervals. Interval can specify initial latency, time intervals, thread schedulers, etc.
// Initial delay + interval
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
// Initial delay + interval + thread scheduler
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// Interval
public static Observable<Long> interval(long period, TimeUnit unit)
// Interval + thread scheduler
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
Here’s how interval is used:
// interval
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext--->" + aLong);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The above code will continue to send integer events every 3 seconds with the following result:
onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
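The output above never reaches onComplete, because a periodic source keeps counting until something stops it. As a rough plain-Java counterpart, and not a description of how RxJava implements interval, the sketch below uses a ScheduledExecutorService to emit 0, 1, 2, ... at a fixed period and cancels the schedule once enough ticks have arrived. The class name, method name, and parameters are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class IntervalSketch {

    /** Collects the first `take` ticks of a periodic counter, then cancels it. */
    static List<Long> firstTicks(final int take, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final List<Long> ticks = Collections.synchronizedList(new ArrayList<Long>());
        final AtomicLong counter = new AtomicLong();
        final CountDownLatch enough = new CountDownLatch(take);

        // The first tick arrives after one period, then one tick per period
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            long n = counter.getAndIncrement();
            if (n < take) {
                ticks.add(n);
                enough.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            if (!enough.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("ticks did not arrive in time");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            handle.cancel(false);    // without this the counter would run forever
            scheduler.shutdownNow();
        }
        return new ArrayList<>(ticks);
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3, 20));
    }
}
```

Cancelling the schedule plays the role that unsubscribing plays for an interval Observable: it is the only thing that ends the sequence.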
To understand the use of the interval operator, take a look at some of the official diagrams of the interval operator. Here is an example of how interval specifies the interval and unit of time:
The range operator
The range operator creates an Observable that sends a sequence of consecutive integers. range takes a starting value and a count and sends int values, while rangeLong does the same for long values.
// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)
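Note that the second parameter in these signatures is a count, not an end value. The following plain-Java sketch, with a hypothetical helper that only mimics the values such a range would produce, makes the arithmetic explicit: the values run from start to start + count - 1.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSketch {

    /** Returns the values start, start + 1, ..., start + count - 1. */
    static List<Integer> range(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative: " + count);
        }
        List<Integer> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            values.add(start + i);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(range(1, 5));
        System.out.println(range(10, 3));
    }
}
```

So a call with start 10 and count 3 yields 10, 11, 12, not the numbers from 10 down or up to 3.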
The usage of range is as follows:
// range
Observable.range(1, 5).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }
    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext--->" + integer);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
The result of the above code is as follows:
onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
To help you understand the range operator, take a look at some official illustrations of the range operator:
Conclusion
This article covered the basics of RxJava 2 and how to understand and use its creation operators. Follow the WeChat official account Practice for the latest updates, and let's learn together!