Disclaimer

This series of articles records my reading of the book "RxJava 2.x in Action". The articles are reading notes, not original work.

1. Types of Scheduler

1. Introduction to RxJava threads

RxJava is a library for asynchronous programming, and asynchrony is one of its key features. Used sensibly, asynchronous programming can improve a system's processing speed, but it also introduces thread-safety issues, and asynchrony is not the same as concurrency. By default, RxJava runs only on the current thread, that is, single-threaded: the Observable emits the data stream, the Operators process it, and the Observer receives and responds to it, all on the same thread. The result is synchronous reactive behavior. In practice, however, most functional reactive applications do their processing in the background and respond in the foreground. The default flow therefore has to change: the Observable generates and emits the data stream and the Operators process it on a background thread, while the Observer receives and responds to the data on the foreground thread. This means running RxJava across multiple threads, which is what RxJava's Scheduler is for.

2. Scheduler

Scheduler is RxJava's abstraction of a thread controller, and RxJava ships with several built-in Scheduler implementations.

Scheduler: Role
single: Uses a thread pool of fixed size 1 (newScheduledThreadPool(1)) and reuses that one thread
newThread: Starts a new thread each time and performs the operation on that new thread
computation: Uses a fixed-size thread pool (FixedSchedulerPool); intended for CPU-intensive computation
io: Intended for I/O operations (reading and writing files, reading and writing databases, network communication, etc.). io() behaves much like newThread(), except that its internal implementation uses an unbounded thread pool and can reuse idle threads, so in most cases io() is more efficient than newThread()
trampoline: Runs the task on the current thread. If the current thread is already executing a task, the new task is queued and runs after the current one finishes
Schedulers.from: Converts a java.util.concurrent.Executor into a Scheduler instance, so that a custom Executor can be used as a scheduler
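
The difference between single and newThread in the table can be illustrated with plain JDK classes. The sketch below is not RxJava code. It only assumes, as the table states, that single is backed by a one-thread scheduled pool, and the names PoolReuseDemo, threadsUsedBySinglePool and threadsUsedByNewThreads are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Plain-JDK illustration with hypothetical names; this is not RxJava source code.
final class PoolReuseDemo {

    // Runs `tasks` tasks one after another on a one-thread scheduled pool
    // and returns the distinct names of the threads that executed them.
    static Set<String> threadsUsedBySinglePool(int tasks) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            for (int i = 0; i < tasks; i++) {
                names.add(pool.submit(task).get());
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return names;
    }

    // Starts a fresh thread for every task and returns the distinct thread names.
    static Set<String> threadsUsedByNewThreads(int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < tasks; i++) {
                Thread t = new Thread(() -> names.add(Thread.currentThread().getName()));
                t.start();
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("single pool: " + threadsUsedBySinglePool(3).size() + " thread(s)");
        System.out.println("new threads: " + threadsUsedByNewThreads(3).size() + " thread(s)");
    }
}
```

The one-thread pool hands every task to the same worker thread, while the second method pays the cost of creating a thread per task. That cost is the reason a pooled scheduler is usually preferred for many short tasks.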

If the built-in Schedulers do not meet your business needs, you can wrap a custom Executor as a Scheduler to cover your specific requirements.

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // Emitted on the thread that subscribes
        e.onNext("hello");
        e.onNext("world");
    }
}).observeOn(Schedulers.newThread()) // everything downstream runs on a new thread
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

After the Observable emits the data, execution switches to the thread created by newThread(), so both of the subsequent prints happen on that new thread.


2. RxJava thread model

In RxJava, the thread scheduler (Scheduler) can be used to switch threads when applying operators.

1. Thread scheduler

Schedulers is a static factory class. Scheduler is RxJava's thread-task scheduler, and Worker is the concrete executor of thread tasks. As the Scheduler source code shows, Scheduler's scheduleDirect() and schedulePeriodicallyDirect() methods create a Worker and then call the Worker's schedule() and schedulePeriodically() methods, respectively, to run the task. Worker is also an abstract class, and each concrete Scheduler has its own Worker implementation.
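
The Scheduler/Worker relationship described above can be sketched in a few lines of plain Java. This is a simplified, hypothetical model and not RxJava's real classes: MiniScheduler, MiniWorker, SingleThreadMiniScheduler and runAndReportThread are names invented here, and the real methods also handle delays and disposal.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the Scheduler/Worker split; not RxJava source code.
final class MiniSchedulerDemo {

    abstract static class MiniWorker {
        // The worker is the part that actually runs the task.
        abstract Future<?> schedule(Runnable task);
    }

    abstract static class MiniScheduler {
        // Each concrete scheduler supplies its own kind of worker.
        abstract MiniWorker createWorker();

        // Same idea as scheduleDirect(): create a worker, then delegate to it.
        Future<?> scheduleDirect(Runnable task) {
            MiniWorker worker = createWorker();
            return worker.schedule(task);
        }
    }

    static final class SingleThreadMiniScheduler extends MiniScheduler {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        @Override
        MiniWorker createWorker() {
            return new MiniWorker() {
                @Override
                Future<?> schedule(Runnable task) {
                    return executor.submit(task);
                }
            };
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    // Schedules one task and returns the name of the thread that ran it.
    static String runAndReportThread() {
        SingleThreadMiniScheduler scheduler = new SingleThreadMiniScheduler();
        AtomicReference<String> name = new AtomicReference<>();
        try {
            scheduler.scheduleDirect(() -> name.set(Thread.currentThread().getName()))
                    .get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runAndReportThread());
    }
}
```

Keeping the scheduler abstract and pushing execution into the worker is what lets each scheduler choose its own threading strategy without changing the calling code.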

  1. SingleScheduler

SingleScheduler is a Scheduler newly added in RxJava 2. It has a field named executor, which is a ScheduledExecutorService wrapped in an AtomicReference. In the SingleScheduler constructor, lazySet() is called on executor to store the newly created executor service.
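
The field-plus-lazySet() pattern described above can be sketched as follows. This is a hypothetical illustration and not the actual SingleScheduler source: the class name SingleSchedulerSketch and its methods are invented here, and the shutdown logic is an assumption made only to keep the example complete.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the pattern; not the real SingleScheduler implementation.
final class SingleSchedulerSketch {

    // Holding the executor in an AtomicReference allows it to be swapped atomically later.
    private final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>();

    SingleSchedulerSketch() {
        // lazySet() is a cheaper write than set(); it is used here while the object
        // is still being constructed and not yet visible to other threads.
        executor.lazySet(Executors.newScheduledThreadPool(1));
    }

    // Runs a task on the single worker thread and returns that thread's name.
    String runAndGetThreadName() {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.get().submit(task).get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Assumed shutdown logic: atomically take the executor out and stop it.
    void shutdown() {
        ScheduledExecutorService current = executor.getAndSet(null);
        if (current != null) {
            current.shutdown();
        }
    }

    public static void main(String[] args) {
        SingleSchedulerSketch sketch = new SingleSchedulerSketch();
        System.out.println("first task:  " + sketch.runAndGetThreadName());
        System.out.println("second task: " + sketch.runAndGetThreadName());
        sketch.shutdown();
    }
}
```

Both tasks are handled by the same pooled thread, which matches the "thread pool of fixed size 1" behavior attributed to single earlier in the article.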