Schedulers are the core Rx module for multithreading. They control which thread or queue a task runs on.
- MainScheduler: represents the main thread.
- SerialDispatchQueueScheduler: abstracts a serial dispatch queue. Use this scheduler to perform serial tasks.
- ConcurrentDispatchQueueScheduler: abstracts a concurrent dispatch queue. Use this scheduler to perform parallel tasks.
- OperationQueueScheduler: abstracts an operation queue.
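As a rough illustration of the list above, each scheduler type can be created as follows. This is only a sketch: the variable names and the QoS values are arbitrary choices for the example, not recommendations.

```swift
import Foundation
import RxSwift

// One instance of each scheduler type listed above.
// Variable names and QoS values are illustrative only.
let main: ImmediateSchedulerType = MainScheduler.instance
let serial = SerialDispatchQueueScheduler(qos: .default)
let concurrent = ConcurrentDispatchQueueScheduler(qos: .background)
let operation = OperationQueueScheduler(operationQueue: OperationQueue())

// All four can be used wherever an ImmediateSchedulerType is expected.
let schedulers: [ImmediateSchedulerType] = [main, serial, concurrent, operation]
```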
Common use
source // any Observable; the name is a placeholder
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { data in
        // handle data on the main thread
    })
    .disposed(by: disposeBag)
- subscribe(on:): determines which Scheduler the sequence's builder function runs on.
- observe(on:): determines which Scheduler events are observed on, that is, where the response handling runs.
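To make the difference concrete, here is a small sketch that records which thread each stage runs on. It assumes a command-line context, so it observes on a background serial scheduler instead of MainScheduler.instance (blocking the main thread on a semaphore would otherwise prevent main-queue delivery). The function name runDemo and the result labels are made up for the example.

```swift
import Foundation
import RxSwift

/// Runs a one-element sequence and reports whether each stage ran on the main thread.
/// (Hypothetical helper for illustration.)
func runDemo() -> (builderOnMain: Bool, observerOnMain: Bool) {
    let disposeBag = DisposeBag()
    let done = DispatchSemaphore(value: 0)
    var builderOnMain = true
    var observerOnMain = true

    Observable<Int>.create { observer in
        // Builder function: its thread is decided by subscribe(on:).
        builderOnMain = Thread.isMainThread
        observer.onNext(1)
        observer.onCompleted()
        return Disposables.create()
    }
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observe(on: SerialDispatchQueueScheduler(qos: .utility))
    .subscribe(onNext: { _ in
        // Event handler: its thread is decided by observe(on:).
        observerOnMain = Thread.isMainThread
    }, onCompleted: {
        done.signal()
    })
    .disposed(by: disposeBag)

    done.wait() // block the caller until the sequence completes
    return (builderOnMain, observerOnMain)
}
```

In an app, replacing the serial scheduler with MainScheduler.instance (and not blocking the main thread) would move the event handler back to the main thread, as in the snippet above.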
subscribe(on:) process analysis
How does subscribe(on:) make the sequence's builder function run on the specified Scheduler? Let's start with the method's source code.
public func subscribe(on scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    SubscribeOn(source: self, scheduler: scheduler)
}
SubscribeOn is a subclass of Producer. This is a recurring pattern: each sequence operator inherits from Producer, carries out its own logic, and then creates a corresponding Sink, which keeps the design easy to extend.
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType

    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        // The sink's run method does the actual scheduling
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
source stores the source sequence, and scheduler stores the scheduler on which the work should run. The real logic lives in the sink's run method.
// SubscribeOnSink
func run() -> Disposable {
    let disposeEverything = SerialDisposable()
    let cancelSchedule = SingleAssignmentDisposable()
    disposeEverything.disposable = cancelSchedule
    let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
        // Subscribe to the source sequence from inside the scheduled closure
        let subscription = self.parent.source.subscribe(self)
        disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
        return Disposables.create()
    }
    cancelSchedule.setDisposable(disposeSchedule)
    return disposeEverything
}
- The sink calls the schedule method of the saved scheduler, passing in a closure callback. schedule is a method of the ImmediateSchedulerType protocol. The scheduler we created is a ConcurrentDispatchQueueScheduler, which conforms to ImmediateSchedulerType; its schedule method is implemented as follows.
// ConcurrentDispatchQueueScheduler schedule
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    // Forwards to DispatchQueueConfiguration's schedule
    self.configuration.schedule(state, action: action)
}
// DispatchQueueConfiguration schedule
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()
    // self.queue is the queue we passed in; the work is dispatched asynchronously
    self.queue.async {
        if cancel.isDisposed {
            return
        }
        // Execute the action closure
        cancel.setDisposable(action(state))
    }
    return cancel
}
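The "dispatch asynchronously, but skip the work if it was already disposed" shape of this schedule method can be modeled with plain Dispatch. The sketch below is not RxSwift code: CancelToken is a hypothetical stand-in for SingleAssignmentDisposable that only tracks a disposed flag, and the free function schedule(on:action:) is likewise invented for the example.

```swift
import Foundation

/// Hypothetical stand-in for SingleAssignmentDisposable: only tracks a disposed flag.
final class CancelToken {
    private let lock = NSLock()
    private var disposed = false

    var isDisposed: Bool {
        lock.lock(); defer { lock.unlock() }
        return disposed
    }

    func dispose() {
        lock.lock(); defer { lock.unlock() }
        disposed = true
    }
}

/// Enqueues the action on the queue and skips it if the token was disposed first.
func schedule(on queue: DispatchQueue, action: @escaping () -> Void) -> CancelToken {
    let cancel = CancelToken()
    queue.async {
        if cancel.isDisposed { return }
        action()
    }
    return cancel
}

/// Schedules two actions, cancels the first before it can run, and returns what ran.
func runCancelDemo() -> [String] {
    let queue = DispatchQueue(label: "demo.serial")
    var log: [String] = []
    queue.suspend() // hold the queue so the first action can be cancelled before it runs
    let first = schedule(on: queue) { log.append("first") }
    _ = schedule(on: queue) { log.append("second") }
    first.dispose()
    queue.resume()
    queue.sync {} // wait until both enqueued blocks have been processed
    return log
}
```

Only the second action runs, because the first token is disposed before the queue gets a chance to execute its block.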
- The call ends up in DispatchQueueConfiguration's implementation of schedule.
- That implementation dispatches asynchronously onto the queue that was passed in, and inside the asynchronous block it executes the action closure, which is the closure passed to schedule in SubscribeOnSink's run method.
- That closure calls the source sequence's subscribe method, and that is how subscribe ends up executing on the scheduler we specified.
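The whole subscribe(on:) flow can be approximated without RxSwift. In this minimal sketch, MiniObservable and subscribeOn are hypothetical names, and a plain DispatchQueue stands in for the scheduler; the point is only that the subscription to the source happens inside the block dispatched to the queue.

```swift
import Foundation

/// A deliberately tiny stand-in for an observable sequence (hypothetical, not RxSwift).
struct MiniObservable<Element> {
    /// The "builder function": runs on subscription and pushes values to the observer.
    let builder: (@escaping (Element) -> Void) -> Void

    func subscribe(_ observer: @escaping (Element) -> Void) {
        builder(observer)
    }

    /// Mirrors the idea of SubscribeOnSink.run: hop to the queue, then subscribe there.
    func subscribeOn(_ queue: DispatchQueue) -> MiniObservable<Element> {
        let source = self
        return MiniObservable { observer in
            queue.async {
                source.subscribe(observer)
            }
        }
    }
}

/// Returns the tag of the queue the builder function ran on.
func builderQueueTag() -> String? {
    // Tag the queue so code can tell whether it is running on it.
    let key = DispatchSpecificKey<String>()
    let workQueue = DispatchQueue(label: "demo.work", attributes: .concurrent)
    workQueue.setSpecific(key: key, value: "work")

    let done = DispatchSemaphore(value: 0)
    var tag: String? = nil

    let source = MiniObservable<Int> { observer in
        tag = DispatchQueue.getSpecific(key: key) // which queue is the builder on?
        observer(1)
    }

    source.subscribeOn(workQueue).subscribe { _ in done.signal() }
    done.wait()
    return tag
}
```

Calling builderQueueTag() from any thread reports the work queue's tag, since the builder only runs once the dispatched block subscribes to the source.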
observe(on:) process analysis
observe(on:) is similar to subscribe(on:). The difference is what gets scheduled: subscribe(on:) schedules the subscription to the source sequence, whereas observe(on:) schedules the delivery of events to the observer.
// ObserveOnSink
override func onCore(_ event: Event<Element>) {
    let shouldStart = self._lock.calculateLocked { () -> Bool in
        switch self._state {
        case .stopped:
            self._state = .running
            return true
        case .running:
            return false
        }
    }
    if shouldStart {
        self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
    }
}
By overriding onCore(_:), ObserveOnSink calls the scheduleRecursive(_:action:) method of the ImmediateSchedulerType protocol.
// ImmediateSchedulerType
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
    recursiveScheduler.schedule(state)
    return Disposables.create(with: recursiveScheduler.dispose)
}
// RecursiveImmediateScheduler
func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial
    // self._scheduler holds the ConcurrentDispatchQueueScheduler
    let d = self._scheduler.schedule(state) { state -> Disposable in
        // best effort
        if self._group.isDisposed {
            return Disposables.create()
        }
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }
            scheduleState = .done
            return self._action
        }
        // The action callback is executed on the specified _scheduler
        if let action = action {
            action(state, self.schedule)
        }
        return Disposables.create()
    }
    // (remainder omitted: it registers d with _group so it can be disposed)
}
// ConcurrentDispatchQueueScheduler
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}
- This call chain shows that, just as with subscribe(on:), the call that is finally made is configuration.schedule.
- The action callback is invoked inside the closure passed to schedule, so events are observed on the scheduler we specified.
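The observe(on:) side can be approximated in the same spirit. The sketch below is a simplified model, not the RxSwift implementation: MiniObserveOn, its pending array, and the drain method are all hypothetical, and a DispatchQueue stands in for the scheduler. It keeps the stopped/running state check from onCore and re-dispatches itself after each delivery, loosely imitating scheduleRecursive.

```swift
import Foundation

/// Hypothetical, simplified counterpart of ObserveOnSink:
/// forwards values to `observer` on `queue`, one at a time and in order.
final class MiniObserveOn<Element> {
    private enum State { case stopped, running }

    private let queue: DispatchQueue
    private let observer: (Element) -> Void
    private let lock = NSLock()
    private var pending: [Element] = []
    private var state = State.stopped

    init(queue: DispatchQueue, observer: @escaping (Element) -> Void) {
        self.queue = queue
        self.observer = observer
    }

    /// Called by the source on whatever thread it emits from.
    func on(_ value: Element) {
        lock.lock()
        pending.append(value)
        let shouldStart = (state == .stopped)
        if shouldStart { state = .running }
        lock.unlock()

        // Only the caller that flips stopped -> running starts the drain chain.
        if shouldStart {
            queue.async { self.drain() }
        }
    }

    /// Delivers one value on the queue, then reschedules itself while values remain.
    private func drain() {
        lock.lock()
        let next: Element? = pending.isEmpty ? nil : pending.removeFirst()
        if next == nil { state = .stopped }
        lock.unlock()

        guard let value = next else { return }
        observer(value)
        queue.async { self.drain() }
    }
}

/// Emits 1, 2, 3 from the calling thread and records where they were delivered.
func runObserveOnDemo() -> (values: [Int], allOnQueue: Bool) {
    let key = DispatchSpecificKey<Bool>()
    let queue = DispatchQueue(label: "demo.observe")
    queue.setSpecific(key: key, value: true)

    let done = DispatchSemaphore(value: 0)
    var values: [Int] = []
    var allOnQueue = true

    let sink = MiniObserveOn<Int>(queue: queue) { value in
        values.append(value)
        if DispatchQueue.getSpecific(key: key) != true { allOnQueue = false }
        if values.count == 3 { done.signal() }
    }

    // Emission happens here, on the caller's thread; delivery happens on `queue`.
    for value in 1...3 { sink.on(value) }
    done.wait()
    return (values, allOnQueue)
}
```

The values arrive in emission order and every delivery happens on the tagged queue, even though on(_:) was called from the caller's thread.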