RxSwift chapter

  • first
  • Sequential response logic
  • Sequence types
  • The operator
  • Scheduler – Scheduler (1)
  • Scheduler – Scheduler (2)
  • Common sequence
  • Subject is both offensive and defensive
  • MVVM bidirectional binding

RxSwift is on Github until now Star:16K. Why is this framework so popular, as a typical example of functional responsive framework, how is the underlying implementation implemented?

Sequence API

The API structure:

  • Create a sequence
  • Subscribe to the sequence
  • Send a signal
Observable<String>. Create {(obserber) -> Disposablein// 3. Send signal obserber.onnext ("Test")
           returnDisposables. Create () // This destruction will not affect our interpretation this time // 2. Subscribe (onNext: {(text))in
               print("Subscribe to :\(text)")})Copy the code

Analysis of the code

  • If you want to send A signal, you have to go to closure A.

  • 2: We perform 2: the subscription sequence follows closure B.

  • 3: The result confirms that closure A passed the “test” to closure B first.

  • 4: enter the source code, view the specific implementation logic.

Implementation of logic, analysis of source code

Create a sequence

/ / creat function

//subscribe public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {return AnonymousObservable(subscribe)
}
Copy the code

// Anonymous sequence objects

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
Copy the code

Code instructions

  • The create method creates an internal object, AnonymousObservable, that holds external closures (including sending signals)

  • AnonymousObservable inherits from Producer the subscribe method

Subscribe to the sequence

/ / the subscribe function

Public subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? //onNext goes: Observer = AnonymousObserver(initializable includes onNext) -> Disposable {//onNext goes: observer= AnonymousObserver(initializable includes onNext)let observer = AnonymousObserver<Element> { event in
                switch event {
                case .next(letvalue): onNext? (value)case .error(let error):
                ...
                    disposable.dispose()
                case.completed: ... }}return Disposables.create(
            //observer走向
                self.asObservable().subscribe(observer),
                disposable
            )
    }
Copy the code

Code description:

  • An AnonymousObserver (anonymous Internal Observer) is created in much the same way as our AnonymousObservable, where the initialization is a closure parameter that holds events like onNext, onError, onCompleted, etc.

  • Self.asobservable () this is our RxSwift for consistency

  • Self.asObservable(). Subscribe (observer) is essentially self. Subscribe (observer)

Override func subscribe<Observer: ObserverType>(_ Observer: ObserverType>(_ Observer: Observer) -> Disposablewhere Observer.Element == Element { 
          if! CurrentThreadScheduler.isScheduleRequired { ... }else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in/ / the keyletsinkAndSubscription = self.run(observer, cancel: disposer) ... }}}Copy the code
  • The self.run code is eventually extended by our Producer Producer into our own transaction code, anonymousobServable.run
override func run<Observer: ObserverType>(...) {let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
Copy the code
  • AnonymousObservableSink saves the observer
  • Sink.run (self) handles business.
func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
}

Copy the code
  • Parent is the AnonymousObservable object passed above

  • We are glad to see that anonymousobServable._subscribeHandler () explains why the process executes the closure that creates the sequence and then executes the send response when subscrioring to the sequence (the puzzle begins to show).

  • The code to send the response will be analyzed later, but there’s another important guy AnyObserver(self)

    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
       self.observer = observer.on
   }
Copy the code
  • In the constructor, we created a structure AnyObserver save a message AnonymousObservableSink. On function.

Send a response

Obserber.onnext (” test “) : AnyObserver.onNext(” test “) : AnyObserver.onNext(” test “) : AnyObserver.onNext(” test “) : AnyObserver.onNext(” test “) : AnyObserver.onNext(” test “) : AnyObserver.onNext(” test “) : AnyObserver.onNext(” test “) General idea, find the parent class, find the protocol

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
}
Copy the code
  • Anyobserver. onNext(” test “) transforms again: Anyobserver.on (.next(” test “))), this AnyObserver calls on which is passed the.next function, which takes our final arguments.
public struct AnyObserver<Element> : ObserverType {
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
}
Copy the code
  • The AnonymousObservableSink. On function is initialized to self. Observer: Self. Observer (event) -> AnonymousObservableSink. On (event) where event =.next(” test “)
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    func on(_ event: Event<Element>) {
    ...
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
}
Copy the code
  • Self.forwardon (event) This is also the core code to execute, because AnonymousObservableSink inherits Sink and wrapper, see the code below
class Sink<Observer: ObserverType> : Disposable {
    final func forwardOn(_ event: Event<Observer.Element>) {
    ...
        self._observer.on(event)
    }
}
Copy the code
  • Where self._observer is the observer we initialized to save: AnonymousObserver

  • On (.next(” test “))), where the logic goes back to the call to the AnonymousObserver parameter closure we created when we subscribed to the sequence! (Is the design verbose? Reasonable? Follow-up research…) .

let observer = AnonymousObserver<E> { event in. switch event {case .next(letvalue): onNext? (value)case .error(let error):
       ...
    case .completed:
        onCompleted?()
    }
}
Copy the code
  • Call onNext, okay? (value), because the enumeration’s associated value (Swift is very powerful) value = “test “, then the call to the external onNext closure, the result.

summary

  • Use functional thinking to submerge a set of requirements (encapsulating what the developer doesn’t care about), optimize code hierarchies, and simplify logic.