Schedulers is the core module of RxSwift to realize multithreading. It is mainly used to control which thread or queue a task runs on.
Friends in the usual development process, must have used the network request, network request is executed in the background, after obtaining data, and then update the UI in the main thread.
Here’s some network request pseudocode.
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
/ / update the UI}}Copy the code
If the above network request were implemented with RxSwift, it would look something like this:
let rxData: Observable<Data> =... rxData .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self](data) in
/ / update the UI
})
.disposed(by: disposeBag)
Copy the code
Description:
- We use
subscribeOn
To determine where the constructor of the data sequence is locatedSchedulerTo run on. In the above example, due to the fetchData
It takes a long time, sosubsribeOn
Switch to theBackground SchedulerIn order to getData
. This avoids blocking the main thread. - We use
observeOn
To decide which oneSchedulerListen for this data sequence. In the example above, passobserverOn
Method switches to the main thread to listen for and process the results.
Below 👇 introduce several schedulers in RxSwift
MainScheduler
MainScheduler represents the main thread. If you need to execute uI-related tasks, you need to switch to this Scheduler.
Check out the source code:
MainScheduler
DispatchQueue.main
SerialDispatchQueueScheduler
Serial DispatchQueue SerialDispatchQueueScheduler abstraction. If you need to perform some serial tasks, you can switch to this Scheduler to perform them.
Check out the source code:
SerialDispatchQueueScheduler
DispatchQueue
self.configuration
ConcurrentDispatchQueueScheduler
Parallel DispatchQueue ConcurrentDispatchQueueScheduler abstraction. If you need to execute some concurrent tasks, you can switch to this Scheduler to execute them.
Check out the source code:
ConcurrentDispatchQueueScheduler
SerialDispatchQueueScheduler
self.configuration
DispatchQueue
OperationQueueScheduler
OperationQueueScheduler abstracts NSOperationQueue. It has some characteristics of NSOperationQueue. For example, you can set maxConcurrentOperationCount to control execution: the maximum number of concurrent tasks at the same time.
Check out the source code:
OperationQueueScheduler
OperationQueue
Priority queuePriority
Scheduler scheduling execution
As can be seen from the source code of several schedulers in the previous section, all Scheduler schedulers are derived from the ImmediateSchedulerType protocol.
schedule
schedule
SerialDispatchQueueScheduler scheduler, for example:
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) - >Disposable {
return self.scheduleInternal(state, action: action)
}
Copy the code
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) - >Disposable {
return self.configuration.schedule(state, action: action)
}
Copy the code
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) - >Disposable {
return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
Copy the code
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) - >Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
Copy the code
The above four methods, are SerialDispatchQueueScheduler scheduling method in the scheduler.
Analysis of the above methods reveals that one of the methods in self.configuration will eventually be called. And view the source of several Scheduler can know, the Scheduler has a important attribute of the let configuration: DispatchQueueConfiguration. It holds the queue and leeway information we need.
Then, we analysis the DispatchQueueConfiguration method.
schedule
schedule
The core logic
It is under the current queue that the asynchronous schedule executes the closureaction(state)
Let’s look at other methods:
func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) - >Disposable {
let deadline = DispatchTime.now() + dispatchInterval(dueTime)
let compositeDisposable = CompositeDisposable(a)let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: deadline, leeway: self.leeway)
// For space reasons, omit some code...
timer.setEventHandler(handler: {
if compositeDisposable.isDisposed {
return
}
_ = compositeDisposable.insert(action(state))
cancelTimer.dispose()
})
timer.resume()
_ = compositeDisposable.insert(cancelTimer)
return compositeDisposable
}
Copy the code
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) - >Disposable {
let initial = DispatchTime.now() + dispatchInterval(startAfter)
var timerState = state
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
// For space reasons, omit some code...
timer.setEventHandler(handler: {
if cancelTimer.isDisposed {
return
}
timerState = action(timerState)
})
timer.resume()
return cancelTimer
}
Copy the code
Although the above two methods do not invoke the closure asynchronously in the current queue directly, they are created in the current queue when the timer is created. Therefore, the eventHandler is also executed in the current queue when the timer callback is performed, indirectly implementing the scheduling under the current queue.
The above is the introduction and simple scheduling analysis of Scheduler Scheduler. If there is any deficiency, please comment and correct it.
With a diagram