Preface: This article focuses on analyzing the RxSwift source code and does not cover how to use RxSwift. For usage details, see the RxSwift Chinese documentation or the GitHub repository.
Using RxSwift involves three steps: create a sequence, subscribe to it, and send signals. Let's start with a simple example:
// 1: create the sequence
let observable = Observable<Any>.create { (observer) -> Disposable in
    // 3: send signals
    observer.onNext("Send a signal")
    observer.onCompleted()
    // observer.onError(NSError.init(domain: "fail", code: 10087, userInfo: nil))
    return Disposables.create()
}
// 2: subscribe to the sequence
let _ = observable.subscribe(onNext: { (text) in
    print("Subscribe to Signal :\(text)")
}, onError: { (error) in
    print("error: \(error)")
}, onCompleted: {
    print("Complete")
}) {
    print("Destroyed")
}
What we need to focus on is:
- When is the create closure (the first statement) executed?
- When are the closures passed to subscribe executed?
- How does a signal flow from being sent to being received?
Let's explore these questions one by one.
The first step is the create function
Let's start with the create function. It returns an AnonymousObservable object. AnonymousObservable is an anonymous observable that stores the closure that generates events (self._subscribeHandler = subscribeHandler) and provides the entry point (the run method) that triggers that closure.
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
AnonymousObservable is a subclass of Observable, and their inheritance is: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType
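The key point of this step is that create only stores the closure and does not run it. Here is a minimal sketch of that idea without RxSwift. MiniObservable and demoDeferredCreate are hypothetical names made up for illustration, and the real AnonymousObservable is considerably more involved.

```swift
// Hypothetical stand-in for AnonymousObservable: it only stores the closure.
final class MiniObservable<Element> {
    typealias SubscribeHandler = (@escaping (Element) -> Void) -> Void
    private let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler   // stored, not called
    }

    // The stored closure runs only when someone subscribes.
    func subscribe(_ onNext: @escaping (Element) -> Void) {
        subscribeHandler(onNext)
    }
}

// Records the order in which the pieces run.
func demoDeferredCreate() -> [String] {
    var log: [String] = []
    let mini = MiniObservable<String> { emit in
        log.append("create closure executed")
        emit("Send a signal")
    }
    log.append("after create")
    mini.subscribe { text in
        log.append("received: \(text)")
    }
    return log
}

let deferredLog = demoDeferredCreate()
print(deferredLog)
```

In this sketch "after create" is logged before "create closure executed", which mirrors the behavior analyzed in the rest of the article: the closure waits for a subscription.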
The second step is subscribe
Source:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
As you can see from the above code, subscribe first creates an AnonymousObserver object, passing a closure as the argument; the initializer saves it with self._eventHandler = eventHandler. AnonymousObserver is an anonymous observer that stores the closure that handles events.
At the end comes self.asObservable().subscribe(observer). asObservable returns the object itself, so this calls subscribe on it and passes the AnonymousObserver object. The AnonymousObservable class does not implement subscribe itself; the method is found in its parent class, Producer:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
Here the run method is called. Since self is an AnonymousObservable, this is AnonymousObservable's run method, and the observer is passed to it.
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
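The hand-off from the parent class's subscribe to the subclass's run is a template-method pattern. The following is a rough sketch of that dispatch only. BaseProducer, ClosureProducer and demoDispatch are hypothetical names, and the scheduler and disposable handling from the real Producer are left out.

```swift
// Hypothetical base class: owns subscribe and delegates the real work to run.
class BaseProducer {
    final func subscribe(_ observer: @escaping (String) -> Void) {
        run(observer)   // dynamically dispatched to the subclass override
    }

    func run(_ observer: @escaping (String) -> Void) {
        fatalError("Abstract method: subclasses must override run")
    }
}

// Hypothetical subclass: defines no subscribe of its own, only run.
final class ClosureProducer: BaseProducer {
    private let handler: ((String) -> Void) -> Void

    init(_ handler: @escaping ((String) -> Void) -> Void) {
        self.handler = handler
        super.init()
    }

    override func run(_ observer: @escaping (String) -> Void) {
        handler(observer)   // hand the observer to the stored closure
    }
}

func demoDispatch() -> [String] {
    var received: [String] = []
    let producer = ClosureProducer { observer in
        observer("Send a signal")
    }
    // Resolved on BaseProducer, which then calls ClosureProducer.run.
    producer.subscribe { received.append($0) }
    return received
}

let dispatched = demoDispatch()
print(dispatched)
```

Keeping subscribe in the base class lets every concrete sequence share the same subscription bookkeeping while customizing only run.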
The run method creates an AnonymousObservableSink object that holds the observer, then calls that object's run method, passing self (the observable) as the argument. The AnonymousObservableSink class links the Observable to the Observer and acts as a bridge for events.
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
This calls _subscribeHandler, which is the closure passed to the create function earlier. This answers the first question, "When is the create closure executed?": it runs when subscribe is called. Note that this method wraps self in an AnyObserver, that is, it converts the AnonymousObservableSink object to an AnyObserver. How does that work? Let's look at the source code:
public struct AnyObserver<Element> : ObserverType {
/// The type of elements in sequence that observer can observe.
public typealias E = Element
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
The initializer executes self.observer = observer.on, which assigns the on method of the AnonymousObservableSink object to the observer property of the AnyObserver. This explains why the line observer.onNext("Send a signal") eventually triggers the on method of AnonymousObservableSink.
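This kind of type erasure, where a method reference is stored as a closure, can be tried out in isolation. The sketch below uses hypothetical names (MiniObserverType, MiniAnyObserver, RecordingSink) and plain values instead of Event to keep it short; it is not the library's code.

```swift
// Hypothetical, simplified observer protocol.
protocol MiniObserverType {
    associatedtype Element
    func on(_ event: Element)
}

// Type-erased wrapper: keeps only a closure, not the concrete observer type.
struct MiniAnyObserver<Element>: MiniObserverType {
    private let observer: (Element) -> Void

    init<O: MiniObserverType>(_ observer: O) where O.Element == Element {
        self.observer = observer.on   // method reference stored as a closure
    }

    func on(_ event: Element) {
        observer(event)
    }
}

// Hypothetical concrete observer playing the role of the sink.
final class RecordingSink: MiniObserverType {
    private(set) var events: [String] = []

    func on(_ event: String) {
        events.append(event)
    }
}

let recordingSink = RecordingSink()
let erased = MiniAnyObserver(recordingSink)
erased.on("Send a signal")   // forwarded to RecordingSink.on
print(recordingSink.events)
```

The caller only sees MiniAnyObserver<String>, yet every event still reaches the concrete sink, which is the same relationship the article describes between AnyObserver and AnonymousObservableSink.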
// The on method of the AnonymousObservableSink class
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
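The _isStopped check in the on method above is what keeps anything from being forwarded after a terminal event. A simplified sketch of that gate follows. It uses a plain Bool where RxSwift uses an atomic flag, so it is not thread-safe, and MiniEvent and GatedObserver are hypothetical names.

```swift
// Hypothetical, simplified event type.
enum MiniEvent {
    case next(String)
    case error(String)
    case completed
}

final class GatedObserver {
    private var isStopped = false   // assumption: RxSwift uses an atomic flag here
    private(set) var delivered: [String] = []

    func on(_ event: MiniEvent) {
        switch event {
        case .next(let value):
            if isStopped { return }             // dropped after a terminal event
            delivered.append("next(\(value))")
        case .error(let message):
            if isStopped { return }
            isStopped = true                    // first terminal event wins
            delivered.append("error(\(message))")
        case .completed:
            if isStopped { return }
            isStopped = true
            delivered.append("completed")
        }
    }
}

let gated = GatedObserver()
gated.on(.next("A"))
gated.on(.completed)
gated.on(.next("B"))        // ignored: the sequence has already completed
gated.on(.error("late"))    // ignored as well
print(gated.delivered)
```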
Then go to the forwardOn method of the Sink class
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
The _observer here is the AnonymousObserver created in the second step by the subscribe function. Its on method, inherited from the parent class ObserverBase, is entered first:
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
The _eventHandler closure here is the closure created inside the subscribe function in the second step, which in turn calls the onNext, onError and onCompleted closures we passed in. This answers the second and third questions.
Conclusion
Walking through the whole process, we can feel the charm of functional reactive programming: the code is concise and the design is very clear.
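To recap the process:
- create returns an AnonymousObservable that stores the closure in _subscribeHandler.
- subscribe wraps our onNext, onError and onCompleted closures in an AnonymousObserver and calls Producer's subscribe method.
- Producer's subscribe method calls AnonymousObservable's run method, which creates an AnonymousObservableSink holding the observer.
- The sink's run method calls _subscribeHandler with the sink wrapped in an AnyObserver, so the create closure executes.
- observer.onNext inside the create closure reaches the sink's on method, then forwardOn, then the observer's on and onCore methods, and finally the _eventHandler closure.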
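The whole chain can be modeled in a few lines without RxSwift. The sketch below is a toy under stated assumptions: Signal, ClosureObserver, BridgeSink, ClosureObservable and runPipeline are hypothetical names, and disposal, error events, schedulers and the stop flag are all omitted.

```swift
// Hypothetical, simplified event type (no error case).
enum Signal<T> {
    case next(T)
    case completed
}

// Plays the role of AnonymousObserver: wraps the subscriber's closure.
final class ClosureObserver<T> {
    private let eventHandler: (Signal<T>) -> Void
    init(_ eventHandler: @escaping (Signal<T>) -> Void) {
        self.eventHandler = eventHandler
    }
    func on(_ event: Signal<T>) { eventHandler(event) }
}

// Plays the role of AnonymousObservableSink: forwards events to the observer.
final class BridgeSink<T> {
    private let observer: ClosureObserver<T>
    init(observer: ClosureObserver<T>) { self.observer = observer }
    func on(_ event: Signal<T>) { observer.on(event) }
}

// Plays the role of AnonymousObservable: stores the create closure.
final class ClosureObservable<T> {
    private let subscribeHandler: (BridgeSink<T>) -> Void
    init(_ subscribeHandler: @escaping (BridgeSink<T>) -> Void) {
        self.subscribeHandler = subscribeHandler
    }
    func subscribe(_ handler: @escaping (Signal<T>) -> Void) {
        let observer = ClosureObserver(handler)
        let sink = BridgeSink(observer: observer)
        subscribeHandler(sink)   // the create closure runs here
    }
}

func runPipeline() -> [String] {
    var trace: [String] = []
    let observable = ClosureObservable<String> { sink in
        trace.append("create closure runs")
        sink.on(.next("Send a signal"))
        sink.on(.completed)
    }
    observable.subscribe { event in
        switch event {
        case .next(let text): trace.append("onNext: \(text)")
        case .completed: trace.append("onCompleted")
        }
    }
    return trace
}

let pipelineTrace = runPipeline()
print(pipelineTrace)
```

Reading the trace top to bottom gives the same order as the walkthrough: the create closure runs at subscription time, and each event it sends passes through the sink to the subscriber's closure.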
Tips: RxSwift makes extensive use of protocols and generics, and has many abstract methods and identically named methods, so the source code can be hard to read at first. Xcode's code navigation is also not very powerful, which makes tracing calls harder. Be patient when reading the source, and make sure you have not jumped to the wrong place. Focus on the key code rather than blindly reading from beginning to end.
If you have any questions or suggestions, you are welcome to comment or write to us.
If you enjoyed this article, please follow and like. More articles will follow.