In the previous article, I described the core logic of RxSwift: how a sequence is connected from creation to subscription, and then from sending messages to receiving them. If any of that is unclear, please go back to the previous article.

This article mainly analyzes the implementation and design of several core classes and protocols of RxSwift.

Analysis of the Observable class

Observable represents an observable sequence and is the base class of all observable sequences. We do not use the Observable class directly. Swift has no abstract classes, but Observable can be thought of as one, because subscribe, the most important method of an observable sequence, must be overridden in its subclasses.

Let’s start with the Observable source code:

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}
  • Observable conforms to the ObservableType protocol, and ObservableType in turn inherits from the ObservableConvertibleType protocol, so Observable implements two protocol methods: subscribe and asObservable.
  • The subscribe method has no implementation logic of its own (it only calls rxAbstractMethod()); subclasses are required to implement it.
  • The asObservable method returns self, which may seem useless, but it is not. If a class is a subclass of Observable, it can simply return self. If it is not, it can implement the protocol method and return an Observable object, which keeps every conforming type consistent. Callers can then simply write self.asObservable().subscribe(observer). This keeps the code concise and is a good example of encapsulation. I think this design is very good and worth borrowing in our daily development.
  • The two lines _ = Resources.incrementTotal() and _ = Resources.decrementTotal() maintain a resource count that RxSwift implements internally; they are compiled only when TRACE_RESOURCES is defined. I will explain this in more detail in a later article.
  • composeMap<R> is an optimization for the map function.
  • Observable has many subclasses, so I am not going to look at them all. The main difference between them is how each one implements the subscribe method.
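
The two design points above (a base class whose subscribe must be overridden, and an asObservable-style conversion method) can be sketched without RxSwift. The following is a minimal illustration only: SimpleConvertible, SimpleObservable, JustObservable, Temperature and collect are hypothetical names invented for this sketch, and observers are reduced to plain closures. It is not how RxSwift itself is implemented.

```swift
// Hypothetical, simplified stand-ins for illustration; these are NOT the RxSwift types.
protocol SimpleConvertible {
    associatedtype Element
    func asSimpleObservable() -> SimpleObservable<Element>
}

class SimpleObservable<Element>: SimpleConvertible {
    // "Abstract" method: Swift has no abstract classes, so the base class traps at runtime.
    func subscribe(_ observer: @escaping (Element) -> Void) {
        fatalError("Abstract method: subclasses must override subscribe")
    }

    // An observable is already an observable, so it simply returns itself.
    func asSimpleObservable() -> SimpleObservable<Element> {
        return self
    }
}

// A concrete subclass supplies the subscribe logic: it emits one stored value.
final class JustObservable<Element>: SimpleObservable<Element> {
    private let value: Element

    init(_ value: Element) {
        self.value = value
        super.init()
    }

    override func subscribe(_ observer: @escaping (Element) -> Void) {
        observer(value)
    }
}

// A type that is not an observable can still be converted into one.
struct Temperature: SimpleConvertible {
    let celsius: Double

    func asSimpleObservable() -> SimpleObservable<Double> {
        return JustObservable(celsius)
    }
}

// Generic code depends only on the conversion protocol, not on a concrete class.
func collect<C: SimpleConvertible>(_ source: C) -> [C.Element] {
    var received: [C.Element] = []
    source.asSimpleObservable().subscribe { received.append($0) }
    return received
}

let fromObservable = collect(JustObservable(1))
let fromStruct = collect(Temperature(celsius: 21.5))
```

The collect function never needs to know whether it was given an observable subclass or some unrelated type; it only relies on the conversion method, which is the same benefit described for asObservable.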

AnonymousObservableSink class

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
  • AnonymousObservableSink is a subclass of Sink. It conforms to the ObserverType protocol and also implements a run method. It does not implement subscribe, but run is enough, so in a sense AnonymousObservableSink also plays the role of an Observable.
  • AnonymousObservableSink is a bridge between the Observer and the Observable, and can also be thought of as a pipe. Through its initializer it passes the observer and the cancel object used for disposal to Sink, which stores them. The hand-off from Observable to Observer is completed through the sink.
  • The run method calls parent._subscribeHandler(AnyObserver(self)), where parent is an AnonymousObservable object. Calling the _subscribeHandler block executes the block that was passed in when the sequence was created. The code in that block that sends a signal calls observer.onNext("Send signal"), and after a few intermediate steps the call arrives at the on method of AnonymousObservableSink.
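
The gating behavior of the on method shown above (forward next events until the first error or completed, then drop everything) can be sketched in isolation. This is a simplified illustration under stated assumptions: SimpleEvent and SimpleSink are hypothetical names, the stop flag is a plain Bool instead of the AtomicInt used in the real class, errors are reduced to strings, and disposal is reduced to setting a flag.

```swift
// Hypothetical, simplified stand-ins; the real sink builds on Sink<O> and AtomicInt.
enum SimpleEvent<Element> {
    case next(Element)
    case error(String)
    case completed
}

final class SimpleSink<Element> {
    private let forward: (SimpleEvent<Element>) -> Void
    private var isStopped = false          // not thread-safe, unlike the atomic flag in RxSwift
    private(set) var isDisposed = false    // stands in for the call to dispose()

    init(forward: @escaping (SimpleEvent<Element>) -> Void) {
        self.forward = forward
    }

    func on(_ event: SimpleEvent<Element>) {
        switch event {
        case .next:
            // Elements are dropped once a terminal event has been seen.
            if isStopped { return }
            forward(event)
        case .error, .completed:
            // Only the first terminal event is forwarded; it also stops the sink.
            if !isStopped {
                isStopped = true
                forward(event)
                isDisposed = true
            }
        }
    }

    // Mirrors run(_:): hand the sink to the creation block, which then calls on(_:).
    func run(_ subscribeHandler: (SimpleSink<Element>) -> Void) {
        subscribeHandler(self)
    }
}

var log: [String] = []
let sink = SimpleSink<Int> { event in
    switch event {
    case .next(let value): log.append("next(\(value))")
    case .error(let message): log.append("error(\(message))")
    case .completed: log.append("completed")
    }
}

sink.run { observer in
    observer.on(.next(1))
    observer.on(.completed)
    observer.on(.next(2))         // dropped: the sink has already stopped
    observer.on(.error("late"))   // dropped: only the first terminal event counts
}
```

After this runs, log contains only the first element and the completion, which is the guarantee the _isStopped check provides in AnonymousObservableSink.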

If you have any questions or suggestions, you are welcome to leave a comment or write to me. If you liked this article, please follow and like it; more articles in this series will follow.