1, PublishSubject

The PublishSubject will send elements generated after the subscription to the observer, and elements sent before the subscription will not be sent to the observer.

Example:

let subject = PublishSubject<String>()

subject.onNext("🐘")

subject.subscribe(onNext: { print("Subscribed to:\ [$0)") })
    .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")
Copy the code
Subscribed to: 🐶 Subscribed to: 🐱Copy the code

Source code analysis:

PublishSubject
PublishSubject
Listening sequence
The observer

(1) On the first execution of onNext function subject.onnext (“🐘”)

PublishSubject.on
dispatch
self._synchronized_on(event)

  • self._synchronized_on(event)The function will eventually returnself._observers
  • whileself._observersIs of typeBag<(Event<Element>) -> Void>.
  • Check it againdispatchFunction source code

    And as you can see from the analysis, that’s what it doesbagTo execute the callback.

Because of the input from the last stepbagIt’s just an initializerbagNo code blocks are saved in between, so no callbacks are executed. I’m going to go over here and execute it the first timeSubject. OnNext (" 🐘 ")It’s over. Nothing was printed.

Call subscribe.

  • inPublishSubjectIn the source code, to achievesubscribe.
  • It will end up callinglet key = self._observers.insert(observer.on)
  • To viewinsertFunction source code.

    And since it’s the first time, so_dictionaryThe value is nil and_pairs.countLess thanarrayDictionaryMaxSize. So, observer’sonfunctionobserver.onWas added to the_pairsIn the array.

(3) Execute onNext function subject.onnext (“🐶”) for the second time. The initial steps are the same as in (1), except that when the dispatch function is executed at the end, the callback is executed because bag._pairs has a block of code for the ON function that holds an observer.

It will eventually call back to the subscribe. OnNext closure and print the result. (PS: If you are not clear about this step, please read the introduction of RxSwift core logic.)

2, BehaviorSubject

When an observer subscribes to a BehaviorSubject, it sends out the latest element in the source Observable. If the latest element does not exist, the default element is sent. The subsequent elements are then sent out.

Example:

let subject = BehaviorSubject<Int>(value: 100)

subject.subscribe(onNext: { print("Subscribe 1:\ [$0)") })
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(5)

subject.subscribe(onNext: { print("Subscribe 2:\ [$0)") })
    .disposed(by: disposeBag)
Copy the code
Print results: Subscribe1:100To subscribe to1:3To subscribe to1:5To subscribe to2:5
Copy the code

Source code analysis: The BehaviorSubject and PublishSubject source code are similar, but there are also some differences. We focus on the differences. Please refer to the above PublishSubject source code analysis for similarities.

(1) Initialization method

public init(value: Element) {
    self._element = value
    
    #if TRACE_RESOURCES
    _ = Resources.incrementTotal()
    #endif
}
Copy the code

The initialization method of BehaviorSubject needs to pass in an initial value as a default element.

(2) the subscribe function

BehaviorSubject
_synchronized_subscribe
PublishSubject
observer.on
self._element
self._element
init

(3) onNext function

BehaviorSubject
onNext
self._element
subscribe

3, ReplaySubject

The ReplaySubject will send all elements to the observer regardless of when the observer subscribed.

Example:

let subject = ReplaySubject<Int>.create(bufferSize: 2)

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

subject.subscribe(onNext: { print("Subscribe to:\ [$0)") })
    .disposed(by: disposeBag)

subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
Copy the code
Print results: Subscribe to:2Subscribe to:3Subscribe to:4Subscribe to:5Subscribe to:6
Copy the code

Source code analysis: (1) initialize method create function

bufferSize
ReplayMany

queue

The onNext function calls the on function of ReplayBufferBase.

  • calladdValueToBufferThe function adds the sent element toqueueIn the.
  • calltrimThe function to deletequeueIn more thanbufferSizeThe superfluous element of.

(3) Subscribe function

observer
self._observers
self.replayBuffer(anyObserver)
ReplayManyBase.replayBuffer

override func replayBuffer<O: ObserverType>(_ observer: O) where O.E= =Element {
    for item in self._queue {
        observer.on(.next(item))
    }
}
Copy the code

Sends all the elements saved in the queue to the subscriber.

4, AsyncSubject

The AsyncSubject emits the last element (with and only the last element) after the source Observable has generated its completion time. If the source Observable emits no elements and has only one completion event, the AsyncSubject has only one completion event. If the source Observable aborts with an error event, the AsyncSubject emits no elements and sends the error event instead.

Example:

let subject = AsyncSubject<Int>()

subject.onNext(1)
subject.onNext(2)

subject.subscribe({ print("Subscribe to:\ [$0)")})
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
Copy the code
Print results: Subscribe to: next(4) Subscribed to: completedCopy the code

Source code analysis: Focusing on the on function leads to the following code at 👇, obtaining the saved observer.on closure, and executing the event callback.

  • innextEvent to save the latest element inself._lastElementAfter medium, not all are returnedobserver.onAnd sendnextEvent, instead, initializes an emptyobserver.onCollection and returncompletedEvents. So no observer will receive a signal in response to the event.
  • inerrorIn the event, it clears allself._observersAnd returns allobserver.onerrorEvent, so all observers will only receiveerrorThe signal.
  • incompletedIn the event. It determines if the latest element has been sent: if so, it is sent and executednextEvents.

    And it’s executingnextAfter the event is executedcompletedEvents. If there is no latest element, only allobserver.onsendcompletedEvents.

5. Variable (deprecated)

Example:

let subject = Variable.init(1)

subject.value = 10
subject.value = 100

subject.asObservable().subscribe({ print("Subscribe to:\ [$0)")})
    .disposed(by: disposeBag)

subject.value = 1000
Copy the code
Print result: ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvxSubscribe to: Next (100Subscribe to: next(1000) Subscribed to: completedCopy the code

Source code analysis:

Variable
ObserverType
Observable
_subject: BehaviorSubject<Element>
Variable
BehaviorSubject
ObserverType
on
on

  • When initializing, use the initialization value, initializeBehaviorSubjectAnd saved inself._subjectIn the.
  • rightvalueI made a layer of encapsulation invaluesetFunction, will call_subjectonFunction. The signal is sent.

The BehaviorRelay and BehaviorSubject are recommended as alternatives.

6, BehaviorRelay

BehaviorRelay is a BehaviorSubject that removes the termination events onError and onCompleted.

Example:

let subject = BehaviorRelay(value: 1)

subject.accept(10)

subject.subscribe({ print("Subscribe to:\ [$0)")})
    .disposed(by: disposeBag)

subject.accept(100)

subject.accept(1000)
Copy the code
Print results: Subscribe to: next(10Subscribe to: next(100Subscribe to: next(1000)
Copy the code

Source code analysis:

BehaviorRelay
BehaviorRelay
BehaviorSubject
BehaviorSubject
BehaviorRelay
error
completed

Why do YOU need a BehaviorRelay to encapsulate a BehaviorSubject when you already have one? Generally, if you need to know the current signal value of the BehaviorSubject, you can only obtain the signal from subscribe. However, using BehaviorRelay, you can easily obtain the current signal by using BehaviorRelay.

The above is some analysis of common subjects. If there is any deficiency, please comment and correct it.