Introduction of Rxswift destroyer Dispose
- First of all, please use a mind map to get a preliminary understanding of what Dispose owned and did:
- This paper mainly focuses on the above picture, focusing on the analysis of how Dispose() destroys the sequence.
- From the figure above we can see that dispose and disposeBag are the first root nodes after the destroyer. What are they? The answers are explained below.
Introduction to Rxswift destroyer classes and important functions
1. DisposeBag
1.1 What is a DisposeBag
RxSwift and RxCocoa also have an additional tool to assist with ARC and memory management: DisposeBag. This is a virtual “package” of observers that is discarded when their parent object is released. When an object with the DisposeBag attribute calls deinit(), the virtual package is emptied and each Disposable Observer automatically unsubscribs to what it observes. This allows ARC to reclaim memory as usual. If there is no DisposeBag, there are two outcomes: either the Observer generates a retain cycle that is bound to the observed object indefinitely; Or accidentally released, causing the program to crash. So to be a good ARC person, set up Observables and add them to DisposeBag. In this way, they can be cleaned well.
When an Observable is observed and subscribed, it generates a Disposable instance that can be used to free resources. There are two ways to release resources in RxSwift, namely unbind and free space, respectively explicit and implicit:
- Explicit release allows us to call the release method directly in our code to release resources as shown in example 1:
let dispose = textField.rx_text
.bindTo(label.rx_sayHelloObserver)
dispose.dispose()
Copy the code
- Implicit release Implicit release is done through DisposeBag, which is similar to the automatic release pool mechanism in Objective-C ARC. When an instance is created, it is added to the automatic release pool of the thread in which it is created. The automatic release pool releases and rebuilds the pool after a RunLoop cycle. The DisposeBag is like an automatic release pool for RxSwift. We add resources to the DisposeBag and release them along with the DisposeBag.
Example 2:
let disposeBag = DisposeBag()
func binding() {
textField.rx_text
.bindTo(label.rx_sayHelloObserver)
.addDisposableTo(self.disposeBag)
}
Copy the code
In this code, the method addDisposableTo makes a weak reference to DisposeBag, so this DisposeBag will be referenced by the instance, usually as a member variable of the instance, and when the instance is destroyed, the member DisposeBag will follow. This frees resources bound by RxSwift on this instance.
From the above we can see that DisposeBag is like our automatic release pool in OC memory management. It acts as a garbage collection bag, you just add the sequence to the disposeBag, and the disposeBag will help us release resources at the right time, so how does it do that?
1.2 DisposeBag implementation source code analysis
1.2.1. Take a look at the class diagram:
1.2.2. Specifically analyze the source code process
- When we invoke the Disposed () method, we call the Insert () method of the Dispose class to add the disposables to the _Disposables array. The specific source code is as follows:
public final class DisposeBag: DisposeBase { private var _lock = SpinLock() // state fileprivate var _disposables = [Disposable]() fileprivate var _isDisposed = false /// Constructs new empty dispose bag. public override init() { super.init() } /// Adds `disposable` to be disposed when dispose bag is being deinited. /// /// - parameter disposable: Disposable to add. public func insert(_ disposable: Disposable) { self._insert(disposable)? .dispose() } private func _insert(_ disposable: Disposable) -> Disposable? Self._lock. lock(); self._lock.lock(); Defer {self._lock.unlock()} if self._isdisposed {// If _dispose() is disposed, it means that it has been disposed and no longer needs to be disposed to ensure symmetry. Disposables = disposable disposables = disposable disposables = disposable disposables = disposdisposables = disposable disposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables take a look at `CompositeDisposable` instead. private func dispose() { // 1. Take all saved disposables = self. _disposables () // 2. Going through every destroyer, Dispose of disposables {disposable. Dispose ()}} private func _dispose() -> dispose of disposables {disposables. Dispose ()} private func _dispose() -> [Disposable] { self._lock.lock(); Defer {self._lock.unlock()} // Take all the saved disposables = self._Disposables self._disposables.removeAll(keepingCapacity: Self._isDisposed = true // This variable is used to record whether the garbage bag array has been emptied return Disposables} deinit {// When DisposeBag's own object is disposed, Call your own dispose() to iterate over all the saved destroyers in the dispose array, self.dispose()}}Copy the code
- The source flow above is identified by a diagram
- To summarize the DisposeBag process above:
- When we call the sequence of
dispose()
Method is,DisposeBag
callinsert()
Method saves the sequence we need to destroy for storage_disposables
In the array. - When our DisposeBag is destroyed, such as the local variable defined out of scope, it will be destroyed. Our Deinit () method is called as shown in figure 4, and its dispose() method is executed in deinit(), and then all the previously saved variables that need to be released
_disposables
Array, in turn calling their own Dispose () method.
2. FetchOr () function
- FetchOr (); fetchOr (); fetchOr ();
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
this.lock()
let oldValue = this.value
this.value |= mask
this.unlock()
return oldValue
}
Copy the code
The source code is very simple, but the effect is not small. In the code this is the AtomicInt value passed in, with only one value inside. FetchOr returns a copy of this.value as the result. And enclosing the value and the mask do or (|) operation. And assigns the result of the or operation to this.value.
- To understand the result of this function, we use a table:
Is made a or operations, the actual decimal result remains the same, just change the inside of the binary, can be used to make marks, just inside the C language often use method, namely the value of an Int type processing itself can use, can also through the bitwise and, or, to change its logo, achieve the purpose of passing values, This allows each bit to replace a bool, often used as an enumeration.
- Through the analysis of the above, I learned that fetchOr () function in the role of, can make sure that every piece of code is executed only once, is equivalent to a flag, if the initial value of 0, 1 if the incoming parameters, assuming that this code is executed repeatedly five times, only the first will be from 0 to 1, the back four times call for 1, don’t send changes.
Dispose core logic
Dispose instance code analysis
- Children who have learned RxSwift know that dispose() will release our resources just like the reference counter in our OC. We can also listen for destroyed event callbacks at release time. Have you ever thought about how Dispose did it?
To know the answer, we can only analyze the source code step by step:
- First, let’s look at an example code:
Example 1:
Func limitObservable(){let ob = Observable<Any>. Create {(observer) -> Disposable OnNext ("kongyulu") return Disposables. Create {print(" dispose ")} // dispose. Dispose ()} // sequence subscribe let dispose = Ob. subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}, onError: {(error) in print(" subscribed :\(error)")}, onCompleted: {print (" finished ")} {print (" destruction of callback ")} print (" finished "). / / the dispose the dispose ()}Copy the code
- The above code executes as follows:
- From the above results, we know that the sequence created is not destroyed, that is, “destroyed free” is not printed, nor “Destroy callback” is printed. Why is that? This problem we later through the analysis of the source code Rx source will know.
- Now let’s uncomment that line of code above
dispose.dispose()
This line of code, uncommented and rerun, produces the following output:
- As we can see from the above code, the sequence created is destroyed and the destruction callback is executed. Then why did you add it
dispose.dispose()
Is that enough? - In addition, let’s modify our code again:
Example 2:
Func limitObservable(){let ob = Observable<Any>. Create {(observer) -> Disposable OnNext ("kongyulu") observer.onCompleted() return Disposables. Create {print(" dispose released ")} // dispose Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe :\(anything)")}, onError: dispose = ob.subscribe(onNext: {(anything) in print(" subscribe :\(anything)")}, onError: {(error) in print(" subscribed :\(error)")}, onCompleted: {print (" finished ")} {print (" destruction of callback ")} print (" finished "). / / the dispose the dispose ()}Copy the code
Example 2: Observer.oncompleted () : Observer.oncompleted ()
Here we can see that after we add one more line observer.onCompleted(), the destruction callback is also printed. What logic is this? why?
- Let’s explore how RxSwift is implemented with this question in mind
Dispose process source code parsing
Before analyzing Dispose source code, we must first deeply understand the creation of sequence, subscription process is the basis, only by understanding this, can we truly understand the principle of Dispose. This is actually covered in a previous article, but for more details, see my previous article: Sequence Core Logic
In order to better understand, I will clarify the specific process again:
1. Sequence creation and subscription process
- (1) When we execute the code
Let ob = Observable<Any>. Create {(observer) -> Disposable in this is A closure we call closure A}
“, you actually come to line 20 of the create. swift file:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
Copy the code
- (2) The create() function returns one
AnonymousObservable(subscribe)
And pass our closure A into the constructor of AnonymousObservable, which saves the closure Alet _subscribeHandler: SubscribeHandler
It’s stored in a variable._subscribeHandler
A variable that holds closure A passed in when the sequence OB was created (where closure A requires passing in)AnyObserver
Type as parameter)
final private class AnonymousObservable<Element>: Producer<Element> {TypeAlias SubscribeHandler = (AnyObserver<Element>) -> Disposable // This variable holds closure A passed in when the sequence ob was created (where closure A requires the AnyObserver type to be passed in as A parameter) @escaping SubscribeHandler) {self._subscribeHandler = SubscribeHandler // This variable holds the closure A that was passed in when the sequence OB was created... The following code will not be omitted}Copy the code
- (3) We call
Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)"}
Line 39 of the ObservableType+Extensions. Swift file is ObservableType+Extensions.
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable ... Let Observer = AnonymousObserver<Element> {return Disposables. Create ( self.asObservable().subscribe(observer), disposable ) }Copy the code
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable ... Let Observer = AnonymousObserver<Element> {return Disposables. Create ( self.asObservable().subscribe(observer), disposable ) }Copy the code
- (4) From above
subscribe()
As you can see from the source code, you create an AnonymousObserver object in the function, and then directly return Disposables. Create (). - (5) Here we don’t see any relationship between subscriptions and our closure A, and that’s the point
self.asObservable().subscribe(observer)
So this line of code, let me analyze what this line of code actually does. - (6) To understand this line of code in (5), we need to understand the class integration relationship: \
AnonymousObservable – > Producer – > observables – > ObservableType – > ObservableConvertibleType details below:
- (7) Through the inheritance relationship, we can follow the inheritance chain to find the parent class, we can find is in
Observable
Class defines thisAsObservable ()
Methods:
public class Observable<Element> : ObservableType { ... Public func asObservable() -> Observable<Element> {return self}... Omitted code that is not concerned}Copy the code
- (8) Through source code analysis, I know that asObservable() returns self, and (3) calls yes
self.asObservable().subscribe(observer)
Self in this line of code is the sequence ob that we created, soself.asObservable()
That’s the ob observable sequence that we created at the beginning.self.asObservable().subscribe(observer)
The observer in is where we arepublic func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
Local variables created in method implementation:Let observer = AnonymousObserver<Element>
We pass this local variable to the ObservableThe subscribe ()
Methods. - (9) Next we share Observable’s
The subscribe ()
Method does something. - (10) When we call this line in Example 2:
Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}
When the actual call is madeObservableType
Subscribe () method, in which we create oneAnonymousObserver
Elephant, and throughself.asObservable().subscribe(observer)
Ob. Susbscribe (Observer) is passed (note: Ob is the AnonymousObservable object created by create(), and observer is a temporary local AnonymousObserver created by subscribe, which has been analyzed above. - (11) However, we can see from the above class diagram that there is no subscribe() method in ob (AnonymousObservable), so we can only look for its parent Producer first.
- (12) According to the above class diagram analysis, we can see that Producer inherits the Observerable observable sequence and follows the ObservableType protocol (which defines a SUBSCRIBE () interface), so we must implement this interface in Producer. Let me look at the source code:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { if ! CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was Disposed. Let Disposer = SinkDisposer() // The following line of code is the key, calling its own run() method and passing in two parameters: This is the 'AnonymousObserver' object that we pass to 'selp.asObservable ().subscribe(observer)' // parameter 2: Disposer: SinkDisposer() will be taken care of later. let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } }Copy the code
- (13) Through the above source code analysis, we know that
Producer
Now theThe subscribe ()
Inside the mouth, called its ownrun()
Method, and throughout the yearrun()
The method passed observer: that is usself.asObservable().subscribe(observer)
The incomingAnonymousObserver
Elephant. So let’s see what run does:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
Copy the code
- (14) From the run() method in Producer above, we can see that the method is not doing anything, just one line
rxAbstractMethod()
This rxAbstractMethod() is just an abstract method. Our subclass AnonymousObservable must have overridden the run() method. So let’s seeAnonymousObservable
therun()
Source:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where observer. Element == Element {// Create a tube AnonymousObservableSink and pass the tube two parameters: This is the 'AnonymousObserver' object that we pass to 'selp.asObservable ().subscribe(observer)' // parameter 2: Disposer: SinkDisposer() will be taken care of later. let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) }Copy the code
-
In the run() method of AnonymousObservable, the run() method of AnonymousObservable. First, create an AnonymousObservableSink object sink and pass in an observer (the AnonymousObserver we pass in self.asObservable().subscribe(observer). Second, the sink. Run (self) method is called to return the subscription, followed directly by a tuple that the run() method returns: (sink: sink, subscription: subscription). But our focus is on the sink tube. AnonymousObservableSink is a manager-like role that holds sequence, subscriber, and destroyer information, as well as scheduling capabilities. It is through this tube that our series and subscribers communicate.
-
(16) What does the AnonymousObservableSink pipe do next? AnonymousObservableSink
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {typeAlias Element = observer. Element Typealias Parent = AnonymousObservable<Element> // state private let _isStopped = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif override init(observer: Observer, cancel: Subscribe (observer) {super.init(observer) {// The AnonymousObserver is called by 'self.asObservable().subscribe(observer)'. observer, cancel: cancel) } func on(_ event: Event<Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: If load(self._isstopped) == 1 {// If.error,.completed is executed, the self.forwardon (event) code will not continue. The forwardOn is not executed unless the.complete,.error event is executed during the object's life cycle. return } self.forwardOn(event) case .error, .completed: If fetchOr(self._isstopped, 1) == 0 {// fetchOr(self._isstopped, 1) == 0 {// fetchOr(self._isstopped, 1) == 0 {// fetchOr(self._isstopped, 1) == 0 { To ensure that the following code executes only once in the object's life cycle, no matter how many times on() is called. Self.parent () self.parent () self.parent () self.dispose() Parent) -> Disposable {// Parent is called AnonymousObservable, So we start with the create() sequence OB,_subscribeHandler which is the closure A that we pass in when we create the sequence. (Closure A is A function that requires A parameter, This argument is AnyObserver(self)) return parent._subscribeHandler(AnyObserver(self))}}Copy the code
- (17) Through the source code of AnonymousObservableSink above, we know the following conclusions:
- AnonymousObservableSink. Init initialization time introduced to the observer: is us
self.asObservable().subscribe(observer)
The incomingAnonymousObserver
Object. - AnonymousObservableSink has an on() method that does different things with the event parameter passed in, but is called at least once
self.forwardOn(event)
Methods. Each time if onNext event is called onceforwardOn()
. However, the. Error,. Completed event is called at most onceforwardOn()
. - AnonymousObservableSink’s run() method is the core method that calls back to the closure A we passed when we first created create() and creates ob.subscribe() internally
AnonymousObserver
Objects sink through our AnonymousObservableSink objects, that isAnyObserver(self)
In theself
Once wrapped as an AnyObserver structure, we pass closure A as A parameter, thus linking our sequence to the subscribers. - Pay special attention to: Many people think that passing our closure A is
AnonymousObserver
In fact, it is not; closure A is passed as an AnyObserver structure - With the run() method of AnonymousObservableSink we successfully passed the closure we created with our original ob.subscibe() subscription
AnyObserver(self)
When we call this line of code inside closure A:observer.onNext("kongyulu")
After ob.subscribe(),AnyObserver(self)
This is our observer, and the observer is a structure that owns our tubeAnonymousObservableSink
Object’s on() method. - In example 1: when we send
observer.onNext("kongyulu")
When you sequence messages, they actually pass through our pipeAnonymousObservableSink.on()
To schedule, and finally schedule the closure we subscribe to: onNext()Closure B:Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :(anything)")}
. - So the big question now is: AnonymousObservableSink) on () is how to from the observer. The onNext (” kongyulu “) scheduling to our closure B?
- To analyze the above problem, we need to analyze the structure first
AnyObserver(self)
What you did: First look at AnyObsevrer’s source code
public struct AnyObserver<Element> : ObserverType { /// Anonymous event handler type. public typealias EventHandler = (Event<Element>) -> Void // We define the alias EventHandler as a closure for an incoming event. EventHandler /// Construct an instance whose `on(event)` calls `eventHandler(event)` /// /// - parameter eventHandler: Event handler that observes sequences events. public init(eventHandler: @escaping EventHandler) {// Self. Observer saved the AnonymousObservableSink object self. Observer = EventHandler} /// Construct an instance whose `on(event)` calls `observer.on(event)` /// /// - parameter observer: Observer that receives sequence events. Public init<Observer: ObserverType>(_ Observer: self) {AnyObserver(self); Element. The Observer) where the Observer Element = = {/ / this code directly save the AnonymousObservableSink. / / self on () method. The actual is an Observer on () method self.observer = observer.on } /// Send `event` to this observer. /// /// - parameter event: Event instance. public func on(_ event: Event Element < >) {/ / here call on AnonymousObservableSink. The practical approach is to call on (Event) method to return the self. The observer (Event)} / / / Erases the type of observer and returns canonical observer. /// /// - returns: type erased observer. public func asObserver() -> AnyObserver<Element> { return self } }Copy the code
- (19) Through the above AnyObserver source analysis, we know that
AnyObserver
The initialization saves our tubeAnonymousObservableSink
theon()
Methods, and they have a on method, in his own on methods to call AnonymousObservableSink. On () method. It’s just a cover so the world doesn’t know about usAnonymousObservableSink
Class, why is that? This design has several advantages:
- It’s completely encapsulated, so the outside world doesn’t need to know about our tubes
AnonymousObservableSink
Class, they don’t care about usAnonymousObservableSink
The user only needs to use the interface on(). It doesn’t matter how on() is implemented or by whom. - Acts as a decoupling effect,
AnyObserver
Doesn’t own usAnonymousObservableSink
Object, it just owns itAnonymousObservableSink
The on() interface of theAnonymousObservableSink
That’s all you need to do to implement the on() interface. As for theAnonymousObservableSink
Internal changes (as long as the ON () interface is not changed) will not affectAnyObserver
.
-
(20) Now we focus on the on() method:
-
- When weExample 1To perform:
observer.onNext("kongyulu")
This line of code actually calls:AnyObserver.onNext()
Methods. Since we AnyObserver inherit the ObserverType protocol, we have itObserverType
theOnNext ()
Method, you can go back to class inheritance if you don’t know.
- When weExample 1To perform:
-
Anyobserver.onnext () calls its own on() method: anyobserver.onnext ()
Interface definition for ObserverType
extension ObserverType { public func onNext(_ element: Element) {self.on(.next(Element))// This will revert to AnyObserver's on() method, AnyObserver inherits ObserverType, Public func onCompleted() {self.on(.completed)} public func onError(_ error: Swift.Error) { self.on(.error(error)) } }Copy the code
- (22)
AnyObserver.on()
Method will callAnonymousObservableSink.on()
Methods. - (23)
AnonymousObservableSink.on(event)
Will be calledAnonymousObservableSink.forwardOn(event)
- (24) It is not defined in AnonymousObservableSink
ForwardOn ()
Method, which we find implemented in its parent class SinkForwardOn ()
The source code is as follows:
class Sink<Observer: ObserverType> : Disposable { fileprivate let _observer: Observer fileprivate let _cancel: Cancelable fileprivate let _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: Observer, cancel: Cancelable) {#if TRACE_RESOURCES _ = resources.incrementtotal () #endif We 'self.asObservable().subscribe(observer)' pass the 'AnonymousObserver' object self._observer = observer self._cancel = cancel} final func forwardOn(_ event: Event<Observer.Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) {return} // The 'anonymousobServer.on ()' method is actually called here. self._observer.on(event) } ... This time the code omitted, do not need to worry about}Copy the code
- (25) From the above source we can see:
Sink. ForwardOn ()
Actually calledAnonymousObserver.on()
To put it bluntly: we started with instance 1observer.onNext("kongyulu")
Ob.onnext () is called first when this line of code executesAnyObserver.on()
.AnyObserver.on()
Will call againAnonymousObservableSink.on()
.AnonymousObservableSink.on()
Will call againAnonymousObservableSink.forwardOn()
And thenAnonymousObservableSink.forwardOn()
The AnonymousObservableSink parent will be called againSink. ForwardOn ()
And finally theSink. ForwardOn ()
Call theAnonymousObserver.on()
. - (26) Now that we’re pretty clear, let’s go back to
AnonymousObserver.on()
Method definition:
- First we looked at the class definition as follows, and did not find it by the on() method:
final class AnonymousObserver<Element>: ObserverBase<Element> {typeAlias EventHandler = (Event<Element>) -> Void private let _eventHandler: EventHandler init(_ eventHandler: @escaping EventHandler) {#if TRACE_RESOURCES _ = resources.incrementTotal () #endif // Here an incoming trailing closure is saved: Subscribe () let Observer = AnonymousObserver<Element> {event in here is a tag closure B} we pass _eventHandler to save tag closure B self._eventHandler = eventHandler } override func onCore(_ event: <Element>) {return self._eventhandler (Event)} #if TRACE_RESOURCES deinit {_ = Resources.decrementTotal() } #endif }Copy the code
- So let’s look for its parent class ObserverBase:
class ObserverBase<Element> : Disposable, ObserverType { private let _isStopped = AtomicInt(0) func on(_ event: Event<Element>) { switch event { case .next: If load(self._isstopped) == 0 {self.oncore (event)} case.error,.completed: if fetchOr(self._isStopped, 1) == 0 { self.onCore(event) } } } func onCore(_ event: Event<Element>) { rxAbstractMethod() } func dispose() { fetchOr(self._isStopped, 1) } }Copy the code
- We know by analyzing the parent source code
ObserverBase.on()
And finally calledAnonymousObserver.onCore()
Anonymousobserver.oncore () calls back the _eventHandler(event) closure B, which is the trailing closure that created AnonymousObserver when we subscribed to the ob.subscribe() sequence. So the trailing closure ends up calling the onNext() method to which we subscribed. In example 1, executeobserver.onNext("kongyulu")
This line of code is going to call backDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}
So it prints“Subscribe to: Kongyulu”
The code for the trailing closure B of AnonymousObserver {B} is as follows:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ... // Note that: AnonymousObserver<Element> {B} the trailing closure inside the parentheses is called B, Closure B let Observer = AnonymousObserver<Element> {event in... Switch event {case.next (let value): onNext? (value) // Call onNext(value) case.error (let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) }Copy the code
-
(27) Through the analysis of point (26), we should understand the whole subscription process, which can be summarized as follows:
- Our ob. Create (
The closure of A
Save closure A inAnonymousObservable
In the variable_subscribeHandler
. - When we call ob.subscribe(
Closure B
When subscribing to a sequence, one is created firstAnonymousObserver
Object, and will take a trailingClosure C
. Then throughself.asObservable().subscribe(AnonymousObserver)
Through a series of transformationsAnyObserver
Passed to theThe closure of A
. - In 2 of them, a series of transformations can be simply explained as:
self.asObservable().subscribe(AnonymousObserver)
The actual isob.subscribe(AnonymousObserver)
ob.subscribe(AnonymousObserver)
The actual isProducer.subscribe(AnonymousObserver)
Producer.subscribe(AnonymousObserver)
Will be calledself.run(AnonymousObserver)
self.run(AnonymousObserver)
Will create aAnonymousObservableSink
Tube object sink and then callsink.run(AnonymousObservable)
The tube’s run() method is called and OB is passed to the tube sink.- And we tube
sink.run(AnonymousObservable)
Method is calledparent._subscribeHandler(AnyObserver(self))
The actual isob._subscribeHandler(AnyObserver(AnonymousObservableSink))
That’s calledThe closure of A
- And we
The closure of A
You need to pass in an argumentAnyObserver(AnonymousObservableSink)
In fact,AnyObserver
It’s just a structure. It’s preservedAnonymousObservableSink.on()
Methods. - When we are
The closure of A
It callsobserver.onNext("kongyulu")
In factAnyObserver.onNext("kongyulu")
And theAnyObserver.onNext("kongyulu")
Will be calledAnyObserver.on()
AnyObserver.on()
And then callAnonymousObservableSink.on(event)
So in this event- AnonymousObservableSink class
AnonymousObservableSink.on(event)
And then it calls its ownforwardOn(event)
That isAnonymousObservableSink.forwardOn(event)
AnonymousObservableSink.forwardOn(event)
It’s actually calling its parentSink.forwardOn(event)
The Sink parent class is already saved when initializedAnonymousObserver
_observer object.Sink.forwardOn(event)
What’s going to be called isAnonymousObserver.on(event)
AnonymousObserver.on(event)
It actually calls its parent classObserverBase.on(event)
ObserverBase.on(event)
It actually calls the subclass againAnonymousObserver.onCore(event)
AnonymousObserver.onCore(event)
Will be calledself._eventHandler(event)
In this case, _eventHandler saves the trailing passed in when AnonymousObserver was createdClosure C
So that’s the callbackClosure C
Closure C
Is called back according to the eventClosure B
For example, the event=.onNext event will be called backClosure B
OnNext {}, i.eDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}, onError: dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")} {(error) in print (" subscription to: \ (error) ")}, onCompleted: {print (" finished ")}) {print (" destruction of callback ")}
Here’s the code:OnNext: {(anything) in print(" subscribed :\(anything)")
So it prints: “Subscribed to: Kongyulu.”
- Our ob. Create (
Finally, there is a flow chart to illustrate the whole creation and subscription process
2. Sequence creation, subscription diagram
As explained above, in order to solve the problem of sequence, we began to analyze Dispose (), which seems to have code everywhere in the whole source code, so how is the sequence destroyed?
To solve this problem, we will explore the sequence destruction process by analyzing the source code.
Here is a sequence life cycle sequence diagram:
Through this sequence diagram, combined with the sequence creation and subscription process analysis above, I can first find three ways that the sequence will be destroyed:
- Method 1: Release sequence resources by sending events that automatically end the sequence life cycle. Once an error or completed event is emitted, the life cycle of a sequence ends and all internal resources are released without manual release. (This conclusion was verified in this blog when we discussed instances 1 and 2. OnComplete is called and a “destroyed” message is printed whenever completed and error events are sent.)
- Method 2: Dispose () is called to release. For example, if you need to pre-dispose a sequence resource or unsubscribe, you can call dispose on the returned Disposable.
- Method 3: Recycle resources through garbage bag DisposeBag to achieve automatic release, which is officially recommended. The official recommendation for managing the life cycle of a subscription is to add resources to a global DisposeBag that follows the life cycle of the page, and when the page is destroyed, the DisposeBag is also destroyed and the resources in the DisposeBag are released. (This conclusion is also confirmed in the DisposeBag analysis above)
3. Sequence destruction case analysis
Let’s start by reviewing example 1, the code for example 1 that this blog began analyzing:
Func limitObservable(){let ob = Observable<Any>. Create {(observer) -> Disposable OnNext ("kongyulu") return Disposables. Create {print(" dispose ")} // dispose. Dispose ()} // sequence subscribe let dispose = Ob. subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}, onError: {(error) in print(" subscribed :\(error)")}, onCompleted: {print (" finished ")} {print (" destruction of callback ")} print (" finished "). / / the dispose the dispose ()}Copy the code
- The above code executes as follows:
- From the above results, we know that the sequence created is not destroyed, that is, “destroyed free” is not printed, nor “Destroy callback” is printed. Why is that? This problem we later through the analysis of the source code Rx source will know.
- Now let’s uncomment that line of code above
dispose.dispose()
This line of code, uncommented and rerun, produces the following output:
3.1 Sequence destruction source code analysis
- From the code in example 1 above, you can first see that the sequence is being created
Observable<Any>.create()
Method has a trailing closure and needs to return an implementationDisposable
An instance of a protocol. And that is throughReturn Disposables. Create {print(" Disposables ")}
This line of code returns. From this we confirmDisposables. Create {print(" Destroy release ")}
It’s very important. Let’s analyze it firstDisposables.create
The source code. - Enter Disposables. Create () source: We want to just click On It and find Disposables is an empty structure
public struct Disposables {
private init() {}
}
Copy the code
Since this structure is private even the initialization method cannot be inherited, we infer that Disposables. Create () must be implemented by extension. So we’ll search for extension Disposables in the project, and you’ll find the following:
So we find the first: AnonymousDisposable. Swift file into the line find 55:
extension Disposables { /// Constructs a new disposable with the given action used for disposal. /// /// - parameter dispose: Disposal action which will be run upon calling `dispose`. public static func create(with dispose: @escaping () -> Void) -> Cancelable {return AnonymousDisposable(disposeAction: dispose)Copy the code
Return AnonymousDisposable(disposeAction: Dispose dispose() closes, which is Disposables in example 1. Create {print(” dispose released “)} // dispose. Dispose ()} {print(” destroy free “)} here we give it an alias: closure D
- Don’t think about it. We’re definitely going in
AnonymousDisposable
Class implementation to explore:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
public typealias DisposeAction = () -> Void
private let _isDisposed = AtomicInt(0)
private var _disposeAction: DisposeAction?
/// - returns: Was resource disposed.
public var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
fileprivate init(_ disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
// Non-deprecated version of the constructor, used by `Disposables.create(with:)`
fileprivate init(disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
if let action = self._disposeAction {
self._disposeAction = nil
action()
}
}
}
}
Copy the code
- Analyzing the above AnonymousDisposable class definition source code, we can draw the following conclusions:
- Initialization saves the closure passed in from the outside world, which is what we analyzed in point 2Closure D:
{print(" destroy free ")}
- There is a
dispose()
By means offetchOr(self._isDisposed, 1) == 0
This line of code controlsdispose()
The contents are executed only once. (no matterdispose()
How many times the method is executed,if let action = self._disposeAction { self._disposeAction = nil action() }
This code will be executed at most once. dispose()
I’m going to do it firstself._disposeAction
Assign a value to a temporary variableaction
, and then emptyself._disposeAction
And then to performaction()
. The reason for doing this is if_disposeAction
Closures are a time-consuming operation and can be guaranteed_disposeAction
Capable of immediate release.
-
AnonymousDisposable we only saw some routine save operations, combined with our experience in the creation process of analyzing sequences at the beginning (AnonymousDisposable is similar to AnonymousObservable), We can infer that the core code implementation must be in the subscription area.
-
Next, we’ll dive into the observable.subscribe() method to explore some of the subscribe() source code implementation.
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? Disposable = Disposable (Disposable) -> Disposable {//1. Disposables if let disposed = ondisposables {disposable = Disposables. Create (with: disposed) } else { disposable = Disposables.create() } //3. Creates an AnonymousObserver object, There is an important trailing closure let Observer = AnonymousObserver<Element> {event in switch event {case.next (let value): onNext? (value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, Dispose (); // Dispose (); // Dispose () Disposables onCompleted?() disposable.dispose() // This event will dispose of the Disposables when completed is received}} return Disposables. Create ( Self.asobservable ().subscribe(observer), disposable// here we pass our local variable to self.asObservable().subscribe, That's our producer.subscribe)}Copy the code
Analysis of the above subscribe () source, combined with the beginning of the analysis, we can draw the following conclusions:
The subscribe ()
Created aDisposable
Object and holds the destruction callback closure, which calls the message back out when the destruction is performed.- Executed when an error or completion event is received
disposable.dispose()
Release resources. return Disposables.create( self.asObservable().subscribe(observer), disposable )
That’s returned hereDisposable
Object is what we call outside manuallydispose.dispose()
methodsdispose
Object, or added to the globalDisposeBag
The destroyers of.
- From the analysis of 6, we clearly know the last line of code
return Disposables.create( self.asObservable().subscribe(observer), disposable )
Key points, let’s enter:Disposables. The create ()
Source:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {return BinaryDisposable(Disposable1, Disposable2)// Returns a binary disposer object. }Copy the code
So in the code above we see that Create () directly returns a Binary disposableDisposable1, Disposable2 to BinaryDisposable.
- Here,
disposable1
isself.asObservable().subscribe(observer)
That isProducer.. subscribe(observer)
Returns the disposer disposable2
We subscribe() to create a local variablelet disposable: Disposable
\
Since the | link