I have recently been studying RxSwift, and I am recording some of the basics here for future reference.

Observable

In RxSwift, the most important concept is the observable sequence, which is analogous to Sequence in Swift. Every element in an observable sequence is an event. Just as a Swift Sequence can contain any number of elements, an observable sequence keeps generating new events until an error occurs or it completes normally. A subscriber subscribes to an observable sequence to receive the events it generates; the sequence sends events only while it has an observer.

For example, use the of operator to create an observable sequence:

let seq = Observable.of(1, 2, 3)

of is a simple operator for creating an Observable. The code above creates an observable sequence of type Observable<Int> that contains three elements: 1, 2, and 3.

An Observable is a type that implements the ObservableType protocol. The ObservableType protocol has a very simple definition:

protocol ObservableType : ObservableConvertibleType {
    associatedtype E
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

Here E is an associated type representing the type of the elements in the sequence. The protocol defines only one method, subscribe, which adds an observer (of type ObserverType) to the observable sequence:

// Subscribing with a closure is a convenience provided by a protocol extension
seq.subscribe { (event) in
    print(event)
}

subscribe plays the role that makeIterator plays for a Swift Sequence. The code above adds an observer to seq, and the closure is called whenever the sequence produces a new event, so it prints next(1), next(2), next(3), and finally completed.
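
The makeIterator analogy can be made concrete in plain Swift, with no RxSwift involved. The sketch below defines a hypothetical Countdown sequence (the type and its contents are assumptions made only for illustration). Each call to makeIterator starts an independent traversal, much as each subscribe call starts an independent delivery of events.

```swift
// Plain Swift only. Countdown is a hypothetical type used for illustration.
struct Countdown: Sequence {
    let start: Int

    // Each call returns a fresh iterator with its own state.
    func makeIterator() -> AnyIterator<Int> {
        var current = start
        return AnyIterator {
            guard current > 0 else { return nil } // nil plays the role of "completed"
            defer { current -= 1 }
            return current
        }
    }
}

let countdown = Countdown(start: 3)
let firstPass = Array(countdown)   // one traversal
let secondPass = Array(countdown)  // a second, independent traversal
print(firstPass, secondPass)
```

The main difference is direction: an iterator is pulled by its consumer, whereas an observable sequence pushes events to its observers.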

Observer

An observer is an object that implements the ObserverType protocol, which is also quite simple:

public protocol ObserverType {
    associatedtype E
    func on(_ event: Event<E>)
}

E is the element type of the sequence being observed. When the sequence produces a new event, the on method is called with that event. The Event type is an enumeration with three cases:

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
  1. .next: The sequence produced a new element; the associated value holds that element.
  2. .error: The sequence failed; the associated value holds the error, after which the sequence terminates (no further next events are generated).
  3. .completed: The sequence ended normally.
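
An observer usually switches over these three cases. The following is a minimal plain-Swift sketch that redeclares a look-alike Event enum so it runs without RxSwift; SampleError and describe are hypothetical names introduced only for this illustration.

```swift
// Look-alike of the Event enum shown above, redeclared so the sketch is self-contained.
enum Event<Element> {
    case next(Element)
    case error(Error)
    case completed
}

// Hypothetical error type for the example.
enum SampleError: Error { case failed }

// Hypothetical helper: turns an event into a readable description.
func describe<Element>(_ event: Event<Element>) -> String {
    switch event {
    case .next(let value):
        return "next(\(value))"
    case .error(let error):
        return "error(\(error))"   // terminal: nothing follows an error
    case .completed:
        return "completed"         // terminal: normal end
    }
}

let events: [Event<Int>] = [.next(1), .next(2), .completed]
let lines = events.map { describe($0) }
print(lines)
```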

Dispose

Besides ending with an error or completing naturally, an observation can also be terminated manually. Subscribing to an observable sequence returns an object of type Disposable, a protocol that defines only one method:

protocol Disposable {
    func dispose()
}

The dispose method terminates the subscription and releases the related resources held by the observable sequence. Normally you do not call it directly; instead you add the Disposable to a DisposeBag by calling the addDisposableTo extension method on it. A DisposeBag manages all the Disposable objects added to it, and when the DisposeBag is destroyed it calls dispose on each of them to release their resources.
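
The "dispose everything when the bag is destroyed" behavior can be sketched in plain Swift. ToyDisposable and ToyDisposeBag below are hypothetical stand-ins written for this illustration; they are not RxSwift's types, and the real DisposeBag has more to it (thread safety, for one).

```swift
// Toy model only: hypothetical stand-ins, not RxSwift's Disposable and DisposeBag.
final class ToyDisposable {
    private let onDispose: () -> Void
    private var isDisposed = false

    init(_ onDispose: @escaping () -> Void) {
        self.onDispose = onDispose
    }

    func dispose() {
        guard !isDisposed else { return } // disposing twice does nothing
        isDisposed = true
        onDispose()
    }
}

final class ToyDisposeBag {
    private var disposables: [ToyDisposable] = []

    func insert(_ disposable: ToyDisposable) {
        disposables.append(disposable)
    }

    // When the bag is deallocated, everything in it is disposed.
    deinit {
        disposables.forEach { $0.dispose() }
    }
}

func runBagExample() -> [String] {
    var released: [String] = []
    do {
        let bag = ToyDisposeBag()
        bag.insert(ToyDisposable { released.append("subscription A") })
        bag.insert(ToyDisposable { released.append("subscription B") })
        // No explicit dispose() calls here.
    } // the bag is gone by this point, so its deinit has run
    return released
}

let released = runBagExample()
print(released)
```

This is why a DisposeBag is commonly stored as a property of the object that owns the subscriptions: when the owner is deallocated, the subscriptions end with it.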

You can also use takeUntil to automatically end a subscription:

seq.takeUntil(otherSeq)
    .subscribe({ (event) in
        print(event)
    })

The subscription ends automatically once otherSeq emits a next event. An error from otherSeq is forwarded as well, while a completed event from otherSeq on its own does not end the subscription.

Create a sequence

You can create a custom observable sequence using the create method provided by the Observable type:

let seq = Observable<Int>.create { (observer) -> Disposable in
    observer.on(.next(1))
    observer.on(.completed)
    return Disposables.create {
        // do some cleanup
    }
}

The create method builds a custom sequence from a closure. The closure takes a parameter of type ObserverType, observer, and sends events through it. The code above creates an observable sequence of type Observable<Int> whose observers receive the element 1 followed by a completed event. Finally, the closure returns a Disposable that you create yourself, in which you can do some cleanup.

In addition to the create method, RxSwift provides a number of simple methods for creating sequences. Common ones are:

  • Just: Creates an observable sequence that contains only one value:

    let justSeq = Observable.just(1)
    justSeq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    completed
  • Of: Similar to just, except that of creates a sequence from several elements, sends them in order, and then completes:

    let ofSeq = Observable.of(1, 2, 3)
    ofSeq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    next(3)
    completed
  • Empty: This kind of Observable sends only a completed event:

    let emptySequence = Observable<String>.empty()
  • Error: This sequence sends only a single error event, carrying a custom error value:

    let errorSeq = Observable<TestError>.error(TestError.Error1)

Share

Usually when we subscribe to an observable sequence, each subscription behavior is independent, that is:

let seq = Observable.of(1, 2)
// 1
seq.subscribe { (event) in
    print("sub 1: \(event)")
}
// 2
seq.subscribe { (event) in
    print("sub 2: \(event)")
}
---- example output ----
sub 1: next(1)
sub 1: next(2)
sub 1: completed 
sub 2: next(1)
sub 2: next(2)
sub 2: completed 

We subscribe to the same sequence twice in a row and receive the same events each time; the first subscription does not “use up” the elements for the second. When you want all observers to share the same events, use share.

  • Share: Share is an extension of the ObservableType protocol that returns an observable sequence in which all observers share the same subscription. Add share to the code above:

    let seq = Observable.of(1, 2).share()
    // 1
    seq.subscribe { (event) in
        print("sub 1: \(event)")
    }
    // 2
    seq.subscribe { (event) in
        print("sub 2: \(event)")
    }
    ---- example output ----
    sub 1: next(1)
    sub 1: next(2)
    sub 1: completed 
    sub 2: completed 

    As you can see, the sequence sent all of its events to the first subscriber, and the second subscriber received only a completed event.

  • ShareReplay: shareReplay is used like share. Its signature is:

    func shareReplay(_ bufferSize: Int) -> Observable<Element>

    The difference is that shareReplay takes an integer parameter, bufferSize, which sets the size of a replay buffer: an observer that subscribes to the sequence immediately receives the most recent bufferSize elements.
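
The replay buffer idea can be sketched without RxSwift. ToyReplaySource below is a hypothetical type written for this illustration; it models only the buffering of elements and ignores error and completed events, so it should not be read as how RxSwift implements shareReplay.

```swift
// Toy model only: ToyReplaySource is a hypothetical type, not RxSwift's implementation.
final class ToyReplaySource<Element> {
    private let bufferSize: Int
    private var buffer: [Element] = []
    private var observers: [(Element) -> Void] = []

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    // Deliver a new element to current observers and remember it for later ones.
    func send(_ element: Element) {
        buffer.append(element)
        if buffer.count > bufferSize {
            buffer.removeFirst(buffer.count - bufferSize)
        }
        observers.forEach { $0(element) }
    }

    // A new observer first receives the buffered elements, then live ones.
    func subscribe(_ observer: @escaping (Element) -> Void) {
        buffer.forEach(observer)
        observers.append(observer)
    }
}

func runReplayExample() -> [Int] {
    var late: [Int] = []
    let source = ToyReplaySource<Int>(bufferSize: 2)
    source.send(1)
    source.send(2)
    source.send(3)
    source.subscribe { late.append($0) } // subscribes after three elements were sent
    source.send(4)
    return late
}

let received = runReplayExample()
print(received)
```

With a buffer of size 2, the late subscriber is first handed the two most recent elements (2 and 3) and then receives 4 as it is sent.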

Sequence transformations and combinations

Swift's Sequence offers common functional methods such as map, flatMap, and reduce for transforming elements, and observable sequences in RxSwift support these methods as well.

Transformation

  • Map: This is the signature of the map method:

    func map<Result>(_ transform: @escaping (E) throws -> Result) -> Observable<Result>

    map transforms each element of the sequence with a custom closure and returns an observable sequence containing the transformed results, much like map on a Swift Sequence.

    let mappedSeq: Observable<String> = seq.map { (element) -> String in
        return "value: \(element)"
    }
  • FlatMap: Here is the signature of flatMap:

    func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O) -> Observable<O.E>

    flatMap can also be compared with its Sequence counterpart. On a Sequence, the flatMap closure is called for each element and returns a new sequence; these sequences are then “flattened” into a single sequence containing all of their elements:

    let array = [1, 2]
    let res = array.flatMap { (n) -> [String] in
        return ["\(n)a", "\(n)b"]
    }
    // res: ["1a", "1b", "2a", "2b"]

    flatMap in RxSwift works similarly: the closure is called for each element of the observable sequence and returns a new observable sequence, and flatMap returns an observable sequence containing the elements of all of them:

    let seq = Observable.of(1, 2)
        .flatMap { (n) -> Observable<String> in
            return Observable.of("\(n)a", "\(n)b") // (1)
        }
        .subscribe { (event) in
            print(event)
        }
    ---- example output ----
    next(1a)
    next(1b)
    next(2a)
    next(2b)
    completed

    Several observable sequences are created in the closure at (1). Any next event sent by these sequences is passed on to the sequence returned by flatMap. When any of them fails (sends an error event), the resulting sequence ends and receives no more events. The resulting sequence completes normally only after all of the sequences have completed (sent a completed event).

  • FlatMapLatest: Similar to flatMap, but it does not keep subscriptions to all the Observables generated in the closure. Only the subscription to the most recently created Observable is kept; every Observable created earlier is unsubscribed (and the dispose method of the corresponding subscription is called):

    // Just change the flatMap to flatMapLatest with the same code as in the previous example
    let seq = Observable.of(1, 2)
        .flatMapLatest { (n) -> Observable<String> in
            return Observable.of("\(n)a", "\(n)b") // (1)
        }
        .subscribe { (event) in
            print(event)
        }
    ---- example output ----
    next(1a)
    next(2a)
    next(2b)
    completed

    Because of this change in subscriptions, the resulting sequence now receives a completed event only when the last Observable created completes normally.

    In the following case, however, flatMapLatest produces the same output as flatMap:

    let seq = Observable.of(1, 2)
        .flatMapLatest { (n) -> Observable<String> in
            return Observable<String>.create({ (observer) -> Disposable in
                observer.onNext("\(n)a")
                observer.onNext("\(n)b")
    
                return Disposables.create { }
            })
        }
        .subscribe { (event) in
            print(event)
        }

    This is because the Observable created in the example above emits its elements synchronously inside create and cannot be interrupted.

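    A related method is flatMapFirst. It is the counterpart of flatMapLatest: while the Observable created first is still active, Observables created for later elements are ignored.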

  • Reduce and Scan: reduce works as it does on Sequence. It takes an initial value and a closure, calls the closure on each value of the Observable, and passes the result of each step as input to the next call:

    Observable.of(1, 2, 3).reduce(0) { (first, num) -> Float in
            // first is already a Float; only the Int element needs converting
            return first + Float(num)
        }
        .subscribe { (event) in
            print(event)
        }
    Output: next(6.0), completed

    In the code above, we provide an initial value of 0, compute the sum in the closure, and change the element type of the resulting sequence to Float; the observer finally receives the sum of all elements.

    scan is similar to reduce. The only difference is that scan emits the result of every closure invocation:

    Observable.of(1, 2, 3).scan(0) { (first, num) -> Float in
            // first is already a Float; only the Int element needs converting
            return first + Float(num)
        }
        .subscribe { (event) in
            print(event)
        }
    Output: next(1.0), next(3.0), next(6.0), completed
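
The same contrast can be seen on an ordinary Swift array, which may help if the Rx versions feel abstract. This sketch uses only the standard library; because the standard library has no scan for arrays, the running totals are collected by hand.

```swift
let numbers = [1, 2, 3]

// reduce: a single final result, like Observable.reduce emitting one element.
let total = numbers.reduce(Float(0)) { acc, num in acc + Float(num) }

// scan-like behavior: keep every intermediate result,
// like Observable.scan emitting after each step.
var runningTotals: [Float] = []
var runningSum: Float = 0
for num in numbers {
    runningSum += Float(num)
    runningTotals.append(runningSum)
}

print(total)          // 6.0
print(runningTotals)  // [1.0, 3.0, 6.0]
```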

Combination

  • StartWith: Adds the specified element to the beginning of a sequence:

    Observable.of(2, 3).startWith(1).subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    next(3)
    completed

    Once you subscribe to the sequence, you immediately receive the elements specified by startWith, even if the sequence itself has not started sending events yet.

  • Merge: When you have several Observables of the same type, merge combines them so that you subscribe to all of them at once:

    let seq1 = Observable.just(1)
    let seq2 = Observable.just(2)
    let seq = Observable.of(seq1, seq2).merge()
    seq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    completed

    merge is available only when the elements of an Observable are themselves Observables. seq terminates when an error occurs in any of the merged sequences, and it receives a completed event only when all of them have completed.

  • Zip: The zip method can also merge multiple Observables together. Unlike merge, zip provides a closure for combining elements in multiple Observables, resulting in a new sequence:

    let seq1 = Observable.just(1)
    let seq2 = Observable.just(2)
    let seq: Observable<String> = Observable.zip(seq1, seq2) { (num1, num2) -> String in
        return "\(num1 + num2)"
    }
    seq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(3)
    completed

    The zip method has several overloads depending on the number of arguments and supports combining up to eight observable sequences. Note that the arguments passed to the closure are the elements at the same position in each sequence. That is, if seq1 sends one element and seq2 sends several, the closure is executed only once and seq contains only one element.

    When any of the combined Observables fails, seq terminates immediately. seq completes when all of the Observables have sent a completed event.

  • CombineLatest: Like zip, combineLatest combines multiple sequences into a single sequence, but it is triggered differently. Each time any sequence produces a new element, combineLatest takes the latest element of every other sequence and passes them all to a closure, whose result is added to the resulting sequence. Nothing is emitted until every sequence has produced at least one element.
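
The "latest element from each input" rule can be sketched in plain Swift. ToyCombineLatest is a hypothetical type written for this illustration; it models two inputs and the emission rule only, not RxSwift's actual implementation (errors and completion are left out).

```swift
// Toy model only: ToyCombineLatest is a hypothetical type, not RxSwift's implementation.
final class ToyCombineLatest<A, B, R> {
    private var latestA: A?
    private var latestB: B?
    private let combine: (A, B) -> R
    private(set) var output: [R] = []

    init(_ combine: @escaping (A, B) -> R) {
        self.combine = combine
    }

    func sendA(_ value: A) {
        latestA = value
        emitIfReady()
    }

    func sendB(_ value: B) {
        latestB = value
        emitIfReady()
    }

    // Nothing is emitted until both inputs have produced at least one element.
    private func emitIfReady() {
        guard let a = latestA, let b = latestB else { return }
        output.append(combine(a, b))
    }
}

let combined = ToyCombineLatest<Int, String, String> { number, letter in
    "\(number)\(letter)"
}
combined.sendA(1)     // no output yet: the second input has produced nothing
combined.sendB("a")   // emits "1a"
combined.sendA(2)     // emits "2a"
combined.sendB("b")   // emits "2b"
print(combined.output)
```

For comparison, zip pairs elements by position, so the same four inputs would yield only two results, one for (1, "a") and one for (2, "b").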

Subject

A Subject acts as a proxy and bridge: it is both an observer and an observable sequence, so after adding observers to a Subject you can send events to them through it. A Subject does not send a completed event on its own; once an error or completed event is sent, the sequence in the Subject terminates and no further events can be sent. There are several types of Subject:

  • PublishSubject: Subscribers to a PublishSubject receive only events sent after they subscribe

    let subject = PublishSubject<Int>()
    subject.onNext(1)
    subject.subscribe { (event) in
        print(event)
    }
    subject.onNext(2)
    
    ---- example output ----
    next(2)

    As you can see, the observer only received event 2, and the event 1 sent before the subscription was not received.

  • ReplaySubject: When a ReplaySubject is created, you specify a buffer of size n that stores the last n events. An observer receives the events in the buffer immediately after subscribing:

    let subject = ReplaySubject<Int>.create(bufferSize: 2)
    subject.onNext(1)
    subject.subscribe { (event) in
        print(event)
    }
    subject.onNext(2)
    
    ---- example output ----
    next(1)
    next(2)
  • BehaviorSubject: A BehaviorSubject must be given a default value when it is initialized. On subscription, the observer immediately receives the most recent event sent in the sequence; if no event has been sent yet, it receives the default value:

    let subject = BehaviorSubject(value: 1)
    subject.subscribe { (event) in
        print(event)
    }
    
    ---- example output ----
    next(1)
  • Variable: Variable wraps a BehaviorSubject and behaves similarly. It has no method like on for sending events; instead it has a value property. Assigning to value sends a next event to observers, and reading value returns the most recently sent data:

    let variable = Variable(1)
    variable.asObservable().subscribe { (event) in
        print(event)
    }
    variable.value = 2
    
    ---- example output ----
    next(1)
    next(2)
    completed

    Unlike other Subject types, Variable sends a Completed event when it is released, and Variable objects never send error events.

Scheduler

Schedulers are RxSwift's mechanism for multithreaded programming. An Observable runs on a scheduler, which determines the thread on which the sequence does its work and delivers its events. By default, after you subscribe to an Observable, observers are notified and disposed on the same thread on which subscribe was called.

Like GCD queues, schedulers are either serial or concurrent. RxSwift defines several schedulers:

  • CurrentThreadScheduler: The default scheduler. It represents the current thread and is serial.
  • MainScheduler: Represents the main thread and is serial.
  • SerialDispatchQueueScheduler: Provides convenient ways to create a serial scheduler; internally it wraps a DispatchQueue.
  • ConcurrentDispatchQueueScheduler: Provides convenient ways to create a concurrent scheduler; it also wraps a DispatchQueue.

SubscribeOn and observeOn

SubscribeOn and observeOn are the two most important methods that change the Scheduler of an Observable:

// main thread
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
let seq = Observable.of(1, 2)
seq.subscribeOn(scheduler)
    .map {
        return $0 * 2 // background thread
    }
    .subscribe { (event) in
        print(event) // background thread
    }

In the code above, we create a concurrent scheduler and assign it to the sequence seq by calling subscribeOn. We subscribe to the sequence on the main thread, but the map closure and the event callback run on a background thread managed by the scheduler.

SubscribeOn and observeOn can both specify the Scheduler of the sequence. The difference between them is:

  • subscribeOn: Sets the scheduler on which the whole sequence starts; sequence creation and the subsequent operations happen there. subscribeOn should be called only once in a chain; calling it again has no effect.
  • observeOn: Specifies a scheduler to which all subsequent operations are dispatched. observeOn can change the scheduler in the middle of a chain.

createObservable()
    .doSomething()
    .subscribeOn(scheduler1) // (1)
    .doSomethingElse()
    .observeOn(scheduler2) // (2)
    .doAnother()
    ...

Because subscribeOn is applied at (1), createObservable() and doSomething() run on scheduler1, and so does doSomethingElse(). observeOn then specifies another scheduler at (2), so doAnother() runs on scheduler2.

Add Rx extensions to the original code

RxSwift provides an extension mechanism that makes it easy to add Rx extensions to existing code. It is built on the Reactive type:

public struct Reactive<Base> {
    /// base is the extended object instance
    public let base: Base
	
    public init(_ base: Base) {
        self.base = base
    }
}

Reactive is a generic structure that defines a single property, base, and passes in the value of that property when the structure is initialized.

In addition, a protocol ReactiveCompatible is defined:

public protocol ReactiveCompatible {
    associatedtype CompatibleType

    static var rx: Reactive<CompatibleType>.Type { get set }
    var rx: Reactive<CompatibleType> { get set }
}

The protocol declares a property named rx twice, once as a static property on the type and once as an instance property, both based on Reactive. A protocol extension provides default implementations:

extension ReactiveCompatible {
    public static var rx: Reactive<Self>.Type {
        get {
            return Reactive<Self>.self
        }
        set {
            // this enables using Reactive to "mutate" base type
        }
    }

    public var rx: Reactive<Self> {
        get {
            return Reactive(self)
        }
        set {
            // this enables using Reactive to "mutate" base object
        }
    }
}

The associated type CompatibleType is inferred to be the type that adopts the protocol, and the instance property uses self to initialize a Reactive value.

Finally, the ReactiveCompatible protocol is implemented for all NSObject types by extending the protocol:

extension NSObject: ReactiveCompatible {}

This way, every type that inherits from NSObject has an rx property of type Reactive. To add an Rx extension to a type, we simply add members to Reactive in a constrained extension, for example for UIButton:

extension Reactive where Base: UIButton { // Add an extension to Reactive
    public var tap: ControlEvent<Void> {
        return controlEvent(.touchUpInside) // The instance itself can be accessed through base
    }
}

Because Reactive is a generic type, we can constrain its generic parameter with a where clause. The tap property is then available through rx on any UIButton instance:

let button = UIButton(...)
button.rx.tap

RxSwift extension libraries like RxCocoa are extended in this way.
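
The same pattern can be tried on a type of your own without UIKit or RxSwift. The sketch below redeclares simplified look-alikes of Reactive and ReactiveCompatible (the associated type, static property, and empty setters are left out for brevity); Counter and summary are hypothetical names introduced only for this example.

```swift
// Simplified look-alikes of the types shown above, so the sketch runs without RxSwift.
struct Reactive<Base> {
    let base: Base
    init(_ base: Base) { self.base = base }
}

protocol ReactiveCompatible {}

extension ReactiveCompatible {
    // Wraps the instance so that extensions of Reactive can reach it through base.
    var rx: Reactive<Self> { Reactive(self) }
}

// Hypothetical model type, introduced only for this example.
class Counter {
    var count = 0
}

extension Counter: ReactiveCompatible {}

// Members added here appear only on the rx of Counter instances.
extension Reactive where Base: Counter {
    var summary: String { "count is \(base.count)" }
}

let counter = Counter()
counter.count = 3
print(counter.rx.summary)
```

Keeping the added members behind rx avoids cluttering the original type's own namespace, which is presumably why the extension libraries take this route.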