Schedulers is the core module of RxSwift to realize multithreading. It is mainly used to control which thread or queue a task runs on.

Friends in the usual development process, must have used the network request, network request is executed in the background, after obtaining data, and then update the UI in the main thread.

Here’s some network request pseudocode.

DispatchQueue.global(qos: .userInitiated).async {
    
    let data = try? Data(contentsOf: url)
    
    DispatchQueue.main.async {
        / / update the UI}}Copy the code

If the above network request were implemented with RxSwift, it would look something like this:

let rxData: Observable<Data> =... rxData .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self](data) in
        / / update the UI
    })
    .disposed(by: disposeBag)
Copy the code

Description:

  1. We usesubscribeOnTo determine where the constructor of the data sequence is locatedSchedulerTo run on. In the above example, due to the fetchDataIt takes a long time, sosubsribeOnSwitch to theBackground SchedulerIn order to getData. This avoids blocking the main thread.
  2. We useobserveOnTo decide which oneSchedulerListen for this data sequence. In the example above, passobserverOnMethod switches to the main thread to listen for and process the results.

Below 👇 introduce several schedulers in RxSwift

MainScheduler

MainScheduler represents the main thread. If you need to execute uI-related tasks, you need to switch to this Scheduler.

Check out the source code:

MainScheduler
DispatchQueue.main

SerialDispatchQueueScheduler

Serial DispatchQueue SerialDispatchQueueScheduler abstraction. If you need to perform some serial tasks, you can switch to this Scheduler to perform them.

Check out the source code:

SerialDispatchQueueScheduler
DispatchQueue
self.configuration

ConcurrentDispatchQueueScheduler

Parallel DispatchQueue ConcurrentDispatchQueueScheduler abstraction. If you need to execute some concurrent tasks, you can switch to this Scheduler to execute them.

Check out the source code:

ConcurrentDispatchQueueScheduler
SerialDispatchQueueScheduler
self.configuration
DispatchQueue

OperationQueueScheduler

OperationQueueScheduler abstracts NSOperationQueue. It has some characteristics of NSOperationQueue. For example, you can set maxConcurrentOperationCount to control execution: the maximum number of concurrent tasks at the same time.

Check out the source code:

OperationQueueScheduler
OperationQueue
Priority queuePriority

Scheduler scheduling execution

As can be seen from the source code of several schedulers in the previous section, all Scheduler schedulers are derived from the ImmediateSchedulerType protocol.

schedule
schedule

SerialDispatchQueueScheduler scheduler, for example:

public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) - >Disposable {
    return self.scheduleInternal(state, action: action)
}
Copy the code
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) - >Disposable {
    return self.configuration.schedule(state, action: action)
}
Copy the code
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) - >Disposable {
    return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
Copy the code
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) - >Disposable {
    return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
Copy the code

The above four methods, are SerialDispatchQueueScheduler scheduling method in the scheduler.

Analysis of the above methods reveals that one of the methods in self.configuration will eventually be called. And view the source of several Scheduler can know, the Scheduler has a important attribute of the let configuration: DispatchQueueConfiguration. It holds the queue and leeway information we need.

Then, we analysis the DispatchQueueConfiguration method.

schedule
schedule
The core logic
It is under the current queue that the asynchronous schedule executes the closureaction(state)

Let’s look at other methods:

func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) - >Disposable {
    let deadline = DispatchTime.now() + dispatchInterval(dueTime)
    
    let compositeDisposable = CompositeDisposable(a)let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: deadline, leeway: self.leeway)
    
    // For space reasons, omit some code...
    timer.setEventHandler(handler: {
        if compositeDisposable.isDisposed {
            return
        }
        _ = compositeDisposable.insert(action(state))
        cancelTimer.dispose()
    })
    timer.resume()
    
    _ = compositeDisposable.insert(cancelTimer)
    
    return compositeDisposable
}
Copy the code
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) - >Disposable {
    let initial = DispatchTime.now() + dispatchInterval(startAfter)
    
    var timerState = state
    
    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
    
    // For space reasons, omit some code...
    
    timer.setEventHandler(handler: {
        if cancelTimer.isDisposed {
            return
        }
        timerState = action(timerState)
    })
    timer.resume()
    
    return cancelTimer
}
Copy the code

Although the above two methods do not invoke the closure asynchronously in the current queue directly, they are created in the current queue when the timer is created. Therefore, the eventHandler is also executed in the current queue when the timer callback is performed, indirectly implementing the scheduling under the current queue.

The above is the introduction and simple scheduling analysis of Scheduler Scheduler. If there is any deficiency, please comment and correct it.

With a diagram