Schedulers are the core module of Rx to implement multithreading. They are mainly used to control which thread or queue tasks run on.
- MainScheduler: represents the main thread.
- SerialDispatchQueueScheduler:Abstract theserial
DispatchQueue
This scheduler is used to perform serial tasks. - ConcurrentDispatchQueueScheduler:Abstract theparallel
DispatchQueue
This scheduler can be used to perform parallel tasks. - OperationQueueScheduler:Abstract the
NSOperationQueue
.
Common use
Observable.of(1.2.3.4)
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observe(on: MainScheduler.instance)
.subscribe(onNext: { data in
print(data)
})
.disposed(by: disposeBag)
Copy the code
subscribe(on:)
: Determines where the constructor of the data sequence is locatedScheduler
To run on.observe(on:)
: indicates whichScheduler
Listen on and perform response processing.
Subscribe (on:) process analysis
How subscribe(on:) causes the sequence builder function to run on the specified Scheduler? Let’s start with the method source code
public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}
Copy the code
SubscribeOn is a subclass of Producer. It can be found that the processing of sequence inherits Producer, carries out corresponding logical processing, and then creates corresponding Sink, so as to achieve flexible expansion.
final private class SubscribeOn<Ob: ObservableType> :Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob.scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType> (_ observer: Observer.cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element = = Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
Copy the code
Source stores the source sequence, and Scheduler stores the queue where tasks need to be executed. The final logic is to look at Sink’s run method.
// SubscribeOnSink
func run(a) -> Disposable {
let disposeEverything = SerialDisposable(a)let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
Copy the code
- Call the
SubscribeOn
The savedscheduler
theschedule
Method, passing in a closure callback. schedule
Method isImmediateSchedulerType
The protocol method that we created isConcurrentDispatchQueueScheduler
, it followsImmediateSchedulerType
Protocol, corresponding toschedule
The method is implemented as follows.
/ / ConcurrentDispatchQueueScheduler schedule
public final func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
/ / call the DispatchQueueConfiguration schedule
self.configuration.schedule(state, action: action)
}
/ / DispatchQueueConfiguration schedule
func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable(a)// self.queue is the queue to which we pass values, executed asynchronously
self.queue.async {
if cancel.isDisposed {
return
}
// Execute the action closure
cancel.setDisposable(action(state))
}
return cancel
}
Copy the code
- The final call is going to be
DispatchQueueConfiguration the schedule of implementation
. - Execute asynchronously, in an asynchronous execution function, according to the queue that the value comes in
schedule
Is the closure ofSubscribeOnSink.run()
The red box part of the method.
- Closure that executes the source sequence
subscribe
Method, and that’s itsubscribe
In what we specifyschedule
In the execution.
Observe (on:) process analysis
Observe (on:) is similar to subscribe(on:) except subscribe(on:) is executed in schedule and observe(on:) is executed in schedule.
// ObserveOnSink
override func onCore(_ event: Event<Element>) {
let shouldStart = self._lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)
switch self._state {
case .stopped:
self._state = .running
return true
case .running:
return false}}if shouldStart {
self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
}
}
Copy the code
ObserveOnSink called the scheduleRecursive(_: Action 🙂 method of the ImmediateSchedulerType protocol in the method by rewriting the onCore(_:) method
// ImmediateSchedulerType
public func scheduleRecursive<State> (_ state: State.action: @escaping (_ state: State._ recurse: (State) - >Void) - >Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
// RecursiveImmediateScheduler
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
/ / self. _scheduler preservation is ConcurrentDispatchQueueScheduler
let d = self._scheduler.schedule(state) { state -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
// The action callback is executed in the specified _scheduler
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
.
}
// ConcurrentDispatchQueueScheduler
public final func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
Copy the code
scheduleRecursive(_:action:)
->RecursiveImmediateScheduler.schedule(_:)
->ConcurrentDispatchQueueScheduler.schedule(_:action:)
->configuration.schedule
, through this call chain, you can see andsubscribe(on:)
Same thing, it’s always called at the endconfiguration.schedule
.- The action callback is in
schedule
Closure, implemented in the specifiedschedule
Listen in.