Preface
By default, RxJava events are emitted and consumed on the same thread, which amounts to a synchronous observer pattern. The real value of the observer pattern, however, lies in the asynchronous mechanism of "background processing, foreground callback". To achieve asynchrony, we need another RxJava concept: the Scheduler.
Main text
When no thread is specified, RxJava follows the thread-invariance principle: events are produced on whichever thread calls subscribe(), and events are consumed on the thread that produced them. To switch threads, you use a Scheduler.
1. The built-in Schedulers
In RxJava, a Scheduler acts as a thread controller: it lets you specify which thread each piece of code should run on. RxJava ships with several built-in Schedulers that cover most usage scenarios:
- Schedulers.immediate()
Runs directly on the current thread, equivalent to not specifying a thread. This is the default Scheduler.
- Schedulers.newThread()
Always starts a new thread and runs the operation on it.
- Schedulers.io()
The Scheduler used for I/O operations (reading and writing files, reading and writing databases, network communication, and so on). It behaves much like newThread(), except that io() is backed by an unbounded thread pool and can reuse idle threads, so in most cases io() is more efficient than newThread().
Note: Do not put computation work in io(), to avoid creating unnecessary threads.
- Schedulers.computation()
The Scheduler used for computation. Computation here means CPU-intensive work, that is, operations whose performance is not limited by things like I/O, for example graphics calculations. This Scheduler uses a fixed-size thread pool whose size is the number of CPU cores.
Note: Do not put I/O operations in computation(), or the time spent waiting on I/O will waste CPU.
- AndroidSchedulers.mainThread()
In addition, Android has a dedicated AndroidSchedulers.mainThread(), which specifies that the operation runs on the Android main thread.
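The threading behaviour described for each Scheduler can be approximated with plain java.util.concurrent executors. The sketch below is only an analogy and is not how RxJava builds its Schedulers; the class SchedulerAnalogy, its constants, and the thread names are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical helper: plain-JDK analogues of the built-in Schedulers, not RxJava code.
class SchedulerAnalogy {
    // Like immediate(): run the task on the calling thread.
    static final Executor IMMEDIATE = Runnable::run;

    // Like newThread(): start a brand-new thread for every task.
    static final Executor NEW_THREAD = task -> new Thread(task, "new-thread").start();

    // Like io(): an unbounded pool that reuses idle threads.
    static final Executor IO = Executors.newCachedThreadPool(daemon("io"));

    // Like computation(): a fixed pool sized to the number of CPU cores.
    static final Executor COMPUTATION = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), daemon("computation"));

    // Creates daemon threads with the given name so the JVM can exit freely.
    static ThreadFactory daemon(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        };
    }

    // Runs a tiny task on the executor and reports which thread executed it.
    static String threadNameOn(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }

    public static void main(String[] args) {
        System.out.println("immediate   -> " + threadNameOn(IMMEDIATE));
        System.out.println("newThread   -> " + threadNameOn(NEW_THREAD));
        System.out.println("io          -> " + threadNameOn(IO));
        System.out.println("computation -> " + threadNameOn(COMPUTATION));
    }
}
```

The unbounded cached pool is why long-running computation should stay out of io(), and the core-sized fixed pool is why blocking I/O should stay out of computation().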
2. Switching threads with Schedulers
2.1. Single thread switch
With these schedulers in place, threads can be controlled using the subscribeOn() and observeOn() methods.
- subscribeOn(): specifies the thread on which subscribe() happens, that is, the thread on which Observable.OnSubscribe is activated. This is also called the event-producing thread.
- observeOn(): specifies the thread on which the Subscriber runs. This is also called the event-consuming thread.
Here is the code:
Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // subscribe() happens on the IO thread
    .observeOn(AndroidSchedulers.mainThread()) // the Subscriber callback runs on the main thread
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
In the code above, because of subscribeOn(Schedulers.io()), the contents 1, 2, 3, 4 of the created events are emitted on the IO thread. Because of observeOn(AndroidSchedulers.mainThread()), the Subscriber prints the numbers on the main thread.
In fact, this pairing is very common: it fits most programs that follow a "fetch data on a background thread, display it on the main thread" strategy.
Here’s a complete example:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes);
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
    // The event is emitted, i.e. the image is read, on the IO thread
    .subscribeOn(Schedulers.io())
    // The event is consumed, i.e. the image is displayed, on the main thread
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Drawable>() {
        @Override
        public void onNext(Drawable drawable) {
            imageView.setImageDrawable(drawable);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
        }
    });
The advantage is that loading the image takes place in the IO thread, while setting the image takes place in the main thread. This means that loading images can take tens or even hundreds of milliseconds without causing any lag in the interface.
2.2. Multiple thread switches
As described above, subscribeOn() and observeOn() give you thread control, so that events are produced and consumed on different threads. Once you know about transformation methods such as map() and flatMap(), a question arises: can you switch threads more than once?
Yes. observeOn() specifies the thread of a Subscriber, but that Subscriber is not necessarily the one passed to subscribe(). It is the Subscriber that corresponds to the current Observable at the point where observeOn() is called, that is, its immediate downstream Subscriber.
In other words, observeOn() specifies the thread for the operations that come after it. So if you need to switch threads several times, simply call observeOn() once at each position where a switch is needed.
Here is the sample code:
Observable.just(1, 2, 3, 4)
    // Events are emitted on the IO thread, specified by subscribeOn()
    .subscribeOn(Schedulers.io())
    // New thread, specified by observeOn()
    .observeOn(Schedulers.newThread())
    .map(mapOperator)
    // IO thread, specified by observeOn()
    .observeOn(Schedulers.io())
    .map(mapOperator2)
    // Android main thread, specified by observeOn()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
The code above switches threads several times by calling observeOn() several times. Unlike observeOn(), subscribeOn() can be placed anywhere in the chain, but only one subscribeOn() call takes effect.
3. How Schedulers work
Internally, subscribeOn() and observeOn() are also implemented with lift() (introduced earlier).
- Schematic for subscribeOn()
As the figure shows, subscribeOn() performs a thread switch (at the "schedule…" position in the figure).
The thread switch of subscribeOn() happens inside OnSubscribe, that is, at the moment it notifies the upstream OnSubscribe, when no event has been sent yet. The thread control of subscribeOn() therefore takes effect from the very start of event emission, which is also why it can switch threads only once.
- Schematic of observeOn()
As the figure shows, observeOn() also performs a thread switch (at the "schedule…" position in the figure).
The thread switch of observeOn() happens inside its built-in Subscriber, that is, at the moment it is about to send an event to the downstream Subscriber. observeOn() therefore controls the thread of everything that comes after it, which allows multiple thread switches.
- Hybrid switching schematic
Finally, a diagram is used to explain how thread scheduling occurs when multiple subscribeOn() and observeOn() are mixed:
The figure contains five operations on events:
- ① and ② are affected by the first subscribeOn() and run on the red thread;
- ③ and ④ are affected by the first observeOn() and run on the green thread;
- ⑤ is affected by the second observeOn() and runs on the purple thread;
- The second subscribeOn() has no effect on the flow, because its thread switch is cut off by the first subscribeOn() during the notification phase.
Note: When multiple subscribeOn() are used, only the first subscribeOn() plays a role.
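The behaviour described in this section can be reproduced with a small model built only on java.util.concurrent. This is a sketch under simplifying assumptions, not RxJava's actual implementation: MiniObservable, peek() (a stand-in for an operator such as map()), MixedSwitchDemo, and the thread names are all hypothetical. It shows subscribeOn() switching threads while the subscription travels upstream, and observeOn() switching threads while each event travels downstream.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical mini model for illustration only; these are NOT RxJava's real classes.
class MiniObservable<T> {
    interface OnSubscribe<E> {
        void call(Consumer<E> subscriber);
    }

    private final OnSubscribe<T> onSubscribe;

    MiniObservable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    void subscribe(Consumer<T> subscriber) {
        onSubscribe.call(subscriber);
    }

    // The switch happens while subscribing: the upstream OnSubscribe is invoked
    // on the executor, before any event has been sent.
    MiniObservable<T> subscribeOn(Executor executor) {
        return new MiniObservable<T>(subscriber ->
                executor.execute(() -> onSubscribe.call(subscriber)));
    }

    // The switch happens while delivering: a wrapping subscriber hands each
    // event to the downstream subscriber on the executor.
    MiniObservable<T> observeOn(Executor executor) {
        return new MiniObservable<T>(subscriber ->
                onSubscribe.call(item -> executor.execute(() -> subscriber.accept(item))));
    }

    // Stand-in for an operator such as map(): runs on whatever thread delivers the event.
    MiniObservable<T> peek(Consumer<T> action) {
        return new MiniObservable<T>(subscriber ->
                onSubscribe.call(item -> {
                    action.accept(item);
                    subscriber.accept(item);
                }));
    }
}

class MixedSwitchDemo {
    static Executor single(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // let the JVM exit when main() returns
            return thread;
        });
    }

    static String here() {
        return Thread.currentThread().getName();
    }

    // Returns the name of the thread each stage ran on.
    static Map<String, String> run() {
        Map<String, String> seen = new ConcurrentHashMap<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        new MiniObservable<Integer>(subscriber -> {
            seen.put("source", here());
            subscriber.accept(1);
        })
                .subscribeOn(single("red"))    // closest to the source: this one wins
                .peek(i -> seen.put("beforeObserveOn", here()))
                .observeOn(single("green"))
                .peek(i -> seen.put("afterFirstObserveOn", here()))
                .subscribeOn(single("blue"))   // overridden by "red" before any event is sent
                .observeOn(single("purple"))
                .subscribe(i -> {
                    seen.put("subscriber", here());
                    done.complete(null);
                });
        done.join(); // wait until the event has reached the subscriber
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the source and the first peek() run on "red", the second peek() runs on "green", and the final subscriber runs on "purple". The "blue" thread only carries the subscription upstream for a moment and never touches an event, which matches the note that only the first subscribeOn() takes effect.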
4. Extension: doOnSubscribe()
Although additional subscribeOn() calls have no effect on the event-processing flow itself, they can be useful before the flow starts. When Subscriber was introduced in the previous article, it was mentioned that Subscriber's onStart() can be used for initialization before the flow begins.
However, because onStart() is called when subscribe() happens, you cannot choose its thread: it always runs on the thread that called subscribe(). As a result, if onStart() contains code with thread requirements (for example, showing a ProgressBar, which must happen on the main thread), there is a risk of a thread violation, because you cannot predict which thread subscribe() will be called on.
The counterpart of Subscriber.onStart() is the method Observable.doOnSubscribe(). Like Subscriber.onStart(), it runs after subscribe() is called and before any event is sent; the difference is that its thread can be specified. By default, doOnSubscribe() runs on the thread where subscribe() happens; if there is a subscribeOn() after doOnSubscribe(), it runs on the thread specified by the nearest subscribeOn() that follows it.
Example code is as follows:
Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Runs on the main thread
            progressBar.setVisibility(View.VISIBLE);
        }
    })
    // Specifies the main thread for doOnSubscribe()
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
As the code above shows, placing a subscribeOn() after doOnSubscribe() lets you specify the thread on which the preparation work runs.
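Why the later subscribeOn() governs doOnSubscribe() becomes clearer if you follow the subscription as it travels upstream. The sketch below models only that step with plain java.util.concurrent. It is not RxJava's implementation, and MiniSource, DoOnSubscribeDemo, and the "ui" and "io" thread names are hypothetical stand-ins.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical mini model for illustration only; these are NOT RxJava's real classes.
class MiniSource {
    interface OnSubscribe {
        void call(Consumer<String> subscriber);
    }

    private final OnSubscribe onSubscribe;

    MiniSource(OnSubscribe onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    void subscribe(Consumer<String> subscriber) {
        onSubscribe.call(subscriber);
    }

    // Subscribes to the upstream on the given executor.
    MiniSource subscribeOn(Executor executor) {
        return new MiniSource(subscriber ->
                executor.execute(() -> onSubscribe.call(subscriber)));
    }

    // Runs the action on whatever thread the subscription arrives on,
    // then continues subscribing upstream.
    MiniSource doOnSubscribe(Runnable action) {
        return new MiniSource(subscriber -> {
            action.run();
            onSubscribe.call(subscriber);
        });
    }
}

class DoOnSubscribeDemo {
    static Executor single(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // let the JVM exit when main() returns
            return thread;
        });
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        new MiniSource(s -> s.accept("event emitted on " + Thread.currentThread().getName()))
                .subscribeOn(single("io"))
                .doOnSubscribe(() -> log.add("prepared on " + Thread.currentThread().getName()))
                .subscribeOn(single("ui")) // the subscribeOn() after doOnSubscribe()
                .subscribe(event -> {
                    log.add(event);
                    done.complete(null);
                });
        done.join(); // wait until the event has been delivered
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [prepared on ui, event emitted on io]
    }
}
```

The subscription reaches subscribeOn("ui") first, so the doOnSubscribe() action runs on "ui"; only afterwards does subscribeOn("io") move the rest of the subscription, and with it the event emission, to "io".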
Summary
RxJava's events and event-transformation model, together with the thread scheduling built on top of those transformations and the observer pattern, give RxJava an edge over other open-source frameworks in asynchronous programming experience, flexibility, and runtime efficiency.