RxSwift chapter

  • first
  • Sequential response logic
  • Sequence types
  • The operator
  • Scheduler – Scheduler (1)
  • Scheduler – Scheduler (2)
  • Common sequence
  • Subject is both offensive and defensive
  • MVVM bidirectional binding

Look at a very special type -Subject. Why is it special? The reason is simple: **Subject can be both a sequence and an observer! Because of this feature, ** is widely used in actual development. Let’s interpret this particular Subject

The principle of

SubjectType

/// Represents an object that is both an observable sequence as well as an observer.

public protocol SubjectType : ObservableType {
    /// 
    associatedtype Observer: ObserverType
    /// 
    func asObserver() -> Observer
}
Copy the code
  • SubjectType first inherits ObservableType and has sequence properties.

  • Associated with the observer type, has the ability of that type.

  • Let’s get a feel for the Subject through a concrete type.

// Create sequenceletPublishSub = PublishSubject<Int>() // send response publishsub.onnext (1) // subscribe sequence publishsub.subscribe {print("Subscribed :".$0Prompt (by: Disposbag) // Send response publishsub.onnext (2) publishsub.onnext (3)Copy the code
  • The ability to subscribe to signals and send responses is obvious.

  • View the underlying source code analysis

Subscribe to the process

   public override func subscribe(_ observer: Observer)... -> Disposable 
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe(_ observer: Observer)... -> Disposable  {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
Copy the code
  • Self._observer. insert(observer.on): Add all the subscribed events in one set, and it is obvious that all responses can be handled.

Signal sending process

    public func on(_ event: Event<Element>) {
    ...
        dispatch(self._synchronized_on(event), event)
    }
Copy the code
  • The dispatch function is called primarily, passing in two parameters: self._synchronized_on(event) and event
  • Analysis of the self. _synchronized_on (event)
    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
Copy the code
  • Here if self _isDisposed | | self. _stopped form will return an empty collection, there would be no sequence response

  • Both.completed and.error change the state self._stopped = true, which means that the sequence can no longer respond after completion or error

  • On.completed,.error also removes content added to the collection

  • Focusing on returning observers based on event state, observers continue with the Dispatch function analysis

func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) { bag._value0? (event)if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}
Copy the code
  • bag._value0? (event) The callback to the event is performed first

  • Check bag._onlyFastPath, default is fast channel!

  • If the slow channel is enabled, external event callbacks are processed in sequence from pairs and Dictionaries.

summary

Relative to ordinary sequences,subject implements both the subscription process and the response process internally, so there is no need to introduce sink!

The Subject classification

PublishSubject

Initialization is simple (with no default values), and it only sends elements to the subscriber that were received after the subscription.

        letPublishSub = PublishSubject<Int>() // send publishsub.onnext (1) // subscribe publishsub.subscribe {print("Subscribed :".$0Prompt (by: Disposbag) // Send response publishsub.onnext (2) publishsub.onnext (3)Copy the code
  • Signal :1 cannot be subscribed, only the response after the subscription is accepted.

BehaviorSubject

This is created with a default initialization value. When a subscriber subscrires to a BehaviorSubject, it receives a BehaviorSubject Event emitted after the subscription, or a default value emitted if no data has been received. The new event is then received as normal as the PublishSubject.

Unlike publish, behavior has a storage function: it stores the last signal.

        letbehaviorSub = BehaviorSubject.init(value: 100) // 2: Signal behaviorSub.onNext(2) behaviorSub.onNext(3) // 3: Subscribe sequence behaviorSub.subscribe{print("Subscribed :".$0Prompt (by: Disposbag) // Send behaviorSub.onNext(4) behaviorSub.onNext(5) // Subscribe again behaviorSub.subscribe{print("Subscribed :".$0)}
            .disposed(by: disposbag)

Copy the code

Source code analysis

Public init(value: Element) {self._element = value} Event<Element>) -> Observers { self._lock.lock(); defer { self._lock.unlock() }ifself._stoppedEvent ! = nil || self._isDisposed {return Observers()
        }
        
        switch event {
        case .next(let element):
            self._element = element
        case .error, .completed:
            self._stoppedEvent = event
        }
        
        return self._observers
    }
Copy the code
  • Class with a property self._element that holds a signal

  • Event response: The new event overwrites the original event

ReplaySubject

Compared to the BehaviorSubject, only the storage capability of the ReplaySubject is different.

        letreplaySub = ReplaySubject<Int>.create(bufferSize: OnNext (1) replaysub. onNext(2) replaysub. onNext(3) replaysub. onNext(4) // 3: Subscribe sequence replaySub.subscribe{print("Subscribed :".$0Prompt (by: Disposbag) // Replaysub.onnext (7) replaysub.onnext (8) replaysub.onnext (9)Copy the code

AsyncSubject

The AsyncSubject sends only the last event sent by the source Observable, and only after the source Observable has completed. (If the source Observable does not send any value, the AsyncSubject does not send any value either.)

        letAsynSub = AsyncSubject<Int>.init() // 2: send signal asynsub.onNext (1) asynsub.onnext (2) // 3: subscribe sequence asynsub.subscribe {print("Subscribed :".$0Prompt (by: Disposbag) // Send asynsub.onnext (3) asynsub.onnext (4) asynsub.subscribe {print("Subscribed :".$0)
                
        }
        .disposed(by: disposbag)
        asynSub.onCompleted()
Copy the code
  • Source code response processing
    func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            self._lastElement = element
            return (Observers(), .completed)
        case .error:
            self._stoppedEvent = event

            let observers = self._observers
            self._observers.removeAll()

            return (observers, event)
        case .completed:

            let observers = self._observers
            self._observers.removeAll()

            if let lastElement = self._lastElement {
                self._stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                self._stoppedEvent = event
                return (observers, .completed)
            }
        }
    }
Copy the code
  • As you can clearly see, the normal Next event is an element replacement, with no response at all. When the complete event is sent, the last saved self._lastElement is passed as the event value, and the response is: Next (lastElement)

  • Send the completion event:.completed if no event is saved

  • Error events empty the entire response set: self._observers.removeall ()

BehaviorRelay

  • You can store a signal

  • Anytime subscribe response

  • Note that the response is sent with.accept().

        let behaviorRelay = BehaviorRelay(value: 100)
        behaviorRelay.subscribe { (result) in
            print(result)
        }
        .disposed(by: disposbag)
        print(Print: \ "(behaviorRelay. Value)")
        behaviorRelay.accept(1000)
Copy the code

The source code

public final class BehaviorRelay<Element>: ObservableType {
    private let _subject: BehaviorSubject<Element>

    /// Accepts `event` and emits it to subscribers
    public func accept(_ event: Element) {
        self._subject.onNext(event)
    }

    /// Initializes 
    public init(value: Element) {
        self._subject = BehaviorSubject(value: value)
    }
   }
Copy the code
  • The BehaviorSubject is encapsulated internally, which is pretty obvious.