RxSwift chapter
- first
- Sequential response logic
- Sequence types
- The operator
- Scheduler – Scheduler (1)
- Scheduler – Scheduler (2)
- Common sequence
- Subject is both offensive and defensive
- MVVM bidirectional binding
Look at a very special type -Subject. Why is it special? The reason is simple: **Subject can be both a sequence and an observer! Because of this feature, ** is widely used in actual development. Let’s interpret this particular Subject
The principle of
SubjectType
/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
///
associatedtype Observer: ObserverType
///
func asObserver() -> Observer
}
Copy the code
-
SubjectType first inherits ObservableType and has sequence properties.
-
Associated with the observer type, has the ability of that type.
-
Let’s get a feel for the Subject through a concrete type.
// Create sequenceletPublishSub = PublishSubject<Int>() // send response publishsub.onnext (1) // subscribe sequence publishsub.subscribe {print("Subscribed :".$0Prompt (by: Disposbag) // Send response publishsub.onnext (2) publishsub.onnext (3)Copy the code
-
The ability to subscribe to signals and send responses is obvious.
-
View the underlying source code analysis
Subscribe to the process
public override func subscribe(_ observer: Observer)... -> Disposable
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe(_ observer: Observer)... -> Disposable {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
Copy the code
- Self._observer. insert(observer.on): Add all the subscribed events in one set, and it is obvious that all responses can be handled.
Signal sending process
public func on(_ event: Event<Element>) {
...
dispatch(self._synchronized_on(event), event)
}
Copy the code
- The dispatch function is called primarily, passing in two parameters: self._synchronized_on(event) and event
- Analysis of the self. _synchronized_on (event)
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
Copy the code
-
Here if self _isDisposed | | self. _stopped form will return an empty collection, there would be no sequence response
-
Both.completed and.error change the state self._stopped = true, which means that the sequence can no longer respond after completion or error
-
On.completed,.error also removes content added to the collection
-
Focusing on returning observers based on event state, observers continue with the Dispatch function analysis
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) { bag._value0? (event)if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
Copy the code
-
bag._value0? (event) The callback to the event is performed first
-
Check bag._onlyFastPath, default is fast channel!
-
If the slow channel is enabled, external event callbacks are processed in sequence from pairs and Dictionaries.
summary
Relative to ordinary sequences,subject implements both the subscription process and the response process internally, so there is no need to introduce sink!
The Subject classification
PublishSubject
Initialization is simple (with no default values), and it only sends elements to the subscriber that were received after the subscription.
letPublishSub = PublishSubject<Int>() // send publishsub.onnext (1) // subscribe publishsub.subscribe {print("Subscribed :".$0Prompt (by: Disposbag) // Send response publishsub.onnext (2) publishsub.onnext (3)Copy the code
- Signal :1 cannot be subscribed, only the response after the subscription is accepted.
BehaviorSubject
This is created with a default initialization value. When a subscriber subscrires to a BehaviorSubject, it receives a BehaviorSubject Event emitted after the subscription, or a default value emitted if no data has been received. The new event is then received as normal as the PublishSubject.
Unlike publish, behavior has a storage function: it stores the last signal.
letbehaviorSub = BehaviorSubject.init(value: 100) // 2: Signal behaviorSub.onNext(2) behaviorSub.onNext(3) // 3: Subscribe sequence behaviorSub.subscribe{print("Subscribed :".$0Prompt (by: Disposbag) // Send behaviorSub.onNext(4) behaviorSub.onNext(5) // Subscribe again behaviorSub.subscribe{print("Subscribed :".$0)}
.disposed(by: disposbag)
Copy the code
Source code analysis
Public init(value: Element) {self._element = value} Event<Element>) -> Observers { self._lock.lock(); defer { self._lock.unlock() }ifself._stoppedEvent ! = nil || self._isDisposed {return Observers()
}
switch event {
case .next(let element):
self._element = element
case .error, .completed:
self._stoppedEvent = event
}
return self._observers
}
Copy the code
-
Class with a property self._element that holds a signal
-
Event response: The new event overwrites the original event
ReplaySubject
Compared to the BehaviorSubject, only the storage capability of the ReplaySubject is different.
letreplaySub = ReplaySubject<Int>.create(bufferSize: OnNext (1) replaysub. onNext(2) replaysub. onNext(3) replaysub. onNext(4) // 3: Subscribe sequence replaySub.subscribe{print("Subscribed :".$0Prompt (by: Disposbag) // Replaysub.onnext (7) replaysub.onnext (8) replaysub.onnext (9)Copy the code
AsyncSubject
The AsyncSubject sends only the last event sent by the source Observable, and only after the source Observable has completed. (If the source Observable does not send any value, the AsyncSubject does not send any value either.)
letAsynSub = AsyncSubject<Int>.init() // 2: send signal asynsub.onNext (1) asynsub.onnext (2) // 3: subscribe sequence asynsub.subscribe {print("Subscribed :".$0Prompt (by: Disposbag) // Send asynsub.onnext (3) asynsub.onnext (4) asynsub.subscribe {print("Subscribed :".$0)
}
.disposed(by: disposbag)
asynSub.onCompleted()
Copy the code
- Source code response processing
func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
self._lock.lock(); defer { self._lock.unlock() }
if self._isStopped {
return (Observers(), .completed)
}
switch event {
case .next(let element):
self._lastElement = element
return (Observers(), .completed)
case .error:
self._stoppedEvent = event
let observers = self._observers
self._observers.removeAll()
return (observers, event)
case .completed:
let observers = self._observers
self._observers.removeAll()
if let lastElement = self._lastElement {
self._stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
self._stoppedEvent = event
return (observers, .completed)
}
}
}
Copy the code
-
As you can clearly see, the normal Next event is an element replacement, with no response at all. When the complete event is sent, the last saved self._lastElement is passed as the event value, and the response is: Next (lastElement)
-
Send the completion event:.completed if no event is saved
-
Error events empty the entire response set: self._observers.removeall ()
BehaviorRelay
-
You can store a signal
-
Anytime subscribe response
-
Note that the response is sent with.accept().
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe { (result) in
print(result)
}
.disposed(by: disposbag)
print(Print: \ "(behaviorRelay. Value)")
behaviorRelay.accept(1000)
Copy the code
The source code
public final class BehaviorRelay<Element>: ObservableType {
private let _subject: BehaviorSubject<Element>
/// Accepts `event` and emits it to subscribers
public func accept(_ event: Element) {
self._subject.onNext(event)
}
/// Initializes
public init(value: Element) {
self._subject = BehaviorSubject(value: value)
}
}
Copy the code
- The BehaviorSubject is encapsulated internally, which is pretty obvious.