I’m not going to talk about the idea of RxSwift, many articles on the Internet are very good, I don’t want to embarrass, let’s get straight to the point, first read a chestnut:

Observable<Int>.create { observe in
            observe.onNext(1)
            observe.onCompleted()
            return Disposables.create()
        }.subscribe(onNext: { res in
            print(res)
        }).disposed(by: disposeBag)
Copy the code

The code is very simple, and I ask you a question, what is an observable sequence, what is an observer, how do we create a sequence, how do we create an observer, which part is a sequence, which part is an observer? With these questions in mind, let’s begin!

Create observable sequence create function as follows:

public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        AnonymousObservable(subscribe)
    }
Copy the code

Creat returns an anonymous sequence, AnonymousObservable, that is, creat creates a sequence, but it has an AnyObserver inside it, which is the source of the data being sent.

AnonymousObservable -> Producer -> Observable -> ObservableType

ObservableType: is a protocol that has a function subscribe that must be implemented by all classes that comply with the protocol. It also has a default implementation of asObservable that converts ‘ObservableType’ to ‘Observable’.

Observable (ObservableType) : an ObservableType for type erasers. In order to manage sequences coherently, all sequences inherit from ObservableType. Another important function is to calculate the number of subscriptions.

Producer: Implements the SUBSCRIBE function, which is important in a second. There is also an abstract function run that all inheritors need to implement. Subclasses do not implement subscription functions; they are handled through them

AnonymousObservable has a closure: Subscribe = (AnyObserver

) -> Disposable subscribe = (AnyObserver

) -> Disposable subscribe = (AnyObserver

) -> Disposable


Now that we have both the CREat function and the subscribe function in Producer, let’s see how it sends data. Take a look:

We started with a brief analysis of the CREat function, which has a source AnyObserver for sending data. Now we have a look at what the subscribe function does.

public func subscribe( onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil ) -> Disposable { let disposable: Disposable if let disposed = onDisposed { disposable = Disposables.create(with: disposed) } else { disposable = Disposables.create() } let observer = AnonymousObserver<Element> { event in switch event  { case .next(let value): onNext? (value) case .error(let error): if let onError = onError { onError(error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) }Copy the code

Inheritance chain: AnonymousObserver -> ObserverBase -> Disposable, ObserverType self.asObservable().subscribe(observer) executes the subscribe function in Producer. Look at the creat function above. That’s right, AnonymousObservable, and self.asObservable() does nothing but return itself. We can see why there are two subscribe functions, because they are called sequentially, so we can understand that they are the same function. This function has a conditional branch, which is used for thread problems, and the else branch is usually used, so just look at this:

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
       return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
                return disposer
        }
}
Copy the code

Let’s take a look at what ObserverType is. It’s a protocol with functions that must be implemented func on(_ event: Event

), which is used to send data. The function also has three default implementation functions: onNext, onCompleted, and onError, which are used to send Event events. Event is an enumeration as follows:

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
Copy the code

Yeah, the default implementation of the function to send separately is these are three events. I have self here. Who is it? Creat creates AnonymousObservable, simple logic problem don’t confuse, call the run function, go back to AnonymousObservable and see what the run function does:

let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
Copy the code

Inheritance chain: AnonymousObservableSink – > Sink, ObserverType, Sink – > the Disposable, what he did see AnonymousObservableSinkrun function, implementation is as follows:

func run(_ parent: Parent) -> Disposable {
     parent.subscribeHandler(AnyObserver(self))
}
Copy the code

Sink has an observer property, which is the AnonymousObserver that receives data from the SUBSCRIBE function above, and the forwardOn function, a final function. Run (self) does what sink. Run (self) does. Parent is AnonymousObservable and subscribeHandler is the closure of creat. What is AnyObserver(self)? SubscribeHandler = (AnyObserver

) -> Disposable Send events observe.onNext(1) and observe.oncompleted (). This observeis AnyObserver(self). Let’s take a look at AnyObserver:

struct AnyObserver<Element> : ObserverType {
    public typealias EventHandler = (Event<Element>) -> Void
    private let observer: EventHandler
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }
    public func asObserver() -> AnyObserver<Element> {
        self
    }
}
Copy the code

AnyObserver(self) self is AnonymousObservableSink. So we do this: self.observer = observer.on, which means that the observer in AnyObserver is the on function in AnonymousObservableSink, which is also the function that sends events. Take a look at the implementation:

func on(_ event: Event<Element>) {
     switch event {
       case .next:
           self.forwardOn(event)
       case .error, .completed:
           if fetchOr(self.isStopped, 1) == 0 {
               self.forwardOn(event)
               self.dispose()
           }
      }
 }
Copy the code

Each case branch executes the forwardOn function, and let’s see what it does:

final func forwardOn(_ event: Event<Observer.Element>) {
      if isFlagSet(self.disposed, 1) {
          return
      }
      self.observer.on(event)
 }
Copy the code

The “Observer” attribute in Sink is the AnonymousObserver in the SUBSCRIBE function. EventHandler = (Event

) -> Void (ObserverBase);

func on(_ event: Event<Element>) {
        switch event {
        case .next:
            self.onCore(event)
        case .error, .completed:
            self.onCore(event)
        }
}
Copy the code

It also has an abstract function, onCore, that must be implemented by subclasses, which happens to be overridden in the AnonymousObserver class. Let’s see what onCore does:

override func onCore(_ event: Event<Element>) {
    self.eventHandler(event)
}
Copy the code

So do you already know where self.eventhandler (event) executes? To see the observer created in Subscrebe:

let observer = AnonymousObserver<Element> { event in switch event { case .next(let value): onNext? (value) case .error(let error): if let onError = onError { onError(error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } }Copy the code

The big closure is eventHandler, and self.eventHandler(event) sends enumerated events.

Finally all the logic for creating, subscribing, sending, and receiving is smoothed out. Of course, this is just a small part of the basics of RxSwift, and I’ll write down the rest as I go along. If which say of have a problem welcome everybody to point out, don’t let me go wrong all the time 😄, still have I seldom write an article before, have what bad place welcome everybody to put forward an opinion or suggestion. Do you understand the first few small questions?

Summary: I have been using RxSwift for more than a year, and I have read the source code following the online articles before, but this way I feel that I do not work, a little lead by the nose feeling. After a period of time and forget, and then look at the source or do not know where to start. This time I completely do not rely on the article, independent look at the source code, stroke logic, feel ok, RxSwift source code is not so difficult to understand, is very around, around and around the muddled, also have to start from scratch, too uncomfortable.

In addition, I would like to remind you that I recently interviewed a company using RxSwift, and I did not ask deep questions, but I did not prepare for it. RxSwift even forgot the basic idea, in fact, it is not a pity, but also expected. I about every two or three months will interview once or twice, one to see the opportunity, but to see their level 😄, as expected, every time do not review, some concepts of things do not remember, the nature of the concept of things always forget, technical level or dish 😄.

Anyway, no matter what your job is like, I suggest you go to zhongda factory every once in a while. Interviewing is also learning! Come on, everybody!