Perspective RxSwift core logic
It’s a little bit longer, but you can skip some parts.
- If you are familiar with the source code, it is recommended to look at the diagram directly, the sequence diagram is more clear. Reading text is necessary for the first time.
- The posted code omits unnecessary parts and replaces them with ellipses.
The sample
The basic usage of RxSwift is a few simple steps:
- Create an observable sequence
- Listening sequence (subscription signal)
- Destruction of the sequence
// Create sequencelet ob = Observable<Any>.create { (observer) -> Disposable in// Send signal observer.onnext ("Sesame sauce cold skin of the Day.")
observer.onCompleted()
returnDisposables. Create ()} // Subscription signallet _ = ob.subscribe(onNext: { (text) in
print("Subscribe to: \(text)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("Complete") {})print("Destroyed")} Console output: Subscribe to: today's portion of Majiang liangpi completed destructionCopy the code
To explore the
Before looking at the source code, you should have some knowledge of the classes and protocols you are working with, so that you can understand them later. The following diagram should be familiarized with in case of need:
What exactly underpins such easy calls?
Observable
. Create creates an Observable sequence Observable that subscribes to messages.
As you can see from the output, all are subscribed messages. So when is the closure passed in to subscribe called?
From just a few lines of code, you can also guess that this is caused by a call to observer.onNext in the closure of the first line of code. However, we also don’t see where the closure in the **create function is executed. **
For clarity, we’ll call the closure **create and the closure ** SUBSCRIBE.
You can’t see it from the outside, so let’s just go inside RxSwift and explore what do create and Subscribe do?
An implementation of the create function:
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
Copy the code
This function actually creates an AnonymousObservable sequence object called AnonymousObservable. The previous closure was also used to initialize the AnonymousObservable object.
AnonymousObservable class:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
Copy the code
Here you assign the CREATE closure to the _subscribeHandler attribute at initialization.
At this point, the Observable
. Create function actually creates an AnonymousObservable AnonymousObservable sequence object and holds the create closure.
. It seems that this is not the main line! None of these questions were answered.
** SUBSCRIBE function **
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
......
let observer = AnonymousObserver<E> { event in. switch event {case .next(letvalue): onNext? (value)case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
Copy the code
This is also an implementation of the function defined in the ObservableType protocol, which returns a Disposable. This Disposable is used to manage the lifetime of the subscription, which is not shown in the sample code and is actually handled inside the subscription signal. After that, we create AnonymousObserver and pass in the closure when AnonymousObserver is initialized and assign the _eventHandler property.
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
Copy the code
Previously, the AnonymousObservable anonymous sequence object saved the CREATE closure. At this point, the AnonymousObserver AnonymousObserver object is created, holding the EventHandler closure executed on the subscribe closure’s callback.
Another branch… Along the way, you’re creating objects and saving closures. The two main questions are still untraceable. Was it the wrong way from the beginning? Not too! Continue to see what is called “another village”.
The SUBSCRIBE function of AnonymousObservable also returns a newly created Disposable after creating AnonymousObserver. The key is in the first argument here: self.asObservable().subscribe(observer). The asObservable returns self again, as you can see in the ObserverType attached to it. The rest is just subscribe from the parent Producer of AnonymousObservable:
class Producer<Element> : Observable<Element> {
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if! CurrentThreadScheduler.isScheduleRequired { ...... }else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
Copy the code
Since we’re executing in the current thread, we’ll just look at the else part. Don’t take care of yourself. The key is the Observer parameter, which contains the EventHandler closure for processing the SUBSCRIBE closure. The observer passes self.run(observer, cancel). So, go back to the run function in the AnonymousObservable class ** :
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
Copy the code
Here an AnonymousObservableSink object is created, and observer and cancel continue to be dropped into the initialization function:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
......
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
Copy the code
Sink. Run (self) passes in self and alias Parent to AnonymousObservable. Isn’t parent-_subscribeHandler the same create closure that AnonymousObservable saves when it first calls it? AnyObserver(self) takes AnonymousObservableSink as an AnyObserver initialization parameter.
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
Copy the code
The on function of AnonymousObservableSink is also assigned to the AnyObserver property Observer, which is EventHandler. This EventHandler is used in the Create closure.
This is the answer to the second main question: when is the closure in the create function called?
Tidy it up:
- from
subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable
Create in functionDisposable
start AnonymousObservable
callsubscribe(observer)
AnonymousObservable
callrun(observer, cancel)
- create
AnonymousObservableSink(observer: observer, cancel: cancel)
And,sink.run(self)
parent._subscribeHandler(AnyObserver(self))
This is a line from
The subscribe closure
–>Create a closure
The line.
That’s not all, there’s another ** how does a create closure trigger a subscribe closure? **
Observer.onnext (” Today’s portion of ma Jiang liang PI “). Observer. onNext(” Today’s majiang liangpi “) :
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
Copy the code
The AnyObserver constructor returns self.observer(event), and the AnonymousObservableSink on is assigned to the self.observer. You’ll go to the on function of AnonymousObservableSink. The forwardOn(event) of AnonymousObservableSink’s parent class, Sink, is called:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
......
init(observer: O, cancel: Cancelable) {
......
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
......
self._observer.on(event)
}
}
Copy the code
Self._observer. on(event) The _observer attribute is the AnonymousObserver object passed in when AnonymousObservableSink is initialized.
Continue with AnonymousObserver on:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
......
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
Copy the code
There is no on here, look at the parent class ObserverBase:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
Copy the code
So in this on function, we have a.next branch that calls self.oncore (event). Subclass AnonymousObserver implements onCore that calls self._eventhandler (event).
What is this _eventHandler? Subscribe closure saved when AnonymousObserver is initialized. So the observer.onNext(” Today’s sauce “) in the CREATE closure triggers the subscribe closure.
This is a line of AnyObserver- > SUBSCRIBE closure arguments when calling _subscribeHandler (that is, create closure) from AnonymousObservable.
Now we have a clear view of the reactive data flow:
- The Observer is created when the signal is subscribed and the closure is executed when the sequence is created
- In the closure that creates the sequence, there is a callback observer that listens for changes in the sequence and triggers a subscription signal
The illustration
Do you see that? It doesn’t seem obvious. After all, there are several classes, protocols, and so many functions to call around. Give it one more stroke. Now that we’ve figured out the various invocation flows, let’s figure out what it does.
A picture of 1000 words, since the drawing of the picture, less keyboard!
A close look at the flow chart shows that of the few classes that appear, the three at the beginning of Anonymous are the ones doing most of the work. The operations we call outside are actually using some classes that are encapsulated inside RxSwift.
- Create an observable sequence
AnonymousObservable
And saveCreate a closure
- Subscribe to the signal
- I created one first
AnonymousObserver
And get it rightThe subscribe closure
The operation is encapsulated asEventHandler
- Hard labor or
AnonymousObservable
To dry.- In creating the return value
Disposable
bysubscribe(observer)
,AnonymousObserver
Passed on toAnonymousObservableSink
AnonymousObservableSink
Is the heart of information processing, because he knows too muchAnonymousObservableSink
There areAnonymousObserver
The observer,AnonymousObserver
holdingEventHandler
.AnonymousObservableSink
In the callrun
Function is also passed inAnonymousObservable
Sequence,AnonymousObservable
isCreate a closure
The holder of.AnonymousObservableSink
In addition to the observer, there is an initialization that manages the life cycle of the sequenceDisposable
.
AnonymousObservableSink
As an inner class, in theCreate a closure
Needs to be converted to when a parameter is called back to the outside worldAnyObserver
Here,AnyObserver
Is preserved as a closure propertyAnonymousObservableSink
theon
function- And then when the signal changes
AnyObserver
By the value of this propertyAnonymousObservableSink
- In creating the return value
- I created one first
Observable–>AnonymousObservable–>AnonymousObserver–>AnonymousObservableSink–>AnyObserver–> Create closure
- A signal
- This process is basically the reverse of subscribing to signals
- from
Create a closure
In the callAnyObserver
theonNext
start - through
AnyObserver.observer
Access the closureAnonymousObservableSink
AnonymousObservableSink
haveAnonymousObserver
AnonymousObserver
controlEventHandler
- An end
Signal: Create closure –>AnyObserver–>AnonymousObservableSink–>AnonymousObserver–> SUBSCRIBE closure