Concat official introduction

Overall process: Sequence ([observable1, observable2]Element in the FIFO Queue. Internally create Sink pairs of sequence. elements to subscribe and forward one by one, subscribing to the latter one on the premise that the previous one issues onComplete. Complete all sequence. Element subscriptions in turn. Note that if one of these elements emitserror, terminates the subscription to the entire sequence.

Example:

        Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3)).subscribe { value in
            print(value)}. Disposed (by: disposeBag) print: next(1)
        next(2)
        next(3)
        completed
Copy the code

Object method definition:

extension ObservableType {
    public func concat<Source: ObservableConvertibleType> (_ second: Source) -> Observable<Element> where Source.Element = = Element {
        Observable.concat([self.asObservable(), second.asObservable()])
    }
}
Copy the code

Class method definition:

extension ObservableType {
    public static func concat<Sequence: Swift.Sequence> (_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element = = Observable<Element> {
            return Concat(sources: sequence, count: nil)}public static func concat<Collection: Swift.Collection> (_ collection: Collection) -> Observable<Element>
        where Collection.Element = = Observable<Element> {
            return Concat(sources: collection, count: Int64(collection.count))
    }

    public static func concat(_ sources: Observable<Element> .) -> Observable<Element> {
        Concat(sources: sources, count: Int64(sources.count))
    }
}
Copy the code

Each of these methods is generated in a different formConcatobject

Concat:

// Inherits from Producer. Provides initialization methods, and overrides run methods.
final private class Concat<Sequence: Swift.Sequence> :Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
    typealias Element = Sequence.Element.Element
    
    fileprivate let sources: Sequence
    fileprivate let count: IntMax?

    init(sources: Sequence.count: IntMax?). {
        self.sources = sources
        self.count = count
    }
    
    override func run<Observer: ObserverType> (_ observer: Observer.cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element = = Element {
        let sink = ConcatSink<Sequence.Observer>(observer: observer, cancel: cancel)
        let subscription = sink.run((self.sources.makeIterator(), self.count))
        return (sink: sink, subscription: subscription)
    }
}
Copy the code

Examples from the beginning of the article:

Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3))
Copy the code

Return a Concat instance, the source property format should look like this: the contents of “()” represent the value of Concat Sources:

[Concat([just1, just2]), just3]
Copy the code

Similarly, if there were four elements, the format would look like this:

[Concat([Concat([just1, just2]), just3]), just4]
Copy the code

Undo the Sequence of just1, just2, just3…

let subscription = sink.run((self.sources.makeIterator(), self.count))
Copy the code

This method passes in an iterator for the Sequence, the number of elements, and internal processing to unpack the Sequence

Take a look at Sink:

final private class ConcatSink<Sequence: Swift.Sequence.Observer: ObserverType>
    : TailRecursiveSink<Sequence.Observer>,ObserverType where Sequence.Element: ObservableConvertibleType.Sequence.Element.Element= =Observer.Element {
    typealias Element = Observer.Element 
    
    override init(observer: Observer.cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }

    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)}override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil}}}Copy the code

This class mainly completes the following operations:

    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)}Copy the code

Fetch the element from the FIFO’s Queue and subscribe

func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }
Copy the code

Forward subscriptions. If completed is received, take the next element from the Queue and subscribe.

    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil}}Copy the code

Returns an iterator and the number of sequence elements. Again, Squence nesting, when you unlock Squence, you’re going to call this recursively. Observable ([just1, just2]) in [Concat([Concat([just1, just2]), just3]), just4]).

— — — — — — — — — — unfinished — — — — — — — — —

Parent class: TailRecursiveSink is a common and complex class that is examined separately in the next section.