1. ObservableTypeAdd extensionsmapmethods
  2. mapMethod implements an internal constructMapObject that inherits fromProducerTo overriderunmethods
  3. MapSinkAs amapThe actual subscriber of the source sequence before the transformation, performing the transformationTransformThen forward to subscribers

Conclusion: The ingenious design of Sink lies in that it can forward the elements sent by the observable sequence and call Dispose to break the internal circular reference and release the resources when the error and complete events are received.

extension ObservableType {

    /** Projects each element of an observable sequence into a new form. - seealso: [map operator on reactivex.io](http://reactivex.io/documentation/operators/map.html) - parameter transform: A transform function to apply to each source element. - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. */
    public func map<Result> (_ transform: @escaping (Element) throws -> Result)
        -> Observable<Result> {
        Map(source: self.asObservable(), transform: transform)
    }
}

final private class MapSink<SourceType.Observer: ObserverType> :Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = Observer.Element 
    typealias Element = SourceType

    private let transform: Transform

    init(transform: @escaping Transform.observer: Observer.cancel: Cancelable) {
        self.transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self.transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

final private class Map<SourceType.ResultType> :Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType

    private let source: Observable<SourceType>

    private let transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self.source = source
        self.transform = transform
    }

    override func run<Observer: ObserverType> (_ observer: Observer.cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element = = ResultType {
        let sink = MapSink(transform: self.transform, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}
Copy the code