Today, I’ll take a look at another operator in RxSwift that is particularly important, perhaps one of the most important: flatMap.
If you haven’t read the first part of this series, I suggest you do so.
During the development of RxSwift, there was a very common scenario: click a button, send a request, and then process the result of the request. The code looks like this:
myBtn
.rx
.tap
.flatMap { _ in
performRequest()
}
.map {
processResponse($0)
}
.subScribe(
onNext: { ... }
)
.disposed(by: bag)
Copy the code
One of the core operators to use in this scenario is flatMap, which converts the click events generated by myBtn into another sequence of events and passes the events generated by that sequence down. The code is as follows:
extension ObservableType {
public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
-> Observable<Source.Element> {
return FlatMap(source: self.asObservable(), selector: selector)
}
}
Copy the code
This method accepts a (Element) throws – > Source closure, Element is its Element type, the return value type Source is a generic parameter, and follow the ObservableConvertibleType is. That is, the Closuer converts events generated upstream into a sequence of events. The method implementation returns an object of type FlatMap that holds self.asObservable() and the closure. Next, let’s look at the source of this FlatMap:
final private class FlatMap<SourceElement, SourceSequence: ObservableConvertibleType>: Producer<SourceSequence.Element> {
typealias Selector = (SourceElement) throws -> SourceSequence
private let source: Observable<SourceElement>
private let selector: Selector
init(source: Observable<SourceElement>, selector: @escaping Selector) {
self.source = source
self.selector = selector
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
let sink = FlatMapSink(selector: self.selector, observer: observer, cancel: cancel)
let subscription = sink.run(self.source)
return (sink: sink, subscription: subscription)
}
}
Copy the code
FlatMap is a subclass of Producer that generates an object of the FlatMapSink class using self.selector and an observer in its run method. This selector is the closure passed in when the flatMap is called, and the Observer is actually the observer created in the SUBSCRIBE method (see article 1 in this series). The FlatMapSink’s run method is then called. We don’t see the run method in FlatMapSink because FlatMapSink is a subclass of MergeSink, and much of its logic is in its parent, MergeSink:
private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>
: Sink<Observer>
, ObserverType where Observer.Element == SourceSequence.Element {
typealias ResultType = Observer.Element
typealias Element = SourceElement
let lock = RecursiveLock()
var subscribeNext: Bool {
true
}
// state
let group = CompositeDisposable()
let sourceSubscription = SingleAssignmentDisposable()
var activeCount = 0
var stopped = false
......
func run(_ source: Observable<SourceElement>) -> Disposable {
_ = self.group.insert(self.sourceSubscription)
let subscription = source.subscribe(self)
self.sourceSubscription.setDisposable(subscription)
return self.group
}
}
Copy the code
At the very end of the source, we see the run method, which has only four lines. It is the second line we need to focus on:
let subscription = source.subscribe(self)
Copy the code
That is, self subscribs to the incoming Observable, which is the flatmap. source (the upstream sequence). Then the general flow is clear: Obsevable. FlatMap, let the flatMap class hold upstream Obsevable, and subscribe to this upstream Obsevable through FlatMapSink. Then all upstream Obsevable events are passed to FlatMapSink’s ON method (see part 1 of this series). Next, we need to look at what the FlatMapSink’s on method does, again in its parent class MergeSink.
func on(_ event: Event<SourceElement>) {
switch event {
case .next(let element):
if let value = self.nextElementArrived(element: element) {
self.subscribeInner(value.asObservable())
}
case .error(let error):
self.lock.performLocked {
self.forwardOn(.error(error))
self.dispose()
}
case .completed:
self.lock.performLocked {
self.stopped = true
self.sourceSubscription.dispose()
self.checkCompleted()
}
}
}
Copy the code
The error event calls the forwardOn directly, passing the event to the downstream observer, and the Completed event calls the checkCompleted method, which determines whether the completed event needs to be passed on. This is because MergeSink may subscribe to multiple sequences of events, perform a series of transformations on them, and not deliver them until all sequences are complete, but this is not the focus of today, so just get a general idea, which we’ll cover in a future article. Next look at the next event, which calls self.nextelementArrived (Element: Element) :
final private func nextElementArrived(element: SourceElement) -> SourceSequence? { self.lock.performLocked { if ! self.subscribeNext { return nil } do { let value = try self.performMap(element) self.activeCount += 1 return value } catch let e { self.forwardOn(.error(e)) self.dispose() return nil } } }Copy the code
This method first locks, and then executes the self.performMap(Element) method, which is implemented in subclasses of FlatMapSink, and essentially calls self.selector:
override func performMap(_ element: SourceElement) throws -> SourceSequence {
try self.selector(element)
}
Copy the code
The selector is actually the closure passed in when the original call to Observable.flatMap is made, so myBtn’s click event is converted to a sequence of events. After getting the return value, the ON method calls the subscribeInner method:
func subscribeInner(_ source: Observable<Observer.Element>) {
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = self.group.insert(iterDisposable) {
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter)
iterDisposable.setDisposable(subscription)
}
}
Copy the code
In this case, you create a MergeSinkIter object that subscribes to the incoming Observable. According to MergeSinkIter’s on method:
func on(_ event: Event<Element>) {
self.parent.lock.performLocked {
switch event {
case .next(let value):
self.parent.forwardOn(.next(value))
case .error(let error):
self.parent.forwardOn(.error(error))
self.parent.dispose()
case .completed:
self.parent.group.remove(for: self.disposeKey)
self.parent.activeCount -= 1
self.parent.checkCompleted()
}
}
}
Copy the code
We can see that it actually passes the event to the parent, MergeSink. Note, however, that this is the direct invocation of the forwardOn method, so it passes the event directly to the downstream observer and does not trigger the performMap method. (You can’t call the ON method directly either, since the parameter generics they receive are usually different.)
This completes the entire flatMap process.
1. Create a flatMap object through flatMap, and subscribe to Upstream Observable 2 through FlatMapSink. When an upstream Observable generates an event, the FlatMapSink invokes the flatmap. performMap method to trigger the closure, generating a new sequence of events. 3. Pass the events of the new Event sequence to downstream observers through MergeSinkIter, thus realizing the transmission of Event->Observable->Event.
Its process is similar to the core logic of most operators in RxSwift. It holds the upstream Observable, creates a subclass of Sink to subscribe to the upstream Observable, and then passes the event to the downstream observer. Just because you’re generating a sequence of events, there’s an additional MergeSinkIter that handles the events of that sequence and passes them. Of course, today we just introduced the specific process of FlatMap, but we neglected the details of MergeSink, which we will talk about later.
Code word is not easy, if there are mistakes, welcome to correct. If it helps, please give it a thumbs up. ^ _ ^