Concat official introduction
Overall process: the elements of the sequence (e.g. `[observable1, observable2]`) are placed in a FIFO queue. Internally, a sink is created that subscribes to the elements of the sequence and forwards their events one by one; it subscribes to the next element only after the previous one has emitted `completed`. In this way every element of the sequence is subscribed to in turn. Note that if any of these elements emits an `error`, the subscription to the entire sequence is terminated.
Example:
// Chain three single-element sequences and subscribe to the concatenated result.
Observable.just(1)
    .concat(Observable.just(2))
    .concat(Observable.just(3))
    .subscribe { event in
        // The closure receives whole events, which is why the output below
        // shows `next(...)` / `completed` rather than bare values.
        print(event)
    }
    .disposed(by: disposeBag)

// Prints:
// next(1)
// next(2)
// next(3)
// completed
Copy the code
Object method definition:
extension ObservableType {
    /// Concatenates `second` after this sequence.
    ///
    /// Both `self` and `second` are converted with `asObservable()` and handed,
    /// in that order, to the static `Observable.concat` as a two-element array.
    ///
    /// - Parameter second: The sequence to append; its element type must equal `Element`.
    /// - Returns: The observable produced by `Observable.concat` for `[self, second]`.
    public func concat<Source: ObservableConvertibleType>(_ second: Source) -> Observable<Element> where Source.Element == Element {
        Observable.concat([self.asObservable(), second.asObservable()])
    }
}
Copy the code
Class method definition:
extension ObservableType {
    /// Builds a `Concat` from an arbitrary sequence of observables.
    ///
    /// A plain `Sequence` has no cheap element count, so `count` is passed as `nil`.
    ///
    /// - Parameter sequence: The observables to concatenate, in iteration order.
    /// - Returns: A `Concat` wrapping `sequence`.
    public static func concat<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element == Observable<Element> {
        return Concat(sources: sequence, count: nil)
    }

    /// Builds a `Concat` from a collection of observables.
    ///
    /// - Parameter collection: The observables to concatenate, in iteration order.
    /// - Returns: A `Concat` wrapping `collection`, with `count` set to `collection.count`.
    public static func concat<Collection: Swift.Collection>(_ collection: Collection) -> Observable<Element>
        where Collection.Element == Observable<Element> {
        return Concat(sources: collection, count: Int64(collection.count))
    }

    /// Builds a `Concat` from a variadic list of observables.
    ///
    /// - Parameter sources: The observables to concatenate, in argument order.
    /// - Returns: A `Concat` wrapping `sources`, with `count` set to `sources.count`.
    public static func concat(_ sources: Observable<Element> ...) -> Observable<Element> {
        Concat(sources: sources, count: Int64(sources.count))
    }
}
Copy the code
Each of these methods creates a `Concat` object; they differ only in the form of input they accept.
Concat:
/// Inherits from `Producer`, provides an initializer, and overrides `run`.
///
/// Stores the sequence of sources to concatenate together with an optional
/// element count, and hands both to a `ConcatSink` when `run` is called.
final private class Concat<Sequence: Swift.Sequence>: Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
    typealias Element = Sequence.Element.Element

    /// The sources to concatenate, in iteration order.
    fileprivate let sources: Sequence
    /// Number of elements in `sources`, or `nil` when the caller did not supply one.
    fileprivate let count: IntMax?

    /// - Parameters:
    ///   - sources: The sources to concatenate.
    ///   - count: The number of elements in `sources`, if known.
    init(sources: Sequence, count: IntMax?) {
        self.sources = sources
        self.count = count
    }

    /// Creates a `ConcatSink` for `observer` and starts it with a fresh iterator
    /// over `sources` plus the stored `count`.
    ///
    /// - Returns: The sink and the subscription returned by `sink.run`.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ConcatSink<Sequence, Observer>(observer: observer, cancel: cancel)
        let subscription = sink.run((self.sources.makeIterator(), self.count))
        return (sink: sink, subscription: subscription)
    }
}
Copy the code
Examples from the beginning of the article:
Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3))
Copy the code
This returns a `Concat` instance whose `sources` property should look like the following (the contents of "()" represent the value of that `Concat`'s `sources`):
[Concat([just1, just2]), just3]
Copy the code
Similarly, if there were four elements, the format would look like this:
[Concat([Concat([just1, just2]), just3]), just4]
Copy the code
The nested sequence is then unpacked into just1, just2, just3, …:
let subscription = sink.run((self.sources.makeIterator(), self.count))
Copy the code
This method is passed an iterator over the sequence together with the number of elements; internally it takes care of unpacking the sequence.
Take a look at Sink:
/// Sink used by `Concat`: observes one source at a time and forwards its
/// events to the downstream observer.
final private class ConcatSink<Sequence: Swift.Sequence, Observer: ObserverType>
    : TailRecursiveSink<Sequence, Observer>, ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
    typealias Element = Observer.Element

    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    /// Handles an event from the source currently subscribed to.
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            // Values are passed straight through.
            self.forwardOn(event)
        case .error:
            // An error is forwarded and then the sink is disposed, ending the whole concatenation.
            self.forwardOn(event)
            self.dispose()
        case .completed:
            // Completion is not forwarded here; instead the sink schedules a move to the next source.
            self.schedule(.moveNext)
        }
    }

    /// Subscribes this sink (as the observer) to `source`.
    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }

    /// If `observable` is itself a `Concat` over the same `Sequence` type, returns an
    /// iterator over its sources and its count so they can be unpacked; otherwise `nil`.
    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil
        }
    }
}
This class mainly completes the following operations:
/// Subscribes this sink (as the observer) to `source`.
///
/// - Returns: The disposable returned by `source.subscribe`.
override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
    source.subscribe(self)
}
Takes the next element from the FIFO queue and subscribes to it.
/// Reacts to an event emitted by the source currently being observed.
func on(_ event: Event<Element>) {
    switch event {
    case .completed:
        // The current source finished: schedule the move to the next one.
        schedule(.moveNext)
    case .error:
        // Forward the failure, then dispose of this sink.
        forwardOn(event)
        dispose()
    case .next:
        // Ordinary values are forwarded unchanged.
        forwardOn(event)
    }
}
Copy the code
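Forwards events to the observer. A `next` event is forwarded as-is; an `error` is forwarded and the sink is then disposed; when `completed` is received, the next element is taken from the queue and subscribed to.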
/// If `observable` is itself a `Concat` over the same `Sequence` type, returns an
/// iterator over its sources together with its count; otherwise returns `nil`.
override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
    if let source = observable as? Concat<Sequence> {
        return (source.sources.makeIterator(), source.count)
    }
    else {
        return nil
    }
}
Returns an iterator and the number of sequence elements. This handles nested sequences: while a sequence is being unpacked, this method is called recursively — for example, to unpack the inner `Concat([just1, just2])` in `[Concat([Concat([just1, just2]), just3]), just4]`.
— — — — — — — — — — unfinished — — — — — — — — —
Parent class: TailRecursiveSink is a common and complex class that is examined separately in the next section.