Introduction of Rxswift destroyer Dispose

  1. First of all, please use a mind map to get a preliminary understanding of what Dispose owned and did:

  1. This paper mainly focuses on the above picture, focusing on the analysis of how Dispose() destroys the sequence.
  2. From the figure above we can see that dispose and disposeBag are the first root nodes after the destroyer. What are they? The answers are explained below.

Introduction to Rxswift destroyer classes and important functions

1. DisposeBag

1.1 What is a DisposeBag

RxSwift and RxCocoa also have an additional tool to assist with ARC and memory management: DisposeBag. This is a virtual “package” of observers that is discarded when their parent object is released. When an object with the DisposeBag attribute calls deinit(), the virtual package is emptied and each Disposable Observer automatically unsubscribs to what it observes. This allows ARC to reclaim memory as usual. If there is no DisposeBag, there are two outcomes: either the Observer generates a retain cycle that is bound to the observed object indefinitely; Or accidentally released, causing the program to crash. So to be a good ARC person, set up Observables and add them to DisposeBag. In this way, they can be cleaned well.

When an Observable is observed and subscribed, it generates a Disposable instance that can be used to free resources. There are two ways to release resources in RxSwift, namely unbind and free space, respectively explicit and implicit:

  • Explicit release allows us to call the release method directly in our code to release resources as shown in example 1:
let dispose = textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
dispose.dispose()
Copy the code
  • Implicit release Implicit release is done through DisposeBag, which is similar to the automatic release pool mechanism in Objective-C ARC. When an instance is created, it is added to the automatic release pool of the thread in which it is created. The automatic release pool releases and rebuilds the pool after a RunLoop cycle. The DisposeBag is like an automatic release pool for RxSwift. We add resources to the DisposeBag and release them along with the DisposeBag.

Example 2:

let disposeBag = DisposeBag()
func binding() {
      textField.rx_text
          .bindTo(label.rx_sayHelloObserver)
          .addDisposableTo(self.disposeBag)
}
Copy the code

In this code, the method addDisposableTo makes a weak reference to DisposeBag, so this DisposeBag will be referenced by the instance, usually as a member variable of the instance, and when the instance is destroyed, the member DisposeBag will follow. This frees resources bound by RxSwift on this instance.

From the above we can see that DisposeBag is like our automatic release pool in OC memory management. It acts as a garbage collection bag, you just add the sequence to the disposeBag, and the disposeBag will help us release resources at the right time, so how does it do that?

1.2 DisposeBag implementation source code analysis

1.2.1. Take a look at the class diagram:

1.2.2. Specifically analyze the source code process

  1. When we invoke the Disposed () method, we call the Insert () method of the Dispose class to add the disposables to the _Disposables array. The specific source code is as follows:
public final class DisposeBag: DisposeBase { private var _lock = SpinLock() // state fileprivate var _disposables = [Disposable]() fileprivate var _isDisposed = false /// Constructs new empty dispose bag. public override init() { super.init() } /// Adds `disposable` to be disposed when dispose bag is being deinited. /// /// - parameter disposable: Disposable to add. public func insert(_ disposable: Disposable) { self._insert(disposable)? .dispose() } private func _insert(_ disposable: Disposable) -> Disposable? Self._lock. lock(); self._lock.lock(); Defer {self._lock.unlock()} if self._isdisposed {// If _dispose() is disposed, it means that it has been disposed and no longer needs to be disposed to ensure symmetry. Disposables = disposable disposables = disposable disposables = disposable disposables = disposdisposables = disposable disposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables = disposdisposables take a look at `CompositeDisposable` instead. private func dispose() { // 1. Take all saved disposables = self. _disposables () // 2. Going through every destroyer, Dispose of disposables {disposable. Dispose ()}} private func _dispose() -> dispose of disposables {disposables. Dispose ()} private func _dispose() -> [Disposable] { self._lock.lock(); Defer {self._lock.unlock()} // Take all the saved disposables = self._Disposables self._disposables.removeAll(keepingCapacity: Self._isDisposed = true // This variable is used to record whether the garbage bag array has been emptied return Disposables} deinit {// When DisposeBag's own object is disposed, Call your own dispose() to iterate over all the saved destroyers in the dispose array, self.dispose()}}Copy the code
  1. The source flow above is identified by a diagram

  1. To summarize the DisposeBag process above:
  • When we call the sequence ofdispose()Method is,DisposeBagcallinsert()Method saves the sequence we need to destroy for storage_disposablesIn the array.
  • When our DisposeBag is destroyed, such as the local variable defined out of scope, it will be destroyed. Our Deinit () method is called as shown in figure 4, and its dispose() method is executed in deinit(), and then all the previously saved variables that need to be released_disposablesArray, in turn calling their own Dispose () method.

2. FetchOr () function

  1. FetchOr (); fetchOr (); fetchOr ();
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}
Copy the code

The source code is very simple, but the effect is not small. In the code this is the AtomicInt value passed in, with only one value inside. FetchOr returns a copy of this.value as the result. And enclosing the value and the mask do or (|) operation. And assigns the result of the or operation to this.value.

  1. To understand the result of this function, we use a table:

Is made a or operations, the actual decimal result remains the same, just change the inside of the binary, can be used to make marks, just inside the C language often use method, namely the value of an Int type processing itself can use, can also through the bitwise and, or, to change its logo, achieve the purpose of passing values, This allows each bit to replace a bool, often used as an enumeration.

  1. Through the analysis of the above, I learned that fetchOr () function in the role of, can make sure that every piece of code is executed only once, is equivalent to a flag, if the initial value of 0, 1 if the incoming parameters, assuming that this code is executed repeatedly five times, only the first will be from 0 to 1, the back four times call for 1, don’t send changes.

Dispose core logic

Dispose instance code analysis

  • Children who have learned RxSwift know that dispose() will release our resources just like the reference counter in our OC. We can also listen for destroyed event callbacks at release time. Have you ever thought about how Dispose did it?

To know the answer, we can only analyze the source code step by step:

  • First, let’s look at an example code:

Example 1:

Func limitObservable(){let ob = Observable<Any>. Create {(observer) -> Disposable OnNext ("kongyulu") return Disposables. Create {print(" dispose ")} // dispose. Dispose ()} // sequence subscribe let dispose = Ob. subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}, onError: {(error) in print(" subscribed :\(error)")}, onCompleted: {print (" finished ")} {print (" destruction of callback ")} print (" finished "). / / the dispose the dispose ()}Copy the code
  1. The above code executes as follows:

  1. From the above results, we know that the sequence created is not destroyed, that is, “destroyed free” is not printed, nor “Destroy callback” is printed. Why is that? This problem we later through the analysis of the source code Rx source will know.
  2. Now let’s uncomment that line of code abovedispose.dispose()This line of code, uncommented and rerun, produces the following output:

  1. As we can see from the above code, the sequence created is destroyed and the destruction callback is executed. Then why did you add itdispose.dispose()Is that enough?
  2. In addition, let’s modify our code again:

Example 2:

Func limitObservable(){let ob = Observable<Any>. Create {(observer) -> Disposable OnNext ("kongyulu") observer.onCompleted() return Disposables. Create {print(" dispose released ")} // dispose Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe :\(anything)")}, onError: dispose = ob.subscribe(onNext: {(anything) in print(" subscribe :\(anything)")}, onError: {(error) in print(" subscribed :\(error)")}, onCompleted: {print (" finished ")} {print (" destruction of callback ")} print (" finished "). / / the dispose the dispose ()}Copy the code

Example 2: Observer.oncompleted () : Observer.oncompleted ()

Here we can see that after we add one more line observer.onCompleted(), the destruction callback is also printed. What logic is this? why?

  • Let’s explore how RxSwift is implemented with this question in mind

Dispose process source code parsing

Before analyzing Dispose source code, we must first deeply understand the creation of sequence, subscription process is the basis, only by understanding this, can we truly understand the principle of Dispose. This is actually covered in a previous article, but for more details, see my previous article: Sequence Core Logic

In order to better understand, I will clarify the specific process again:

1. Sequence creation and subscription process

  • (1) When we execute the codeLet ob = Observable<Any>. Create {(observer) -> Disposable in this is A closure we call closure A}“, you actually come to line 20 of the create. swift file:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
Copy the code
  • (2) The create() function returns oneAnonymousObservable(subscribe)And pass our closure A into the constructor of AnonymousObservable, which saves the closure Alet _subscribeHandler: SubscribeHandlerIt’s stored in a variable._subscribeHandlerA variable that holds closure A passed in when the sequence OB was created (where closure A requires passing in)AnyObserverType as parameter)
final private class AnonymousObservable<Element>: Producer<Element> {TypeAlias SubscribeHandler = (AnyObserver<Element>) -> Disposable // This variable holds closure A passed in when the sequence ob was created (where closure A requires the AnyObserver type to be passed in as A parameter) @escaping SubscribeHandler) {self._subscribeHandler = SubscribeHandler // This variable holds the closure A that was passed in when the sequence OB was created... The following code will not be omitted}Copy the code
  • (3) We callDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)"}Line 39 of the ObservableType+Extensions. Swift file is ObservableType+Extensions.
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable ... Let Observer = AnonymousObserver<Element> {return Disposables. Create ( self.asObservable().subscribe(observer), disposable ) }Copy the code
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable ... Let Observer = AnonymousObserver<Element> {return Disposables. Create ( self.asObservable().subscribe(observer), disposable ) }Copy the code
  • (4) From abovesubscribe()As you can see from the source code, you create an AnonymousObserver object in the function, and then directly return Disposables. Create ().
  • (5) Here we don’t see any relationship between subscriptions and our closure A, and that’s the pointself.asObservable().subscribe(observer)So this line of code, let me analyze what this line of code actually does.
  • (6) To understand this line of code in (5), we need to understand the class integration relationship: \

AnonymousObservable – > Producer – > observables – > ObservableType – > ObservableConvertibleType details below:

  • (7) Through the inheritance relationship, we can follow the inheritance chain to find the parent class, we can find is inObservableClass defines thisAsObservable ()Methods:
public class Observable<Element> : ObservableType { ... Public func asObservable() -> Observable<Element> {return self}... Omitted code that is not concerned}Copy the code
  • (8) Through source code analysis, I know that asObservable() returns self, and (3) calls yesself.asObservable().subscribe(observer)Self in this line of code is the sequence ob that we created, soself.asObservable()That’s the ob observable sequence that we created at the beginning.self.asObservable().subscribe(observer)The observer in is where we arepublic func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)Local variables created in method implementation:Let observer = AnonymousObserver<Element>We pass this local variable to the ObservableThe subscribe ()Methods.
  • (9) Next we share Observable’sThe subscribe ()Method does something.
  • (10) When we call this line in Example 2:Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}When the actual call is madeObservableTypeSubscribe () method, in which we create oneAnonymousObserverElephant, and throughself.asObservable().subscribe(observer)Ob. Susbscribe (Observer) is passed (note: Ob is the AnonymousObservable object created by create(), and observer is a temporary local AnonymousObserver created by subscribe, which has been analyzed above.
  • (11) However, we can see from the above class diagram that there is no subscribe() method in ob (AnonymousObservable), so we can only look for its parent Producer first.
  • (12) According to the above class diagram analysis, we can see that Producer inherits the Observerable observable sequence and follows the ObservableType protocol (which defines a SUBSCRIBE () interface), so we must implement this interface in Producer. Let me look at the source code:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { if ! CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was Disposed. Let Disposer = SinkDisposer() // The following line of code is the key, calling its own run() method and passing in two parameters: This is the 'AnonymousObserver' object that we pass to 'selp.asObservable ().subscribe(observer)' // parameter 2: Disposer: SinkDisposer() will be taken care of later. let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } }Copy the code
  • (13) Through the above source code analysis, we know thatProducerNow theThe subscribe ()Inside the mouth, called its ownrun()Method, and throughout the yearrun()The method passed observer: that is usself.asObservable().subscribe(observer)The incomingAnonymousObserverElephant. So let’s see what run does:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
Copy the code
  • (14) From the run() method in Producer above, we can see that the method is not doing anything, just one linerxAbstractMethod()This rxAbstractMethod() is just an abstract method. Our subclass AnonymousObservable must have overridden the run() method. So let’s seeAnonymousObservabletherun()Source:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where observer. Element == Element {// Create a tube AnonymousObservableSink and pass the tube two parameters: This is the 'AnonymousObserver' object that we pass to 'selp.asObservable ().subscribe(observer)' // parameter 2: Disposer: SinkDisposer() will be taken care of later. let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) }Copy the code
  • In the run() method of AnonymousObservable, the run() method of AnonymousObservable. First, create an AnonymousObservableSink object sink and pass in an observer (the AnonymousObserver we pass in self.asObservable().subscribe(observer). Second, the sink. Run (self) method is called to return the subscription, followed directly by a tuple that the run() method returns: (sink: sink, subscription: subscription). But our focus is on the sink tube. AnonymousObservableSink is a manager-like role that holds sequence, subscriber, and destroyer information, as well as scheduling capabilities. It is through this tube that our series and subscribers communicate.

  • (16) What does the AnonymousObservableSink pipe do next? AnonymousObservableSink

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {typeAlias Element = observer. Element Typealias Parent = AnonymousObservable<Element> // state private let _isStopped = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif override init(observer:  Observer, cancel: Subscribe (observer) {super.init(observer) {// The AnonymousObserver is called by 'self.asObservable().subscribe(observer)'. observer, cancel: cancel) } func on(_ event: Event<Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: If load(self._isstopped) == 1 {// If.error,.completed is executed, the self.forwardon (event) code will not continue. The forwardOn is not executed unless the.complete,.error event is executed during the object's life cycle. return } self.forwardOn(event) case .error, .completed: If fetchOr(self._isstopped, 1) == 0 {// fetchOr(self._isstopped, 1) == 0 {// fetchOr(self._isstopped, 1) == 0 {// fetchOr(self._isstopped, 1) == 0 { To ensure that the following code executes only once in the object's life cycle, no matter how many times on() is called. Self.parent () self.parent () self.parent () self.dispose() Parent) -> Disposable {// Parent is called AnonymousObservable, So we start with the create() sequence OB,_subscribeHandler which is the closure A that we pass in when we create the sequence. (Closure A is A function that requires A parameter, This argument is AnyObserver(self)) return parent._subscribeHandler(AnyObserver(self))}}Copy the code
  • (17) Through the source code of AnonymousObservableSink above, we know the following conclusions:
  1. AnonymousObservableSink. Init initialization time introduced to the observer: is usself.asObservable().subscribe(observer)The incomingAnonymousObserverObject.
  2. AnonymousObservableSink has an on() method that does different things with the event parameter passed in, but is called at least onceself.forwardOn(event)Methods. Each time if onNext event is called onceforwardOn(). However, the. Error,. Completed event is called at most onceforwardOn().
  3. AnonymousObservableSink’s run() method is the core method that calls back to the closure A we passed when we first created create() and creates ob.subscribe() internallyAnonymousObserverObjects sink through our AnonymousObservableSink objects, that isAnyObserver(self)In theselfOnce wrapped as an AnyObserver structure, we pass closure A as A parameter, thus linking our sequence to the subscribers.
  4. Pay special attention to: Many people think that passing our closure A isAnonymousObserverIn fact, it is not; closure A is passed as an AnyObserver structure
  5. With the run() method of AnonymousObservableSink we successfully passed the closure we created with our original ob.subscibe() subscriptionAnyObserver(self)When we call this line of code inside closure A:observer.onNext("kongyulu")After ob.subscribe(),AnyObserver(self)This is our observer, and the observer is a structure that owns our tubeAnonymousObservableSinkObject’s on() method.
  6. In example 1: when we sendobserver.onNext("kongyulu")When you sequence messages, they actually pass through our pipeAnonymousObservableSink.on()To schedule, and finally schedule the closure we subscribe to: onNext()Closure B:Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :(anything)")}.
  7. So the big question now is: AnonymousObservableSink) on () is how to from the observer. The onNext (” kongyulu “) scheduling to our closure B?
  • To analyze the above problem, we need to analyze the structure firstAnyObserver(self)What you did: First look at AnyObsevrer’s source code
public struct AnyObserver<Element> : ObserverType { /// Anonymous event handler type. public typealias EventHandler = (Event<Element>) -> Void // We define the alias EventHandler as a closure for an incoming event. EventHandler /// Construct an instance whose `on(event)` calls `eventHandler(event)` /// /// - parameter eventHandler: Event handler that observes sequences events. public init(eventHandler: @escaping EventHandler) {// Self. Observer saved the AnonymousObservableSink object self. Observer = EventHandler} /// Construct an instance whose `on(event)` calls `observer.on(event)` /// /// - parameter observer: Observer that receives sequence events. Public init<Observer: ObserverType>(_ Observer: self) {AnyObserver(self); Element. The Observer) where the Observer Element = = {/ / this code directly save the AnonymousObservableSink. / / self on () method. The actual is an Observer on () method self.observer = observer.on } /// Send `event` to this observer. /// /// - parameter event: Event instance. public func on(_ event: Event Element < >) {/ / here call on AnonymousObservableSink. The practical approach is to call on (Event) method to return the self. The observer (Event)} / / / Erases the type of observer and returns canonical observer. /// /// - returns: type erased observer. public func asObserver() -> AnyObserver<Element> { return self } }Copy the code
  • (19) Through the above AnyObserver source analysis, we know thatAnyObserverThe initialization saves our tubeAnonymousObservableSinktheon()Methods, and they have a on method, in his own on methods to call AnonymousObservableSink. On () method. It’s just a cover so the world doesn’t know about usAnonymousObservableSinkClass, why is that? This design has several advantages:
  1. It’s completely encapsulated, so the outside world doesn’t need to know about our tubesAnonymousObservableSinkClass, they don’t care about usAnonymousObservableSinkThe user only needs to use the interface on(). It doesn’t matter how on() is implemented or by whom.
  2. Acts as a decoupling effect,AnyObserverDoesn’t own usAnonymousObservableSinkObject, it just owns itAnonymousObservableSinkThe on() interface of theAnonymousObservableSinkThat’s all you need to do to implement the on() interface. As for theAnonymousObservableSinkInternal changes (as long as the ON () interface is not changed) will not affectAnyObserver.
  • (20) Now we focus on the on() method:

    • When weExample 1To perform:observer.onNext("kongyulu")This line of code actually calls:AnyObserver.onNext()Methods. Since we AnyObserver inherit the ObserverType protocol, we have itObserverTypetheOnNext ()Method, you can go back to class inheritance if you don’t know.
  • Anyobserver.onnext () calls its own on() method: anyobserver.onnext ()

Interface definition for ObserverType

extension ObserverType { public func onNext(_ element: Element) {self.on(.next(Element))// This will revert to AnyObserver's on() method, AnyObserver inherits ObserverType, Public func onCompleted() {self.on(.completed)} public func onError(_ error: Swift.Error) { self.on(.error(error)) } }Copy the code
  • (22)AnyObserver.on()Method will callAnonymousObservableSink.on()Methods.
  • (23)AnonymousObservableSink.on(event)Will be calledAnonymousObservableSink.forwardOn(event)
  • (24) It is not defined in AnonymousObservableSinkForwardOn ()Method, which we find implemented in its parent class SinkForwardOn ()The source code is as follows:
class Sink<Observer: ObserverType> : Disposable { fileprivate let _observer: Observer fileprivate let _cancel: Cancelable fileprivate let _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: Observer, cancel: Cancelable) {#if TRACE_RESOURCES _ = resources.incrementtotal () #endif We 'self.asObservable().subscribe(observer)' pass the 'AnonymousObserver' object self._observer = observer self._cancel = cancel} final func forwardOn(_ event: Event<Observer.Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) {return} // The 'anonymousobServer.on ()' method is actually called here. self._observer.on(event) } ... This time the code omitted, do not need to worry about}Copy the code
  • (25) From the above source we can see:Sink. ForwardOn ()Actually calledAnonymousObserver.on()To put it bluntly: we started with instance 1observer.onNext("kongyulu")Ob.onnext () is called first when this line of code executesAnyObserver.on().AnyObserver.on()Will call againAnonymousObservableSink.on().AnonymousObservableSink.on()Will call againAnonymousObservableSink.forwardOn()And thenAnonymousObservableSink.forwardOn()The AnonymousObservableSink parent will be called againSink. ForwardOn ()And finally theSink. ForwardOn ()Call theAnonymousObserver.on().
  • (26) Now that we’re pretty clear, let’s go back toAnonymousObserver.on()Method definition:
  1. First we looked at the class definition as follows, and did not find it by the on() method:
final class AnonymousObserver<Element>: ObserverBase<Element> {typeAlias EventHandler = (Event<Element>) -> Void private let _eventHandler: EventHandler init(_ eventHandler: @escaping EventHandler) {#if TRACE_RESOURCES _ = resources.incrementTotal () #endif // Here an incoming trailing closure is saved: Subscribe () let Observer = AnonymousObserver<Element> {event in here is a tag closure B} we pass _eventHandler to save tag closure B self._eventHandler = eventHandler } override func onCore(_ event: <Element>) {return self._eventhandler (Event)} #if TRACE_RESOURCES deinit {_ = Resources.decrementTotal() } #endif }Copy the code
  1. So let’s look for its parent class ObserverBase:
class ObserverBase<Element> : Disposable, ObserverType { private let _isStopped = AtomicInt(0) func on(_ event: Event<Element>) { switch event { case .next: If load(self._isstopped) == 0 {self.oncore (event)} case.error,.completed: if fetchOr(self._isStopped, 1) == 0 { self.onCore(event) } } } func onCore(_ event: Event<Element>) { rxAbstractMethod() } func dispose() { fetchOr(self._isStopped, 1) } }Copy the code
  1. We know by analyzing the parent source codeObserverBase.on()And finally calledAnonymousObserver.onCore()Anonymousobserver.oncore () calls back the _eventHandler(event) closure B, which is the trailing closure that created AnonymousObserver when we subscribed to the ob.subscribe() sequence. So the trailing closure ends up calling the onNext() method to which we subscribed. In example 1, executeobserver.onNext("kongyulu")This line of code is going to call backDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}So it prints“Subscribe to: Kongyulu”

The code for the trailing closure B of AnonymousObserver {B} is as follows:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ... // Note that: AnonymousObserver<Element> {B} the trailing closure inside the parentheses is called B, Closure B let Observer = AnonymousObserver<Element> {event in... Switch event {case.next (let value): onNext? (value) // Call onNext(value) case.error (let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) }Copy the code
  • (27) Through the analysis of point (26), we should understand the whole subscription process, which can be summarized as follows:

    1. Our ob. Create (The closure of ASave closure A inAnonymousObservableIn the variable_subscribeHandler.
    2. When we call ob.subscribe(Closure BWhen subscribing to a sequence, one is created firstAnonymousObserverObject, and will take a trailingClosure C. Then throughself.asObservable().subscribe(AnonymousObserver) Through a series of transformationsAnyObserverPassed to theThe closure of A.
    3. In 2 of them, a series of transformations can be simply explained as:
    • self.asObservable().subscribe(AnonymousObserver)The actual isob.subscribe(AnonymousObserver)
    • ob.subscribe(AnonymousObserver)The actual isProducer.subscribe(AnonymousObserver)
    • Producer.subscribe(AnonymousObserver)Will be calledself.run(AnonymousObserver)
    • self.run(AnonymousObserver)Will create aAnonymousObservableSinkTube object sink and then callsink.run(AnonymousObservable)The tube’s run() method is called and OB is passed to the tube sink.
    • And we tubesink.run(AnonymousObservable)Method is calledparent._subscribeHandler(AnyObserver(self))The actual isob._subscribeHandler(AnyObserver(AnonymousObservableSink))That’s calledThe closure of A
    • And weThe closure of AYou need to pass in an argumentAnyObserver(AnonymousObservableSink)In fact,AnyObserverIt’s just a structure. It’s preservedAnonymousObservableSink.on()Methods.
    • When we areThe closure of AIt callsobserver.onNext("kongyulu")In factAnyObserver.onNext("kongyulu")And theAnyObserver.onNext("kongyulu")Will be calledAnyObserver.on()
    • AnyObserver.on()And then callAnonymousObservableSink.on(event)So in this event
    • AnonymousObservableSink classAnonymousObservableSink.on(event)And then it calls its ownforwardOn(event)That isAnonymousObservableSink.forwardOn(event)
    • AnonymousObservableSink.forwardOn(event)It’s actually calling its parentSink.forwardOn(event)The Sink parent class is already saved when initializedAnonymousObserver_observer object.
    • Sink.forwardOn(event)What’s going to be called isAnonymousObserver.on(event)
    • AnonymousObserver.on(event)It actually calls its parent classObserverBase.on(event)
    • ObserverBase.on(event)It actually calls the subclass againAnonymousObserver.onCore(event)
    • AnonymousObserver.onCore(event)Will be calledself._eventHandler(event)In this case, _eventHandler saves the trailing passed in when AnonymousObserver was createdClosure CSo that’s the callbackClosure C
    • Closure CIs called back according to the eventClosure BFor example, the event=.onNext event will be called backClosure BOnNext {}, i.eDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}, onError: dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")} {(error) in print (" subscription to: \ (error) ")}, onCompleted: {print (" finished ")}) {print (" destruction of callback ")}Here’s the code:OnNext: {(anything) in print(" subscribed :\(anything)")So it prints: “Subscribed to: Kongyulu.”

Finally, there is a flow chart to illustrate the whole creation and subscription process

2. Sequence creation, subscription diagram

As explained above, in order to solve the problem of sequence, we began to analyze Dispose (), which seems to have code everywhere in the whole source code, so how is the sequence destroyed?

To solve this problem, we will explore the sequence destruction process by analyzing the source code.

Here is a sequence life cycle sequence diagram:

Through this sequence diagram, combined with the sequence creation and subscription process analysis above, I can first find three ways that the sequence will be destroyed:

  • Method 1: Release sequence resources by sending events that automatically end the sequence life cycle. Once an error or completed event is emitted, the life cycle of a sequence ends and all internal resources are released without manual release. (This conclusion was verified in this blog when we discussed instances 1 and 2. OnComplete is called and a “destroyed” message is printed whenever completed and error events are sent.)
  • Method 2: Dispose () is called to release. For example, if you need to pre-dispose a sequence resource or unsubscribe, you can call dispose on the returned Disposable.
  • Method 3: Recycle resources through garbage bag DisposeBag to achieve automatic release, which is officially recommended. The official recommendation for managing the life cycle of a subscription is to add resources to a global DisposeBag that follows the life cycle of the page, and when the page is destroyed, the DisposeBag is also destroyed and the resources in the DisposeBag are released. (This conclusion is also confirmed in the DisposeBag analysis above)

3. Sequence destruction case analysis

Let’s start by reviewing example 1, the code for example 1 that this blog began analyzing:

Func limitObservable(){let ob = Observable<Any>. Create {(observer) -> Disposable OnNext ("kongyulu") return Disposables. Create {print(" dispose ")} // dispose. Dispose ()} // sequence subscribe let dispose = Ob. subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}, onError: {(error) in print(" subscribed :\(error)")}, onCompleted: {print (" finished ")} {print (" destruction of callback ")} print (" finished "). / / the dispose the dispose ()}Copy the code
  1. The above code executes as follows:

  1. From the above results, we know that the sequence created is not destroyed, that is, “destroyed free” is not printed, nor “Destroy callback” is printed. Why is that? This problem we later through the analysis of the source code Rx source will know.
  2. Now let’s uncomment that line of code abovedispose.dispose()This line of code, uncommented and rerun, produces the following output:

3.1 Sequence destruction source code analysis

  1. From the code in example 1 above, you can first see that the sequence is being createdObservable<Any>.create()Method has a trailing closure and needs to return an implementationDisposableAn instance of a protocol. And that is throughReturn Disposables. Create {print(" Disposables ")}This line of code returns. From this we confirmDisposables. Create {print(" Destroy release ")}It’s very important. Let’s analyze it firstDisposables.createThe source code.
  2. Enter Disposables. Create () source: We want to just click On It and find Disposables is an empty structure
public struct Disposables {
    private init() {}
}
Copy the code

Since this structure is private even the initialization method cannot be inherited, we infer that Disposables. Create () must be implemented by extension. So we’ll search for extension Disposables in the project, and you’ll find the following:

So we find the first: AnonymousDisposable. Swift file into the line find 55:

extension Disposables { /// Constructs a new disposable with the given action used for disposal. /// /// - parameter dispose: Disposal action which will be run upon calling `dispose`. public static func create(with dispose: @escaping () -> Void) -> Cancelable {return AnonymousDisposable(disposeAction: dispose)Copy the code

Return AnonymousDisposable(disposeAction: Dispose dispose() closes, which is Disposables in example 1. Create {print(” dispose released “)} // dispose. Dispose ()} {print(” destroy free “)} here we give it an alias: closure D

  1. Don’t think about it. We’re definitely going inAnonymousDisposableClass implementation to explore:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
    public typealias DisposeAction = () -> Void

    private let _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    /// - returns: Was resource disposed.
    public var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    fileprivate init(_ disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    // Non-deprecated version of the constructor, used by `Disposables.create(with:)`
    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    fileprivate func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}
Copy the code
  1. Analyzing the above AnonymousDisposable class definition source code, we can draw the following conclusions:
  • Initialization saves the closure passed in from the outside world, which is what we analyzed in point 2Closure D:{print(" destroy free ")}
  • There is adispose()By means offetchOr(self._isDisposed, 1) == 0This line of code controlsdispose()The contents are executed only once. (no matterdispose()How many times the method is executed,if let action = self._disposeAction { self._disposeAction = nil action() }This code will be executed at most once.
  • dispose()I’m going to do it firstself._disposeActionAssign a value to a temporary variableaction, and then emptyself._disposeActionAnd then to performaction(). The reason for doing this is if_disposeActionClosures are a time-consuming operation and can be guaranteed_disposeActionCapable of immediate release.
  1. AnonymousDisposable we only saw some routine save operations, combined with our experience in the creation process of analyzing sequences at the beginning (AnonymousDisposable is similar to AnonymousObservable), We can infer that the core code implementation must be in the subscription area.

  2. Next, we’ll dive into the observable.subscribe() method to explore some of the subscribe() source code implementation.

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? Disposable = Disposable (Disposable) -> Disposable {//1. Disposables if let disposed = ondisposables {disposable = Disposables. Create (with: disposed) } else { disposable = Disposables.create() } //3. Creates an AnonymousObserver object, There is an important trailing closure let Observer = AnonymousObserver<Element> {event in switch event {case.next (let value): onNext? (value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, Dispose (); // Dispose (); // Dispose () Disposables onCompleted?() disposable.dispose() // This event will dispose of the Disposables when completed is received}} return Disposables. Create ( Self.asobservable ().subscribe(observer), disposable// here we pass our local variable to self.asObservable().subscribe, That's our producer.subscribe)}Copy the code

Analysis of the above subscribe () source, combined with the beginning of the analysis, we can draw the following conclusions:

  • The subscribe ()Created aDisposableObject and holds the destruction callback closure, which calls the message back out when the destruction is performed.
  • Executed when an error or completion event is receiveddisposable.dispose()Release resources.
  • return Disposables.create( self.asObservable().subscribe(observer), disposable )That’s returned hereDisposableObject is what we call outside manuallydispose.dispose()methodsdisposeObject, or added to the globalDisposeBagThe destroyers of.
  1. From the analysis of 6, we clearly know the last line of codereturn Disposables.create( self.asObservable().subscribe(observer), disposable )Key points, let’s enter:Disposables. The create ()Source:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {return BinaryDisposable(Disposable1, Disposable2)// Returns a binary disposer object. }Copy the code

So in the code above we see that Create () directly returns a Binary disposableDisposable1, Disposable2 to BinaryDisposable.

  • Here,disposable1isself.asObservable().subscribe(observer)That isProducer.. subscribe(observer)Returns the disposer
  • disposable2We subscribe() to create a local variablelet disposable: Disposable\

Since the | link

Swift Books download:Download address