Rxswift (1) Functional responsive programming idea
RxSwift (II) Sequence core logic analysis
Create, subscribe, and destroy an RxSwift Observable
RxSwift (4) higher-order functions
RxSwift (v)
(6) Dispose source code analysis
RxSwift (7) RxSwift vs. SWIFT usage
RxSwift (x) Basic Usage part 1- Sequence, subscribe, destroy
RxSwift Learning 12 (Basics 3- UI Control Extensions) @TOC
RxSwift sequence core logic
The idea of functional responsive programming is simply to analyze the core logic of the sequence. This blog is mainly for the last article to do a more in-depth discussion, if there are those local analysis is wrong, please leave a message: QQ:282889543, let us improve each other, each other achievement.
In general, the core logic of Rxswift analysis follows a trilogy: create sequence, subscribe sequence, destroy sequence. The central idea is that everything is sequential.
1. Sequence creation
Observable Observable Observable sequences
Let’s look at the class inheritance involved in creating an Observable:
Based on the above class diagram, a simple analysis of the relationship and design idea of the following classes is made: First, the hierarchical implementation is very thorough, each layer only solves one thing, and the structure is very clear when stacked layer by layer: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType
Next, let’s briefly break down what each class does:
- ObservableConvertibleType: As the name implies, it can be converted to
Observable
Type protocol, only one methodasObservable
What’s the good of that?
- The user does not need to care what type of object it is
- Let users pay more attention to its core functions
- ObservableType: is a agreement, inherited ObservableConvertibleType agreement
asObservable
Subscribe, which provides the abstract method subscribe, so you can only observe an object if you subscribe to it. - ObservableA real class, which can be called a metaclass, has complete Observable functions for users, because it already has all the functions required by users, although some methods are not implemented and are still abstract.Producer: It inherits
Observable
Subscribe method and implement subscribe method - AnonymousObservable: It inherits
Producer
, and added attributeslet _subscribeHandler: SubscribeHandler
It holds the closure passed in when creating the sequence, and has the ability to call the sequence. It also implements the run method, which is the core method for creating the sequence. In the run() method it creates oneAnonymousObservableSink
Sink can be called a tube. It is similar to the role of manager and has sequence, subscription and destruction capabilities. Here are two puzzles:
Question 1. Why is AnonymousObservableSink defined as a final private class that cannot be inherited or accessed externally? Question 2. How does a created Observable relate to a subscription?
We will discuss these two questions later.
Finally, we summarize the design idea:
In fact, all observables that users use are subclasses of Producer and parallel subclasses of AnonymousObservable, but they don’t need to care about their implementation. There is a related class, AnonymousObservableSink, Sink, pipe, and all of this comes together to make it really work. AnonymousObservableSink also has sequence, subscription functions, similar to the manager role we used in our project. The whole design describes its features in a composite protocol way up and hides implementation details in a subclassing way down, similar to the factory pattern. Such classes can also be called class clusters.
The flow of sequence creation
Through the above class inheritance relationship, it is not difficult to understand the sequence creation process, it is indeed only a relatively simple few, a few lines of code to solve the difficulty is the above several questions:
Let’s take a look at the direct flow and relationships of sequence creation, subscription, and destruction through a simple Rxswift example.
Example 1:
//1. Create sequence
let ob = Observable<Any>.create { (obserber) -> Disposable in
// 3: sends signals
obserber.onNext("Kyl sent a signal.")
obserber.onCompleted()
return Disposables.create()
}
// 2: subscribe signal
let _ = ob.subscribe(onNext: { (text) in
print("Subscribe to:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("Complete") {})print("Destroyed")}Copy the code
The code in example 1 above can be clearly expressed with a diagram from cool TEACHER C:
From the code and diagram above, we might be wondering:
Question 3: Ob.subscribe (). Why did we call obserber.onNext(“kyl sends A signal “) in the obserber.onNext(” kyL sends A signal “) in the following closure at ob creation? Let _ = ob.subscribe(onNext: {(text) in print(” subscribe to :\(text)”)} let _ = ob.subscribe(onNext: {(text) in print(” subscribe to :\(text)”)} If ob sends a message, the onNext closure of subcribe() will trigger it.
The ob.subscribe() method must do something to call closure A somewhere in order to do this. How does it work? Below we will analyze the source code to answer this question.
Let ob = Observable
. Create {(obserber) -> this line does a lot of things.
Let’s start with a flow chart to get a glimpse of the sequence creation process:
Return AnonymousObservable(SUBSCRIBE). We didn’t find the answer we were looking for, and we got a little dizzy.
- AnonymousObservable class source
final private class AnonymousObservable<Element> :Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element- > >)Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E= =Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
Copy the code
Let’s just take a deep breath, relax, and let’s try to figure out the other direction, not just one way. Let’s take a look at the subscription process.
2. To subscribe to
Review the subscription code in example 1 above: let _ = ob.subscribe(onNext: {(text) in what does this line do? Here we go through the source code to in-depth analysis:
- Rxswift subscription
subscribe()
The source code is as follows:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) - >Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) - >Disposable{... The above code is not the focus of our analysis... Indicates that a section of the source code has been ignored/* The AnonymousObserver () constructor is passed a trailing closure, eventHandler, which is triggered when a different event is received. We 'let _ = ob.subscribe(onNext: {(text) in' this method is passed to the closure */
let observer = AnonymousObserver<E> { event in.switch event {
case .next(letvalue): onNext? (value)Ob.subscribe (onNext: closure) passed in when calling the subscription
case .error(let error):
if let onError = onError {
onError(error)Ob.subscribe (onError: closure) is passed in when a subscription is called
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()Ob.subscribe (onCompleted: closure) passed in when the subscription is called
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)/* This returns to the Disposables object, to release the Disposables, and in its constructor calls self.asObservable().subscribe(observer), The asObservable() is the sequence ob that we created, ob.subscribe(), and passes in the local variable let Observer = AnonymousObserver
, */
}
Copy the code
Through the above source code we can know: Subscribe (), which is passed as arguments to the onNext() closure, onError() closure, onComplete() closure, creates an AnonymousObserver object observer, which is created with a closure, The three closures onNext, onError, and onComplete that we subscribe() pass in are called when we receive different events. The most important one is return Disposables. Create (self.asObservable().subscribe(observer), disposable) calls our real subscribe() function, The AnonymousObserver object is passed in as a parameter. Self.asobservable () is the ob sequence created by our create() function, and here we can see clearly that we pass in the parameter closure when we subscribe to the ob chain.
Why is self.asObservable() the ob returned by our create() function?
To answer this question, I need to review the Observable inheritance analyzed above: Observables – > ObservableType – > ObservableConvertibleType observables inherit ObservableType agreement, ObservableType inherit ObservableConvertibleType agreement again, and our ObservableConvertibleType asObservable provides an abstract method (), We implement the asObservable() method in our Observable class, which returns self directly.
The following is verified by the source code:
///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {...public func asObservable(a) -> Observable<E> {
return self}... }Copy the code
After analyzing the source code of Rxswift subscribe(), we find that there is a chain relationship between the closure passed in when we create OB and the closure we subscribe to, that is, as long as OB sends a message, our subscribers must be able to receive the message according to this chain. But we still don’t know how it’s called, how it’s triggered.
And we notice that self.asObservable().subscribe(observer) that AnonymousObservable calls subscribe(), We don’t find a definition of SUBSCRIBE () in the AnonymousObservable class, so we need to look at its parent Producer
- The source code for Producer is as follows:
class Producer<Element> : Observable<Element> {
override init() {
super.init()}override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E= =Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer(a)/* The run() call is passed to an observer, and the sink tube is created. The sink tube has the sequence function to call the on() method. * /
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer(a)let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E= =Element {
rxAbstractMethod()
}
}
Copy the code
As expected, we find the definition of subscribe () in Producer. From there, we can summarize several clear clues:
- (1) It can be known from the previous class inheritance relationship that is
Producer
To achieve theObservableType
The subscribe () method of the protocol. It’s called in this methodself.run(observer, cancel: disposer)
- (2) Self.run () is actually anonymousobservable.run (), which does three things:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E= =Element {
//1. Create a sink pipe object and create the observer as create()
// The closure passed into the sequence is passed to sink
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
Sink calls its own run () method and passes AnonymousObservable as an argument.
let subscription = sink.run(self)
// Return a tuple containing sink channel information.
return (sink: sink, subscription: subscription)
}
Copy the code
-
(3) The run() method of AnonymousObservableSink class calls parent-_subscribeHandler (AnyObserver(self)), where parent is the self passed by sink. Run (self) in (2). The AnonymousObservable object; And we already know that _subscribeHandler is the closure passed in as the create() function argument that is saved when the sequence is created: Let ob = Observable
. Create {(obserber) -> Disposable in // 3: obserber.onnext (“kyl “) Obserber.oncompleted () return Disposables.create()} It’s now clear that parent._SubscribeHandler (AnyObserver(self)) executes the closure and this line calls obserber.onNext(” KYL sent the signal “).
-
Now we can summarize the flow of our code through a flow chart:
We understand the logic from the subscription sequence to the call to the parameter closure passed in when we call create(), but it is not clear how the closure sends onNext() to the onNext() closure of the subscription message. So we need to analyze the AnonymousObserver
Let’s start with the AnonymousObserver class
AnonymousObserver
The source code definition is as follows:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element- > >)Void
private let _eventHandler : EventHandler
/* Constructor that holds the EventHandler trailing closure */
init(_ eventHandler: @escaping EventHandler) {#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
// Override the onCore method and call the EventHandler closure
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
Copy the code
The onNext() method is not found in AnonymousObserver source code, so we can only look up its inheritance chain. Here we need to know the class inheritance relationship:
- AnonymousObserver:
By analyzing the inheritance relationship of the class, we know that:
AnonymousObserver object on () method will call onCore () method, ObserverType onNext, onError, onComplete method. But how and when is on() called?
To solve this problem, we need to go back to the code where we created the sequence:
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) - >Observable<E> {
return AnonymousObservable(subscribe)
}
Copy the code
The create() method that creates the sequence passes a SUBSCRIBE closure and returns an AnonymousObservable object. The SUBSCRIBE closure is the closure we pass in as a parameter when we create our sequence. And the closure is saved when AnonymousObservable is initialized self._subscribeHandler = subscribeHandler AnonymousObservable has a run() method, Run method create AnonymousObservableSink object sink.
final private class AnonymousObservable<Element> :Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element- > >)Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E= =Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
Copy the code
After all this analysis and going around the circle, the key is found in the AnonymousObservableSink tube object. Sink this is a magic tube. So it stores the sequence, it stores the subscription, it stores the disposition so you have create sequence, subscribe sequence, destroy sequence at the same time.
AnonymousObservableSink AnonymousObservableSink
final private class AnonymousObservableSink<O: ObserverType> :Sink<O>, ObserverType {
typealias E = O.E
// Parent is AnonymousObservable
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
// The constructor passes in an observer sequence and Cancelable
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
// This implements the on() method of the ObserverType protocol
func on(_ event: Event<E>){#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
Calling the parent's publication, self.forwardon () calls its own on() method
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) = =0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
/* Calls the _subscribeHandler closure that we passed in when we created the sequence. Parent is the sequence that's passed in, where self is passed in the closure of the sequence and is forced to be AnyObserver and self is passed to the closure _subscribeHandler so that _subscribeHandler has the ability to subcribe. * /
return parent._subscribeHandler(AnyObserver(self))}}Copy the code
The source code of the Sink class is as follows:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: O, cancel: Cancelable) {#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>){#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
// Call the passed observer.on() method,
self._observer.on(event)
}
final func forwarder(a) -> SinkForward<O> {
return SinkForward(forward: self)}final var disposed: Bool {
return isFlagSet(self._disposed, 1)}func dispose(a) {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
deinit{#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
Copy the code
From source code analysis we know:
-
Our sink saves our sequence. When we call ob.onNext() to send a signal, because our sink already has ob, it will call on() and call self.forwardon (event) in on(). Instead, inside fowardOn() calls self._observer.on(event). Sink calls the on() method.
-
Here we summarize the general process again:
- When you create a sequence
create()
Returns aob
Ob is a sequence that is created with a closure passed inA
. Called in closure Aob.onNext()
A signal was sent. - Called when subscribing to a sequence
ob.subscribe()
Method, which creates oneAnonymousObserver
Object and calledself.asObservable().subscribe(observer)
. self.asObservable()
It’s actually oursob
Ob calls SUBSCRIBE (). AnonymousObserver does not find subscribe().- We are in
AnonymousObserver
Subscribe () is found in the parent class of subscribe()AnonymousObserver run ()
Methods. - In the run() method of AnonymousObserver, a tube sink is created and called
sink.run(self)
Sink is the object of AnonymousObservableSink, while in sink’s run() methodparent._subscribeHandler(AnyObserver(self))
The closure A (parent is AnonymousObserver) that was saved when the sequence was created was called, which explains why the closure A was called back when the subscription was made. - As for how to call onNext() method, it is also implemented by sink.
- Sink already holds OB. When we call OB.onNext () in the A closure to send A signal, it will actually be called through sink.on(). First sink.on() calls the forwardOn().
- In the forwardOn() call self._observer.on(event).
- _observer.on () will call _observer.oncore ()
- _observer.oncore (event) will call onNext(),onError(), and onComplete() based on the type of event. This _observer.onnext () calls the closure subscribe(onNext:) that we pass in when we subscribe.
- The reasons for the pullback are:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) - >Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) - >Disposable{... The above code is not the focus of our analysis... Indicates that a section of the source code has been ignored/* The AnonymousObserver () constructor is passed a trailing closure, eventHandler, which is triggered when a different event is received. We 'let _ = ob.subscribe(onNext: {(text) in' this method is passed to the closure */
let observer = AnonymousObserver<E> { event in.switch event {
case .next(letvalue): onNext? (value)// passed in when the subscription is called
Copy the code
When we call ob.subscribe() here, we create AnonymousObserver and bind it to our SUBSCRIBE () onNext() closure, Anonymousobserver.onnext () must call back the onNext() closure that subscribe() passed in. Let Observer = AnonymousObserver
- Again, the simplest way to explain it is with this picture: