1. ObservableConvertibleType protocol:

/// Type that can be converted to observable sequence (`Observable<Element>`).
public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype Element

    /// Deprecated spelling of `Element`; use `Element` instead.
    @available(*, deprecated, renamed: "Element")
    typealias E = Element

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<Element>
}

The protocol declares a single requirement: `func asObservable() -> Observable<Element>`.

2. ObservableType

Definition:

   /// Represents a push style sequence.
public protocol ObservableType: ObservableConvertibleType {

    /// Subscribes `observer` to this sequence.
    ///
    /// - parameter observer: Observer whose `Element` type is the same as this sequence's `Element`.
    /// - returns: `Disposable` representing the subscription.
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element

}

extension ObservableType {

    /// Default implementation of converting `ObservableType` to `Observable`.
    ///
    /// - returns: `Observable` built with `Observable.create`, whose closure subscribes
    ///   each incoming observer to `self`.
    public func asObservable() -> Observable<Element> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}
This one is easier to understand: the protocol requires a `subscribe` method that is given an observer (any type conforming to `ObserverType`).

3. ObserverType

Definition:

/// Supports push-style iteration over an observable sequence.
///
/// The only requirement is `on(_:)`, which receives every event as an `Event<Element>`.
public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype Element

    /// Deprecated spelling of `Element`; use `Element` instead.
    @available(*, deprecated, renamed: "Element")
    typealias E = Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}

/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {

    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }

    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }

    /// Convenience method equivalent to `on(.error(Swift.Error))`
    ///
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

The protocol provides a `func on(_ event: Event<Element>)` method whose parameter is of the enum type `Event`; this is where the received element is processed. For convenience, an extension adds the following methods:

public func onNext(_ element: Element)

public func onCompleted()

public func onError(_ error: Swift.Error)

4. ObserverBase

Definition:

/// Base class for observers that gates events behind a "stopped" flag.
///
/// `on(_:)` forwards events to `onCore(_:)` only while `_isStopped` is `0`.
/// Subclasses provide the actual handling by overriding `onCore(_:)`.
class ObserverBase<Element>: Disposable, ObserverType {
    /// `0` while events are still forwarded; set to `1` by a terminal event or by `dispose()`.
    private let _isStopped = AtomicInt(0)

    /// Receives an event and forwards it to `onCore(_:)` if the observer is not stopped.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            // Presumably `fetchOr` returns the previous value, so only the first
            // terminal event is forwarded — confirm against the `AtomicInt` helpers.
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    /// Handles an event that passed the stopped check. Must be overridden by subclasses.
    ///
    /// - parameter event: Event to handle.
    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    /// Marks the observer as stopped so that `on(_:)` no longer forwards events.
    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}

`ObserverBase` implements the `Disposable` protocol, and its `on` method forwards each received event to `func onCore(_ event: Event<Element>)`, which defines what is done with the event. Subclasses must override `func onCore(_ event: Event<Element>)`.


5. AnonymousObserver

Definition:

/// Observer whose event handling is supplied as a closure at initialization.
final class AnonymousObserver<Element>: ObserverBase<Element> {
    /// Closure type invoked for each event forwarded to this observer.
    typealias EventHandler = (Event<Element>) -> Void

    private let _eventHandler: EventHandler

    /// Creates an observer that passes forwarded events to `eventHandler`.
    ///
    /// - parameter eventHandler: Closure called from `onCore(_:)` with each event.
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    /// Calls the stored event handler with `event`.
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
`AnonymousObserver` inherits from `ObserverBase`. Its initializer takes a closure of type `(Event<Element>) -> Void`, which is the operation to perform on each received element after subscribing. It overrides `func onCore(_ event: Event<Element>)` and calls this closure there. So when the protocol method `func on(_ event: Event<Element>)`, implemented by `ObserverBase`, is invoked, what actually runs is the closure passed to the `AnonymousObserver` initializer.


6. AnyObserver

Definition:

/// A type-erased `ObserverType`.
///
/// Forwards operations to an arbitrary underlying observer with the same `Element` type, hiding the specifics of the underlying observer type.
public struct AnyObserver<Element>: ObserverType {
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    /// Closure that receives every event passed to `on(_:)`.
    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }

    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        return self
    }
}

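`AnyObserver` is a generic, type-erased observer that can be created either from a closure or from any instance that conforms to the `ObserverType` protocol. (I don’t understand why it does not inherit from `ObserverBase` the way `AnonymousObserver` does; note, however, that `AnyObserver` is declared as a `struct`, and a struct cannot inherit from a class.)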

7. ObservableType extension (`subscribe` with closure parameters)

Definition:

extension ObservableType {
    /**
     Subscribes an event handler to an observable sequence.

     - parameter on: Action to invoke for each event in the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    public func subscribe(_ on: @escaping (Event<Element>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }

    /**
     Subscribes an element handler, an error handler, a completion handler and a disposed handler to an observable sequence.

     - parameter onNext: Action to invoke for each element in the observable sequence.
     - parameter onError: Action to invoke when the sequence terminates with an error. When `nil`, the error is passed to `Hooks.defaultErrorHandler`.
     - parameter onCompleted: Action to invoke when the sequence completes.
     - parameter onDisposed: Action wrapped in the disposable that is disposed after an error or completion event, and that is also part of the returned subscription.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable

            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }

            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif

            // Captured at subscription time so the default error handler can report it.
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []

            let observer = AnonymousObserver<Element> { event in

                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif

                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    // Terminal event: dispose the `onDisposed` disposable.
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    // Terminal event: dispose the `onDisposed` disposable.
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}
Basic flow: the incoming closure(s) are wrapped in an `AnonymousObserver` instance, and that `AnonymousObserver` is then subscribed to the observable as its observer.