Preface

My boss recently gave me a new project, and I am going to write it in Swift. The RAC (ReactiveCocoa) that I used in Objective-C now has a native Swift version, but I wanted to try something new, so I chose RxSwift. I know how to use both frameworks, but I never understood how they work internally. Since no new requirements have come in lately, I studied RxSwift and would like to share what I learned.

What RxSwift is and how to use it are well covered by many online resources. This article starts from the implementation of Observable, aiming to understand the whole from one small part.

Usage demo

Here is a simple example that uses Observable

        let numbers: Observable<Int> = Observable.create { observer -> Disposable in
            observer.onNext(0)
            observer.onNext(1)
            observer.onCompleted()
            
            return Disposables.create {
            }
        }

        numbers.subscribe {
            print($0)
        }

All the demo does is take the events (0, 1, completed) generated in the first closure and deliver them to the second closure. This separates event generation from event handling. This article analyzes how that effect is achieved.

The main classes

AnonymousObservable

An anonymous observable. It stores the closure that generates events and is the entry point that activates the closure that handles them.

AnyObserver

Any observer. It receives events and passes them on.

AnonymousObserver

An anonymous observer. It stores the closure that handles events.

AnonymousObservableSink

Links the observable and the observer so that events can be passed from one to the other.

ObserverType, ObservableType and other protocols

Protocols. They constrain all of the types above so that they can communicate with each other effectively.

Event

The event itself. It is an enumeration with three cases: next (carrying an element), error, and completed.
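
To make the shape of such an enumeration concrete, here is a minimal sketch. MiniEvent and endsStream are hypothetical names chosen for illustration; this is a simplified model, not RxSwift's own declaration.

```swift
// Hypothetical, simplified model of an event enumeration; not RxSwift's source.
enum MiniEvent<Element> {
    case next(Element)   // carries one element
    case error(Error)    // ends the stream with a failure
    case completed       // ends the stream normally

    /// True for the two cases that end a stream (illustrative helper).
    var endsStream: Bool {
        switch self {
        case .next:
            return false
        case .error, .completed:
            return true
        }
    }
}

// The three events from the demo: two elements, then completion.
let events: [MiniEvent<Int>] = [.next(0), .next(1), .completed]
let terminalFlags = events.map { $0.endsStream }
print(terminalFlags)
```

Only the last event in the demo's sequence is a terminating one, which is why nothing can follow it.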

The implementation process

Storage

First, a few things about the ObservableType definition

associatedtype E

func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E

E defines the element type of this event stream. It ensures that the elements generated and the elements handled are of the same type; otherwise they cannot be passed along.
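
The effect of the where O.E == E constraint can be tried out with a small stand-alone model. All names below (MiniObserverType, MiniObservableType, IntCollector, OneTwo) are hypothetical and only mirror the shape of the declarations above.

```swift
// Hypothetical protocols that mirror the associated-type constraint.
protocol MiniObserverType {
    associatedtype E
    func on(_ element: E)
}

protocol MiniObservableType {
    associatedtype E
    // The observer's element type must equal the observable's element type.
    func subscribe<O: MiniObserverType>(_ observer: O) where O.E == E
}

/// An observer of Int elements; E is inferred as Int from `on`.
final class IntCollector: MiniObserverType {
    private(set) var values: [Int] = []
    func on(_ element: Int) {
        values.append(element)
    }
}

/// An observable of Int elements that emits 1 and 2.
struct OneTwo: MiniObservableType {
    typealias E = Int
    func subscribe<O: MiniObserverType>(_ observer: O) where O.E == E {
        observer.on(1)
        observer.on(2)
    }
}

let collector = IntCollector()
OneTwo().subscribe(collector)
print(collector.values)
```

Passing an observer whose E is String to OneTwo().subscribe would be rejected by the compiler, which is the guarantee the constraint provides.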

The create method

In Observable.create { observer -> Disposable in … }, Observable is an abstract class and cannot be used directly as a concrete observable. The create method has a default implementation in a protocol extension

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}

So what we create here is an AnonymousObservable object, which I will call A1. A1 holds the closure that generates events and feeds them into an AnyObserver structure; we call this closure A2. That completes the storage part.
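
The key point of the storage step is that creating A1 does not run A2. A minimal sketch under simplified assumptions (MiniObserver and MiniAnonymousObservable are hypothetical stand-ins, and the Disposable return value is left out):

```swift
// Hypothetical stand-in for the observer handed to the stored closure.
struct MiniObserver<Element> {
    let onNext: (Element) -> Void
}

// Hypothetical model of A1: it only stores the closure (A2).
final class MiniAnonymousObservable<Element> {
    let subscribeHandler: (MiniObserver<Element>) -> Void

    init(_ subscribeHandler: @escaping (MiniObserver<Element>) -> Void) {
        self.subscribeHandler = subscribeHandler
    }
}

var producedCount = 0
let a1 = MiniAnonymousObservable<Int> { observer in
    producedCount += 1          // runs only when the stored closure is invoked
    observer.onNext(0)
}
let countAfterCreate = producedCount   // the closure has not run yet

// Invoking the stored closure by hand plays the role of the activation step.
var received: [Int] = []
a1.subscribeHandler(MiniObserver(onNext: { received.append($0) }))
print(countAfterCreate, producedCount, received)
```

Nothing is generated until the stored closure is invoked, which is what the activation step described next takes care of.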

The activation

To activate, we call A1's subscribe method (also defined by the protocol). Next, let's look at its implementation. Since Observable is an abstract class, this is the protocol's default implementation

    public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)       
            }

            return self.asObservable().subscribe(observer)
    }

There are two steps here: one is creating the observer, the other is passing the events

The observer

This part is simple: an AnonymousObserver object is created, which we call B1. It holds the event-handling closure, which we call B2

Passing

First is the asObservable() method. Since A1 indirectly inherits from Observable, it simply returns self. The method presumably exists to handle other kinds of observables; I will add more if I run into it later

Then another subscribe method of A1 (an overload) is called, with B1 passed as the argument

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        
        if !CurrentThreadScheduler.isScheduleRequired {
            // Step one
            let disposer = SinkDisposer()
            // Step two
            let sinkAndSubscription = run(observer, cancel: disposer)
            // Step three
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        // The else branch is not covered here
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

The first step

A SinkDisposer object is created, which we call C1. It releases resources once event delivery is over, including the Disposable returned by A1's create closure

The second step

The run method is called, passing in B1 and C1

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        // 2.1
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        // 2.2
        let subscription = sink.run(self)
        // 2.3
        return (sink: sink, subscription: subscription)
    }

Step 2.1

Create an AnonymousObservableSink object, which I will call D1. It holds both B1 and C1

Step 2.2

Call the run method of the D1 object, passing in A1 itself

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }

In this method, we call A1's A2 closure. D1 is wrapped in an AnyObserver structure and passed in as A2's argument

Next, let's see how D1 is converted

    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }

Here the structure stores the on method of the object passed in as a property. The object passed in is D1, which forwards events to the on method of the B1 object it holds. We call this structure E1
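
Storing a method as a closure property is an ordinary Swift type-erasure technique and can be shown on its own. In this sketch MiniObserverType, MiniAnyObserver and Recorder are hypothetical names, and events are reduced to plain elements.

```swift
// Hypothetical observer protocol with an associated element type.
protocol MiniObserverType {
    associatedtype E
    func on(_ element: E)
}

// Hypothetical type-erased wrapper: it keeps only the `on` function.
struct MiniAnyObserver<Element> {
    private let observer: (Element) -> Void

    init<O: MiniObserverType>(_ observer: O) where O.E == Element {
        // The method reference is stored as a closure; the concrete type O is forgotten.
        self.observer = observer.on
    }

    func on(_ element: Element) {
        observer(element)
    }
}

/// A concrete observer that records what it receives.
final class Recorder: MiniObserverType {
    private(set) var log: [String] = []
    func on(_ element: String) {
        log.append("got \(element)")
    }
}

let recorder = Recorder()
let erased = MiniAnyObserver(recorder)   // Element is inferred as String
erased.on("a")
erased.on("b")
print(recorder.log)
```

Callers of the wrapper only see MiniAnyObserver<String>, yet every call still reaches the original object's method.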

Now take a look at E1's onNext and related methods

extension ObserverType {
    //YSD
    /// Convenience method equivalent to `on(.next(element: E))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: E) {
        on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        on(.error(error))
    }
}

What actually gets called in the end is B1's on method

    func on(_ event: Event<E>) {
        switch event {                      
        case .next:
            if _isStopped == 0 {            
                onCore(event)
            }
        case .error, .completed:

            if AtomicCompareAndSwap(0, 1, &_isStopped) {
                onCore(event)
            }
        }
    }

This in turn calls B1's onCore method

    override func onCore(_ event: Event<Element>) {
        return _eventHandler(event)         
    }

That is, the events E1 receives from A2 are passed to B2, so the content is finally delivered. After that, A2 returns the Disposable that releases resources
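
The stop flag in the on method shown earlier means that nothing is delivered after a terminating event. A minimal sketch of that behavior, assuming hypothetical Mini-prefixed types and a plain Bool in place of the atomic compare-and-swap (so this model is not thread-safe):

```swift
// Hypothetical, simplified event type.
enum MiniEvent<Element> {
    case next(Element)
    case error(Error)
    case completed
}

// Hypothetical model of an observer with a stop flag.
final class MiniStoppableObserver<Element> {
    private var isStopped = false   // plain Bool; an assumption replacing the atomic flag
    private let eventHandler: (MiniEvent<Element>) -> Void

    init(_ eventHandler: @escaping (MiniEvent<Element>) -> Void) {
        self.eventHandler = eventHandler
    }

    func on(_ event: MiniEvent<Element>) {
        switch event {
        case .next:
            // Elements are delivered only while the stream is still open.
            if !isStopped {
                eventHandler(event)
            }
        case .error, .completed:
            // The first terminating event closes the stream; later ones are ignored.
            if !isStopped {
                isStopped = true
                eventHandler(event)
            }
        }
    }
}

var log: [String] = []
let observer = MiniStoppableObserver<Int> { event in
    switch event {
    case .next(let value): log.append("next \(value)")
    case .error: log.append("error")
    case .completed: log.append("completed")
    }
}
observer.on(.next(1))
observer.on(.completed)
observer.on(.next(2))      // dropped: the stream has already ended
observer.on(.completed)    // dropped: only the first terminating event counts
print(log)
```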

Step 2.3

Return D1 and the Disposable as a tuple

The third step

C1 receives the two members of the tuple through its setSinkAndSubscription method. The SinkDisposer object is then returned, leaving it to the user to decide whether to dispose of it
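
The whole chain from A1 to B2 can be modeled in a few dozen lines. The sketch below is a simplified assumption-laden model, not RxSwift's source: every Mini-prefixed name is hypothetical, Disposable is reduced to a plain closure, and C1 and the scheduler check are left out.

```swift
// Every type here is a hypothetical, simplified stand-in, not RxSwift's source.
typealias MiniDispose = () -> Void          // stands in for Disposable

enum MiniEvent<Element> {
    case next(Element)
    case completed
}

/// E1: a struct that only stores an `on` function.
struct MiniAnyObserver<Element> {
    let on: (MiniEvent<Element>) -> Void
}

/// B1: stores the event-handling closure (B2).
final class MiniAnonymousObserver<Element> {
    private let eventHandler: (MiniEvent<Element>) -> Void

    init(_ eventHandler: @escaping (MiniEvent<Element>) -> Void) {
        self.eventHandler = eventHandler
    }

    func on(_ event: MiniEvent<Element>) {
        eventHandler(event)
    }
}

/// D1: links A1 and B1.
final class MiniSink<Element> {
    private let observer: MiniAnonymousObserver<Element>

    init(observer: MiniAnonymousObserver<Element>) {
        self.observer = observer
    }

    func run(_ parent: MiniAnonymousObservable<Element>) -> MiniDispose {
        // Wrap the forwarding function in E1 and hand it to A2.
        let observer = self.observer
        return parent.subscribeHandler(MiniAnyObserver(on: { observer.on($0) }))
    }
}

/// A1: stores the event-generating closure (A2).
final class MiniAnonymousObservable<Element> {
    let subscribeHandler: (MiniAnyObserver<Element>) -> MiniDispose

    init(_ subscribeHandler: @escaping (MiniAnyObserver<Element>) -> MiniDispose) {
        self.subscribeHandler = subscribeHandler
    }

    func subscribe(_ on: @escaping (MiniEvent<Element>) -> Void) -> MiniDispose {
        let observer = MiniAnonymousObserver(on)      // B1 holding B2
        let sink = MiniSink(observer: observer)       // D1 holding B1
        return sink.run(self)                         // calls A2 with E1
    }
}

var log: [String] = []
let numbers = MiniAnonymousObservable<Int> { observer in
    observer.on(.next(0))
    observer.on(.next(1))
    observer.on(.completed)
    return { log.append("disposed") }
}
let dispose = numbers.subscribe { event in
    switch event {
    case .next(let value): log.append("next(\(value))")
    case .completed: log.append("completed")
    }
}
dispose()
print(log)
```

In this model the events reach the handling closure synchronously inside subscribe, and the returned closure plays the role of the Disposable handed back to the caller.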

Diagram

The description above is rather abstract, so here is a diagram (it is a little ugly)

As you can see, A1 only holds A2 in this process, so it does not cause a memory leak. Of course, a leak is still possible if disposal is handled improperly.

Details

1

The check if !CurrentThreadScheduler.isScheduleRequired in the second subscribe method

Its implementation looks roughly like this

    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            // Read this flag for the current thread
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            // pthread_setspecific returns 0 on success.
            // true stores nil; false stores the sentinel.
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        // YSD
        // https://onevcat.com/2015/01/swift-pointer/
        // Allocate space for a pointer to a pthread_key_t
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer {
            key.deallocate(capacity: 1)
        }
        // Create the thread-safe variable
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()

In this event flow only the getter is used, not the setter, so I do not know its exact purpose yet. I will add more when I run into it later
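
The mechanism itself, a per-thread flag built on pthread_getspecific and pthread_setspecific, can be tried in isolation. The sketch below only shows how such a flag behaves; MiniThreadFlag is a hypothetical name, and the sketch makes no claim about how RxSwift uses the flag.

```swift
import Foundation

// Hypothetical model of a per-thread boolean built on thread-specific storage.
final class MiniThreadFlag {
    private var key = pthread_key_t()
    // Any non-nil pointer works as a marker. It is never freed in this sketch.
    private let sentinel = UnsafeRawPointer(
        UnsafeMutableRawPointer.allocate(byteCount: 1, alignment: 1))

    init() {
        let status = pthread_key_create(&key, nil)
        precondition(status == 0, "key creation failed")
    }

    /// True while no sentinel is stored for the calling thread.
    var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(key) == nil
        }
        set {
            // true clears the slot (nil); false stores the sentinel.
            let status = pthread_setspecific(key, newValue ? nil : sentinel)
            precondition(status == 0, "pthread_setspecific failed")
        }
    }
}

let flag = MiniThreadFlag()
let initially = flag.isScheduleRequired     // nothing stored yet
flag.isScheduleRequired = false
let whileMarked = flag.isScheduleRequired   // sentinel stored
flag.isScheduleRequired = true
let afterClearing = flag.isScheduleRequired // slot cleared again
print(initially, whileMarked, afterClearing)
```

Because the value lives in thread-specific storage, each thread sees its own copy of the flag, and a thread that has never set it reads true.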

SinkDisposer

This is the part that releases resources

    fileprivate enum DisposeState: UInt32 {     
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    // Jeej, swift API consistency rules    
    fileprivate enum DisposeStateInt32: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }
    
    private var _state: AtomicInt = 0
    private var _sink: Disposable? = nil
    private var _subscription: Disposable? = nil
    
    
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        _sink = sink
        _subscription = subscription

        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }

    func dispose() {
        let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = _sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = _subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            _sink = nil
            _subscription = nil
        }
    }

This prevents the resources from being released more than once, because over the whole event stream dispose may be triggered by completed, by error, or by a manual call from the user

The AtomicOr method actually calls OSAtomicOr32OrigBarrier(A, &B), which performs a thread-safe bitwise OR of the two values, stores the result in B (B = B | A), and returns the original value of B
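
The "OR in a mask and return the old value" behavior can be modeled without the OS primitive. In this sketch MiniAtomicInt and fetchOr are hypothetical names, and an NSLock stands in for the atomic instruction.

```swift
import Foundation

// Hypothetical model of a fetch-and-OR operation; a lock replaces the atomic instruction.
final class MiniAtomicInt {
    private var value: Int32 = 0
    private let lock = NSLock()

    /// ORs `mask` into the stored value and returns the value from before the OR.
    func fetchOr(_ mask: Int32) -> Int32 {
        lock.lock()
        defer { lock.unlock() }
        let previous = value
        value |= mask
        return previous
    }

    /// The value currently stored.
    var current: Int32 {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

let state = MiniAtomicInt()
let first = state.fetchOr(2)    // nothing was set before, stored value becomes 2
let second = state.fetchOr(1)   // returns the old 2, stored value becomes 3
let third = state.fetchOr(1)    // returns 3, stored value stays 3
print(first, second, third, state.current)
```

Returning the old value is what lets the caller tell, in a single step, whether a given bit had already been set by someone else.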

If dispose has not been called when setSinkAndSubscription runs, previousState = 0 and state becomes 2. Neither of the two bitwise-AND conditions holds, so nothing is released and the user has to dispose manually later

If completed or error has already occurred when setSinkAndSubscription is called (which is the case in the demo above), state is already 1. Then previousState = 1 and state becomes 3, so the first condition is false and the second is true, and the resources are released

When dispose is called more than once, the resources are released only once

From the second call on, previousState already contains the disposed bit (previousState & 1 != 0), so dispose returns immediately
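
Both call orders can be traced with a small model of the state bits. MiniSinkDisposer is a hypothetical stand-in that uses a plain Int32 instead of an atomic and a single release closure instead of the sink and subscription pair.

```swift
// Hypothetical, non-atomic model of the two-bit dispose state.
final class MiniSinkDisposer {
    private static let disposed: Int32 = 1
    private static let sinkAndSubscriptionSet: Int32 = 2

    private(set) var state: Int32 = 0
    private var release: (() -> Void)?

    /// ORs the mask into `state` and returns the previous value.
    private func fetchOr(_ mask: Int32) -> Int32 {
        let previous = state
        state |= mask
        return previous
    }

    func setSinkAndSubscription(release: @escaping () -> Void) {
        self.release = release
        let previous = fetchOr(Self.sinkAndSubscriptionSet)
        precondition((previous & Self.sinkAndSubscriptionSet) == 0, "already set")
        // dispose() came first: release right away.
        if (previous & Self.disposed) != 0 {
            release()
            self.release = nil
        }
    }

    func dispose() {
        let previous = fetchOr(Self.disposed)
        // Already disposed once: do nothing.
        if (previous & Self.disposed) != 0 {
            return
        }
        if (previous & Self.sinkAndSubscriptionSet) != 0 {
            release?()
            release = nil
        }
    }
}

// Order 1: set first, then dispose twice.
var releasesA = 0
let a = MiniSinkDisposer()
a.setSinkAndSubscription { releasesA += 1 }   // previous 0, state 2
a.dispose()                                    // previous 2, state 3: releases
a.dispose()                                    // previous 3: returns early

// Order 2: dispose first (a terminating event arrived early), then set.
var releasesB = 0
let b = MiniSinkDisposer()
b.dispose()                                    // previous 0, state 1: nothing to release yet
b.setSinkAndSubscription { releasesB += 1 }    // previous 1, state 3: releases now
print(releasesA, releasesB, a.state, b.state)
```

In either order the release happens exactly once and the state ends with both bits set.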

Of course, there are many ways to achieve this effect; RxSwift's approach is one of the more elegant ones

Conclusion

After reading the source code, my feeling is that RxSwift applies design patterns very thoroughly. When time allows, I want to write my own projects in a similar way to improve their extensibility, so that whatever the project manager asks me to add will not be too much of a headache