RxSwift chapter

  • first
  • Sequential response logic
  • Sequence types
  • The operator
  • Scheduler – Scheduler (1)
  • Scheduler – Scheduler (2)
  • Common sequence
  • Subject is both offensive and defensive
  • MVVM bidirectional binding

The importance of Observable in Rxswift is self-evident, and this foundation must be solid.

1. empty

Features:

  • Empty sequence, only completion signal.
        Observable<String>.empty()
            .subscribe(onNext: { (info) in
                print(info)
            }, onError: { (err) in
                print(err)
            }, onCompleted: {
                print("Complete") {})print("Release callback")}Copy the code

Source code analysis

class EmptyProducer<Element>: Producer<Element> { override func subscribe<Observer: ObserverType>... { observer.on(.completed)return Disposables.create()
    }
}
Copy the code
  • When you subscribed, you sent the. Completed signal.

2. just

Features:

  • A single signal sequence.
  • Initialization takes one argument and automatically executes Complete after subscribing to the information.
let arr = ["1"."2"]
        Observable<[String]>.just(arr)
            .subscribe(onNext: { (info) in
                print(info)
            }, onError: { (err) in
                print(err)
            }, onCompleted: {
                print("Complete") {})print("Release callback")}Copy the code

Source code analysis

override func subscribe<Observer: ObserverType> ... { observer.on(.next(self._element)) observer.on(.completed)return Disposables.create()
    }
Copy the code
  • To subscribe, execute observer.on(.next(self._element)) and send the.next signal.
  • Finally, a COMPLETED signal is automatically sent.

3. of

Features:

  • Initialize a variable number of elements
  • Elements can accept a variable number of arguments (arguments of the same type)
        Observable<[String: String]>.of(["name": "l"."age": "20"], ["name": "2"."age": "22"])
            .subscribe { (event) in
            print(event)
        }
Copy the code

Source code analysis

final private class ObservableSequenceSink ... {
    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}
Copy the code
  • Source code implemented as a classical structure, subscription through “Sink”.
  • In “Sink”, recurse(mutableIterator) is used to send all parameters, and completed is automatically sent.

4. from

Features:

  • The parameters are selectable
  • Subscription can be selected when processing, more secure.
        Observable<[String]>.from(optional: ["111"."222"])
            .subscribe { (event) in
                print(event)
            }
Copy the code

Source code analysis

    override func subscribe<Observer: ObserverType>... {
        if let element = self._optional {
            observer.on(.next(element))
        }
        observer.on(.completed)
        return Disposables.create()
    }
Copy the code
  • When subscribed, the next(element) signal is sent and the last. Completed signal is sent.

5. deferred

Features:

  • A custom sequence can be generated by passing a block with deferred.
  • When the requirement is dynamic sequence, the conditional parameters can be passed in from outside to realize the sequence satisfying different conditions.
        var flag = true
        Observable<Int>.deferred { () -> Observable<Int> in
            flag = !false
            if flag{
                return Observable.of(1, 3)
            }else{
                return Observable.of(2, 4)
            }
        }
        .subscribe { (event) in
            print(event)
        }
Copy the code

Source code analysis

func run() -> Disposable {
        do {
            let result = try self._observableFactory()
            return result.subscribe(self)
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return Disposables.create()
        }
    }
Copy the code
  • Self._observablefactory holds the block that generates the sequence.
  • The factory closure is executed when the subscription process reaches “sink”.
  • Result.subscribe (self) is a custom sequence that implements the subscription here.

6. rang

Features:

  • Generates a sequence of observable integers in the specified range.
Observable.range(start: 2, count: 5).subscribe { (event) in
            print(event)
        }
Copy the code

Source code analysis

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(0 as Observer.Element) { i, recurse in
            if i < self._parent._count {
                self.forwardOn(.next(self._parent._start + i))
                recurse(i + 1)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
Copy the code
  • After subscription, count signals are sent recursively according to the initial value, and the parameter changes are (the initial value increases successively).
  • The.completed signal is finally sent.

7. generate

Features:

  • The sequence of actions is given only if both of the provided criteria are true.

  • Parameter 1 initialState: indicates the initialState.

  • Condition: The condition that terminates the build (when “false” is returned).

  • Parameter 3 iterate: Iterated step function.

Observable. Generate (initialState: 0, condition: {$0 < 10},
                            iterate: { $0 + 2}
        ).subscribe { (event) in
            print(event)} //2. Convenience set according to conditional ruleslet generateArr = ["PL_1"."PL_2"."PL_3"."PL_4"."PL_5"]
        Observable.generate(initialState: 0,
                            condition: { $0 < generateArr.count },
                            iterate: { $0 + 1 })
            .subscribe(onNext: { (index) in
                print(generateArr[index])
            })
Copy the code

Source code analysis

// Class GenerateSink<Sequence, Observer: ObserverType>: Sink<Observer> { typealias Parent = Generate<Sequence, Observer.Element> privatelet _parent: Parent
    private var _state: Sequence
    ...
    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(true) { isFirst, recurse -> Void in
            do {
                if! isFirst { self._state = try self._parent._iterate(self._state) }if try self._parent._condition(self._state) {
                    let result = try self._parent._resultSelector(self._state)
                    self.forwardOn(.next(result))
                    recurse(false)}else {
                    self.forwardOn(.completed)
                    self.dispose()
                }
            }
            catch let error {
                self.forwardOn(.error(error))
                self.dispose()
            }
        }
    }
}
Copy the code
  • When you subscribe, iterate from the initial value self._state = try self._parent._iterate(self._state).
  • Execute self._parent._condition(self._state) to determine whether to send the. Next signal if the condition is met, or not to send the. Completed signal.

8. timer

Features:

  • A sequence of periodic responses with a value of the number of cycles.

  • Parameter 1: the time of the first response.

  • Parameter two period: indicates the interval.

  • Parameter three Scheduler: thread.

        Observable<Int>.timer(5, period: 2, scheduler: MainScheduler.instance)
        .subscribe { (event) in
            print(event)
        }
Copy the code
  • After subscribing, responses are sent at regular intervals.

9. interval

Features:

  • Same as timer.
Observable<Int>.interval(2, scheduler: MainScheduler.instance)
            .subscribe { (event) in
            print(event)
        }
Copy the code

Source code analysis

    public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
        return Timer(
            dueTime: period,
            period: period,
            scheduler: scheduler
        )
    }
Copy the code
  • The Timer sequence is called.

10. repeatElement

Features:

  • Repeat indefinitely.
        Observable.repeatElement("PL")
            .subscribe { (event) in
            print(event)
        }
Copy the code

Source code analysis

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._element) { e, recurse in
            self.forwardOn(.next(e))
            recurse(e)
        }
    }
Copy the code
  • After the subscription, the infinite recursive call signal is sent.

11. error

Features:

  • Only error signals are sent
          Observable<String>.error(NSError.init(domain: "error", code: 10, userInfo: ["reason": "unknow"]))
            .subscribe { (event) in
                print(event)
            }
Copy the code

Source code analysis

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.error(self._error))
        return Disposables.create()
    }
Copy the code
  • Send.error events directly in subscription code.

12. never

Features:

  • It never signals (and never terminates).

Source code analysis

 class NeverProducer<Element>: Producer<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        return Disposables.create()
    }
}
Copy the code
  • There are no events in the subscription code.

13. create

Features:

  • This method takes a parameter in the form of a closure, and its job is to respond to each incoming subscription.

  • This is also a common way to create sequences, and there are a lot of applications.

  • Examples:

        let observable = Observable<String>.create{observer in// A.next event was issued to the subscriber, carrying a data. observer.onnext ("test"Completed Event observer.oncompleted ()returnDisposables.create()} // Subscribe test Observable. subscribe {print($0)
        }
        observable.subscribe {
            print($0)}Copy the code