Preface
My boss recently gave me a new project, and I am going to write it in Swift. The RAC (ReactiveCocoa) I used in Objective-C now has a Swift-native version, but I wanted to try something new, so I chose RxSwift. I know how to use both frameworks, but I never understood how they work internally. Since the requirements had not come down yet, I spent the time studying RxSwift, and here I share what I learned
There are already many online resources on what RxSwift is and how to use it. This article instead starts from how Observable is implemented, aiming to understand the whole by looking at one small part
Usage demo
Here is a simple piece of code that uses Observable
let numbers: Observable<Int> = Observable.create { observer -> Disposable in
    observer.onNext(0)
    observer.onNext(1)
    observer.onCompleted()
    return Disposables.create {
    }
}
numbers.subscribe {
    print($0)
}
All the demo does is take the events generated in the first closure (0, 1, completed) and deliver them to the second closure. This separates event generation from event handling. This article analyzes how that effect is achieved
Main classes
AnonymousObservable
Anonymous observable. Stores the closure that generates events and provides the entry point that activates the event-handling closure
AnyObserver
Any observer. A type-erased wrapper that receives events and passes them on
AnonymousObserver
Anonymous observer. Stores the closure that handles events
AnonymousObservableSink
Links the observable and the observer so that events can be passed between them
ObserverType, ObservableType and other protocols
Protocols that wrap all of the above, constrain them, and let them communicate effectively
Event
The event itself. An enumeration with three cases: next (carrying an element), error, and completed
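To make the Event enumeration concrete, here is a minimal standalone sketch of its shape. SketchEvent and isStopEvent are names chosen for this illustration, and the block is not RxSwift's source.

```swift
// Hypothetical stand-in for RxSwift's Event, for illustration only.
enum SketchEvent<Element> {
    case next(Element)   // carries one element of the stream
    case error(Error)    // terminates the stream with a failure
    case completed       // terminates the stream normally

    // True for the two events after which nothing more may be delivered.
    var isStopEvent: Bool {
        switch self {
        case .next: return false
        case .error, .completed: return true
        }
    }
}

let sketchEvents: [SketchEvent<Int>] = [.next(0), .next(1), .completed]
let stopFlags = sketchEvents.map { $0.isStopEvent }
print(stopFlags)   // [false, false, true]
```

The demo stream (0, 1, completed) maps onto two next events followed by one stop event.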
Implementation process
Storage
First, a few things about the ObservableType definition
associatedtype E
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
E: defines a single element type for this event stream. It ensures that the elements generated and the elements handled are of the same type; otherwise they cannot be passed along
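As a rough illustration of how the associated type E ties the two sides together, the sketch below uses miniature protocols. SketchObserverType, SketchObservableType, IntCollector and TwoInts are hypothetical names, and the protocols are far smaller than the real ones.

```swift
// Hypothetical miniature protocols; the names are not RxSwift's.
protocol SketchObserverType {
    associatedtype E
    func on(_ element: E)
}

protocol SketchObservableType {
    associatedtype E
    // Only observers whose element type equals ours are accepted.
    func subscribe<O: SketchObserverType>(_ observer: O) where O.E == E
}

final class IntCollector: SketchObserverType {
    var received: [Int] = []
    func on(_ element: Int) { received.append(element) }   // E is inferred as Int
}

struct TwoInts: SketchObservableType {
    typealias E = Int
    func subscribe<O: SketchObserverType>(_ observer: O) where O.E == E {
        observer.on(0)
        observer.on(1)
    }
}

let collector = IntCollector()
TwoInts().subscribe(collector)
// An observer whose E is String would be rejected here at compile time.
```

The constraint is checked by the compiler, so a type mismatch between producer and consumer never reaches run time.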
The create method
Observable.create { observer -> Disposable in …. }
Observable is an abstract class that cannot be used directly, so create has a default implementation in a protocol extension
extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}
So what we create here is an AnonymousObservable object, which I will call A1. It stores the closure that generates events and feeds them into an AnyObserver structure; we will call this closure A2. That completes the storage part
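The key point of the storage step is that A2 is only kept, not executed. A minimal sketch of that idea follows, assuming a simplified observable that emits plain elements instead of events; SketchAnonymousObservable and its members are hypothetical names.

```swift
// Hypothetical miniature of an observable that only stores its closure.
final class SketchAnonymousObservable<Element> {
    typealias SubscribeHandler = (@escaping (Element) -> Void) -> Void
    private let subscribeHandler: SubscribeHandler   // plays the role of A2

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler     // stored, not called
    }

    func subscribe(_ onElement: @escaping (Element) -> Void) {
        subscribeHandler(onElement)                  // A2 runs only here
    }
}

var producerRuns = 0
let a1 = SketchAnonymousObservable<Int> { emit in
    producerRuns += 1
    emit(0)
    emit(1)
}
let runsBeforeSubscribe = producerRuns   // still 0: nothing has run yet

var seen: [Int] = []
a1.subscribe { seen.append($0) }
```

Creating the observable has no side effects; the generating closure runs once per subscription.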
Activation
To activate, we call A1's subscribe method (also defined by the protocol). Next, let us look at its implementation. Since Observable is an abstract class, this is again the protocol's default implementation
public func subscribe(_ on: @escaping (Event<E>) -> Void)
    -> Disposable {
    let observer = AnonymousObserver { e in
        on(e)
    }
    return self.asObservable().subscribe(observer)
}
There are two steps here: creating the observer, and passing the events
The observer
This part is very simple: it creates an AnonymousObserver object, B1, which stores the event-handling closure; we will call that closure B2
Passing
First comes the asObservable() method. Since A1 indirectly inherits from Observable, it simply returns self; the method exists to handle other kinds of observables. I will add more if I encounter them later
Then A1's other subscribe method (an overload) is called, with B1 passed as the parameter
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        // Step one
        let disposer = SinkDisposer()
        // Step two
        let sinkAndSubscription = run(observer, cancel: disposer)
        // Step three
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
        return disposer
    }
    // The else branch can be ignored for now
    else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
    }
}
The first step
A SinkDisposer object, which we will call C1, is created. It handles releasing resources once the event stream ends, including the disposal closure returned by A1's create closure
The second step
The run method is called, with the B1 object passed in
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
    // 2.1
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    // 2.2
    let subscription = sink.run(self)
    // 2.3
    return (sink: sink, subscription: subscription)
}
Step 2.1
Create an AnonymousObservableSink object, which I will call D1. It holds both the B1 object and the C1 object
Step 2.2
Call the run method of the D1 object, passing in A1 itself
func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
}
In this method, we call the A2 closure of the A1 object, wrapping the D1 object in an AnyObserver structure and passing it in as A2's argument
Next, let us see how the D1 object is converted
public init<O: ObserverType>(_ observer: O) where O.E == Element {
    self.observer = observer.on
}
Here the structure stores the on method of the observer passed in as a property. That observer is D1, which in turn forwards events to the B1 object it holds. We will call this structure E1
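Storing only the on method is a form of type erasure: the wrapper forgets the concrete observer type and keeps just a closure. A minimal sketch of that technique follows, assuming observers that receive plain elements; ErasableObserver, ErasedObserver and RecordingSink are hypothetical names.

```swift
// Hypothetical miniature of a type-erased observer; names are not RxSwift's.
protocol ErasableObserver {
    associatedtype E
    func on(_ element: E)
}

struct ErasedObserver<Element> {
    private let handler: (Element) -> Void

    init<O: ErasableObserver>(_ observer: O) where O.E == Element {
        // Keep only the bound method; the concrete type O is forgotten.
        handler = observer.on
    }

    func on(_ element: Element) { handler(element) }
}

final class RecordingSink: ErasableObserver {
    var log: [String] = []
    func on(_ element: String) { log.append("got \(element)") }
}

let d1 = RecordingSink()
let e1 = ErasedObserver(d1)   // inferred as ErasedObserver<String>
e1.on("a")
```

The caller of e1 only needs to know the element type, not which class handles the element.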
Now take a look at E1's onNext and related convenience methods
extension ObserverType {
    //YSD
    /// Convenience method equivalent to `on(.next(element: E))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: E) {
        on(.next(element))
    }
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        on(.completed)
    }
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        on(.error(error))
    }
}
These end up calling the corresponding on method of B1
func on(_ event: Event<E>) {
    switch event {
    case .next:
        if _isStopped == 0 {
            onCore(event)
        }
    case .error, .completed:
        if AtomicCompareAndSwap(0, 1, &_isStopped) {
            onCore(event)
        }
    }
}
This in turn calls B1's onCore method
override func onCore(_ event: Event<Element>) {
    return _eventHandler(event)
}
That is, the events that E1 receives from A2 are passed on to B2, so the content finally reaches its destination. A2 then returns the closure that releases A1's resources
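The on method of B1 also guarantees that nothing is delivered after a stop event. A minimal single-threaded sketch of that guard follows, assuming a plain Bool in place of the atomic _isStopped flag; GuardEvent and StoppableObserver are hypothetical names.

```swift
// Hypothetical miniature of the stop guard; not thread-safe, unlike the original.
enum GuardEvent { case next(Int), error(String), completed }

final class StoppableObserver {
    private var isStopped = false                    // stand-in for _isStopped
    private let eventHandler: (GuardEvent) -> Void   // plays the role of B2

    init(_ eventHandler: @escaping (GuardEvent) -> Void) {
        self.eventHandler = eventHandler
    }

    func on(_ event: GuardEvent) {
        switch event {
        case .next:
            if !isStopped { eventHandler(event) }
        case .error, .completed:
            if !isStopped {
                isStopped = true                     // the first stop event wins
                eventHandler(event)
            }
        }
    }
}

var deliveredCount = 0
let b1 = StoppableObserver { _ in deliveredCount += 1 }
b1.on(.next(0))
b1.on(.completed)
b1.on(.next(1))     // ignored: the observer has stopped
b1.on(.completed)   // ignored as well
```

Only the first next and the first completed reach the handler, so deliveredCount ends at 2.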
Step 2.3
Return D1 and the Disposable returned by A2 as a tuple
The third step
C1 receives the members of the tuple through its setSinkAndSubscription method. The SinkDisposer object is then returned, leaving it to the user to decide whether to dispose of it
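The three steps can be condensed into one small end-to-end sketch. It is a simplification under stated assumptions: a plain closure stands in for the SinkDisposer, the sink is a closure rather than a class, and FlowEvent, FlowObserver and FlowObservable are hypothetical names.

```swift
// Hypothetical miniature of the whole subscribe chain; names are not RxSwift's.
enum FlowEvent<Element> { case next(Element), completed }

struct FlowObserver<Element> {                       // role of E1
    let on: (FlowEvent<Element>) -> Void
}

final class FlowObservable<Element> {                // role of A1
    typealias SubscribeHandler = (FlowObserver<Element>) -> () -> Void
    private let subscribeHandler: SubscribeHandler   // role of A2

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }

    // The returned closure releases resources, standing in for C1.
    func subscribe(_ eventHandler: @escaping (FlowEvent<Element>) -> Void) -> () -> Void {
        var stopped = false
        // The sink (role of D1) forwards events to the handler (role of B2)
        // until the first stop event.
        let sink = FlowObserver<Element>(on: { event in
            if stopped { return }
            if case .completed = event { stopped = true }
            eventHandler(event)
        })
        let cleanup = subscribeHandler(sink)   // step 2.2: run A2 with the wrapped sink
        return cleanup                         // step 3: hand disposal to the caller
    }
}

var flowLog: [String] = []
let flowNumbers = FlowObservable<Int> { observer in
    observer.on(.next(0))
    observer.on(.next(1))
    observer.on(.completed)
    observer.on(.next(2))                      // dropped by the sink
    return { flowLog.append("disposed") }
}
let flowDispose = flowNumbers.subscribe { event in
    switch event {
    case .next(let value): flowLog.append("next \(value)")
    case .completed: flowLog.append("completed")
    }
}
flowDispose()
print(flowLog)   // ["next 0", "next 1", "completed", "disposed"]
```

Event generation lives entirely in the closure passed to the initializer and event handling entirely in the closure passed to subscribe, which is the separation the demo shows.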
Diagram
The description above is rather abstract, so here is a diagram (a bit ugly)
As you can see, A1 only holds A2 in this process, which does not cause a memory leak. Of course, a leak is still possible if you handle disposal improperly
Details
1
In the second subscribe method: if !CurrentThreadScheduler.isScheduleRequired
Its implementation looks roughly like this
public static fileprivate(set) var isScheduleRequired: Bool {
    get {
        // Read this thread's flag
        return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
    }
    set(isScheduleRequired) {
        // pthread_setspecific returns 0 on success; true stores nil, false stores the sentinel
        if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
            rxFatalError("pthread_setspecific failed")
        }
    }
}
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
    // YSD
    // https://onevcat.com/2015/01/swift-pointer/
    // Allocate space for a pointer to a pthread_key_t variable
    let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
    defer {
        key.deallocate(capacity: 1)
    }
    // Create the thread-specific key
    guard pthread_key_create(key, nil) == 0 else {
        rxFatalError("isScheduleRequired key creation failed")
    }
    return key.pointee
}()
In this event stream only the getter is used and the setter never is, so I do not know its exact effect yet. I will fill this in when I come across it later
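For readers unfamiliar with pthread thread-specific data, the getter and setter can be tried in isolation. The sketch below assumes a platform where Foundation exposes the pthread functions (macOS or Linux); scheduleKey, inProgressSentinel and the two helper functions are hypothetical names, and it only exercises the calling thread.

```swift
import Foundation

// Hypothetical miniature of a per-thread flag backed by thread-specific data.
var scheduleKey = pthread_key_t()
let keyStatus = pthread_key_create(&scheduleKey, nil)   // 0 on success

// Any non-nil pointer works as the "schedule in progress" marker.
let inProgressSentinel = UnsafeRawPointer(
    UnsafeMutableRawPointer.allocate(byteCount: 1, alignment: 1))

func isScheduleRequiredSketch() -> Bool {
    // A thread that never stored a value reads nil, which means "required".
    return pthread_getspecific(scheduleKey) == nil
}

func setScheduleRequiredSketch(_ required: Bool) {
    let status = pthread_setspecific(scheduleKey, required ? nil : inProgressSentinel)
    precondition(status == 0, "pthread_setspecific failed")
}

let requiredBefore = isScheduleRequiredSketch()   // true: nothing stored yet
setScheduleRequiredSketch(false)
let requiredDuring = isScheduleRequiredSketch()   // false: sentinel is stored
setScheduleRequiredSketch(true)
let requiredAfter = isScheduleRequiredSketch()    // true: value reset to nil
```

Because the value is stored per thread, each thread sees its own flag without any locking.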
SinkDisposer
This is the part that releases resources
fileprivate enum DisposeState: UInt32 {
    case disposed = 1
    case sinkAndSubscriptionSet = 2
}
// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
    case disposed = 1
    case sinkAndSubscriptionSet = 2
}
private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
    _sink = sink
    _subscription = subscription
    let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
    if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
        rxFatalError("Sink and subscription were already set")
    }
    if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
        sink.dispose()
        subscription.dispose()
        _sink = nil
        _subscription = nil
    }
}
func dispose() {
    let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
    if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
        return
    }
    if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
        guard let sink = _sink else {
            rxFatalError("Sink not set")
        }
        guard let subscription = _subscription else {
            rxFatalError("Subscription not set")
        }
        sink.dispose()
        subscription.dispose()
        _sink = nil
        _subscription = nil
    }
}
This guards against dispose being called more than once, since at any point in the event stream dispose may be triggered by a completed or error event, or called manually by the user
The AtomicOr method actually calls OSAtomicOr32OrigBarrier(A, &B), which performs a thread-safe bitwise OR of the two values, stores the result in B, and returns the original value of B
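The "return the original value" behaviour is what makes the state checks work, so it is worth seeing on its own. The sketch below uses an NSLock instead of a real atomic instruction, which is an assumption made for portability; SketchAtomicFlags and fetchOr are hypothetical names.

```swift
import Foundation

// Hypothetical lock-based stand-in for an atomic fetch-or; not RxSwift's AtomicOr.
final class SketchAtomicFlags {
    private var value: Int32 = 0
    private let lock = NSLock()

    // ORs `mask` into the stored value and returns the value from BEFORE the update.
    func fetchOr(_ mask: Int32) -> Int32 {
        lock.lock()
        defer { lock.unlock() }
        let previous = value
        value |= mask
        return previous
    }

    var current: Int32 {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

let flags = SketchAtomicFlags()
let firstPrevious = flags.fetchOr(2)    // previous 0, stored value becomes 2
let secondPrevious = flags.fetchOr(1)   // previous 2, stored value becomes 3
let thirdPrevious = flags.fetchOr(1)    // previous 3, stored value stays 3
```

Each caller learns which bits were already set before its own update, so exactly one caller can observe a given bit as newly set.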
If dispose has not been called when setSinkAndSubscription runs, previousState = 0 and state becomes 2. Neither of the two bitwise AND conditions holds, so nothing is released and the user should dispose manually
If completed or error occurs before setSinkAndSubscription is called, state is already 1. When setSinkAndSubscription then runs, previousState = 1 and state becomes 3, so the first condition is false and the second is true, and the resources are released
When dispose is called multiple times, resources are released only once
On a repeated call, previousState already has the disposed bit set, so dispose returns immediately
Of course, there are many ways to achieve this effect. RxSwift's approach is one of the more elegant ones
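The state transitions described above can be replayed with a small single-threaded sketch. It assumes a plain Int and a non-atomic fetch-or in place of AtomicInt and AtomicOr, and a single cleanup closure in place of the sink and subscription; SketchSinkDisposer and its members are hypothetical names.

```swift
// Hypothetical single-threaded miniature of the two-bit disposal state.
final class SketchSinkDisposer {
    private let disposedBit = 1
    private let setBit = 2
    private var state = 0
    private var cleanup: (() -> Void)?

    // Non-atomic stand-in for a fetch-or: returns the state before the update.
    private func fetchOr(_ bit: Int) -> Int {
        let previous = state
        state |= bit
        return previous
    }

    func setCleanup(_ newCleanup: @escaping () -> Void) {
        cleanup = newCleanup
        let previous = fetchOr(setBit)
        precondition((previous & setBit) == 0, "cleanup was already set")
        if (previous & disposedBit) != 0 {    // dispose() arrived first: release now
            newCleanup()
            cleanup = nil
        }
    }

    func dispose() {
        let previous = fetchOr(disposedBit)
        if (previous & disposedBit) != 0 { return }   // already disposed once
        if (previous & setBit) != 0 {
            cleanup?()
            cleanup = nil
        }
    }
}

var releaseCount = 0

// Order 1: resources are set first, then dispose is called twice.
let disposerA = SketchSinkDisposer()
disposerA.setCleanup { releaseCount += 1 }
disposerA.dispose()
disposerA.dispose()
let afterOrder1 = releaseCount   // 1: the second dispose did nothing

// Order 2: dispose arrives before the resources are set.
let disposerB = SketchSinkDisposer()
disposerB.dispose()
disposerB.setCleanup { releaseCount += 1 }
disposerB.dispose()
let afterOrder2 = releaseCount   // 2: released once, inside setCleanup
```

In both orders the cleanup runs exactly once per disposer, which is the property the bit flags are there to guarantee.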
Conclusion
After reading the source code, my feeling is that RxSwift applies design patterns very thoroughly. When time allows, it is worth bringing your own projects closer to this style to improve their extensibility, so that whatever the project manager asks you to add will not be too much of a headache