For everyday convenience, RxSwift wraps Observable in several special-purpose sequences, such as Driver, Single, Completable, and Maybe. Below we look at how each one is used and how it works.

Driver

Think of Driver as a way to build the observable sequences that drive your application. Its main purpose is to simplify code at the UI layer.

Let’s look at an example. We type text into a text field, fetch data from the server for that input, and, when the data comes back, show it in mLabel and as the title of mButton:

private func httpTest() {
    let res = mTextField.rx.text.skip(1)
        .flatMap { (input) -> Observable<String> in
            return self.fetchData(inputText: input ?? "")
        }

    res.bind(to: mLabel.rx.text)
        .disposed(by: disposeBag)

    res.bind(to: mButton.rx.title())
        .disposed(by: disposeBag)
}

private func fetchData(inputText: String) -> Observable<String> {
    print("-- Thread before network request --\(Thread.current)--")
    return Observable<String>.create { (observer) -> Disposable in
        DispatchQueue.global().async {
            print("-- Thread in network request --\(Thread.current)--")
            observer.onNext("Requested data back")
            observer.onCompleted()
        }
        return Disposables.create()
    }
}

The running results are as follows:

You can see that the above code is actually problematic:

  • If the fetchData sequence produces an error (for example, the network request fails), the error tears down all bindings, so no new network request is made when the user enters a new keyword.
  • The result is bound to two UI elements. Each time the user enters a new keyword, a separate HTTP request is made for each UI element, which is why the console prints twice.
  • If fetchData delivers its result on a background thread, the UI is also updated on a background thread, which can crash the app.

We can optimize the above code:

let res = mTextField.rx.text.skip(1)
    .flatMap { (input) -> Observable<String> in
        return self.fetchData(inputText: input ?? "")
            .observeOn(MainScheduler.instance)
            .catchErrorJustReturn("Network request error")
    }
    .share(replay: 1, scope: .whileConnected)

  • observeOn(MainScheduler.instance): delivers the result on the main thread
  • catchErrorJustReturn(): replaces a network error with a fallback value, so the sequence does not terminate with an error
  • share(): shares one network request among all subscribers, preventing duplicate requests
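
To make the duplicate-request problem concrete, here is a minimal sketch in plain Swift (no RxSwift) of why an unshared sequence repeats its work for every subscriber and what sharing with a replay of one changes. ColdSource and SharedReplayOne are hypothetical stand-ins invented for this illustration, not RxSwift types, and they ignore threading, disposal, and reconnection.

```swift
// Hypothetical stand-in for a cold observable such as fetchData(inputText:):
// the "request" runs again for every subscriber.
final class ColdSource {
    private(set) var requestCount = 0

    func subscribe(_ onNext: (String) -> Void) {
        requestCount += 1                 // side effect: one request per subscription
        onNext("response #\(requestCount)")
    }
}

// Hypothetical, heavily simplified model of share(replay: 1):
// the first subscriber triggers the work; later subscribers get the cached value.
final class SharedReplayOne {
    private let source: ColdSource
    private var cached: String?

    init(_ source: ColdSource) {
        self.source = source
    }

    func subscribe(_ onNext: (String) -> Void) {
        if cached == nil {
            source.subscribe { value in self.cached = value }
        }
        if let value = cached {
            onNext(value)
        }
    }
}

// Unshared: two bindings mean two requests.
let unshared = ColdSource()
var unsharedResults: [String] = []
unshared.subscribe { unsharedResults.append($0) }
unshared.subscribe { unsharedResults.append($0) }

// Shared: two bindings, one request.
let underlying = ColdSource()
let shared = SharedReplayOne(underlying)
var sharedResults: [String] = []
shared.subscribe { sharedResults.append($0) }
shared.subscribe { sharedResults.append($0) }

print(unshared.requestCount, unsharedResults)   // prints: 2 ["response #1", "response #2"]
print(underlying.requestCount, sharedResults)   // prints: 1 ["response #1", "response #1"]
```

The two subscribe calls play the role of the two bindings to mLabel and mButton: without sharing the work runs twice, with sharing it runs once and the second subscriber receives the replayed value.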

We can get the same result more directly with Driver:

private func driveTest() {
    let res = mTextField.rx.text.asDriver()
        .flatMap { input in
            self.fetchData(inputText: input ?? "")
                .asDriver(onErrorJustReturn: "Network request error")
        }

    res.drive(mLabel.rx.text)
        .disposed(by: disposeBag)

    res.drive(mButton.rx.title())
        .disposed(by: disposeBag)
}

All we need to do is convert the ordinary observable sequence to a Driver, bind it to the UI with drive, and supply a fallback value for errors.

Let’s have a look at the source code:

public typealias Driver<E> = SharedSequence<DriverSharingStrategy, E>

Driver is a type alias for SharedSequence specialized with DriverSharingStrategy. It has the following characteristics:

  • It never emits an error
  • It delivers events on the main thread
  • Its sharing strategy is share(replay: 1, scope: .whileConnected); in other words, all observers share the computational resources of the sequence

In the example above, when we call asDriver() we call the following method:

extension ControlProperty {
    public func asDriver() -> Driver<E> {
        return self.asDriver { _ -> Driver<E> in
            return Driver.empty()
        }
    }
}

Following that call one level further:

public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<E>) -> Driver<E> {
    let source = self
        .asObservable()
        // Deliver events on the main thread
        .observeOn(DriverSharingStrategy.scheduler)
        .catchError { error in
            onErrorRecover(error).asObservable()
        }
    return Driver(source)
}

// In DriverSharingStrategy
public static var scheduler: SchedulerType { return SharingScheduler.make() }

// In SharingScheduler
public private(set) static var make: () -> SchedulerType = { MainScheduler() }

Here observeOn is given the Driver scheduler, which defaults to MainScheduler, so events are delivered on the main thread. catchError handles an error by switching to the recovery sequence. Finally the method returns Driver(source); because Driver is an alias for SharedSequence, this calls the SharedSequence initializer:

public struct SharedSequence<S: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
    public typealias E = Element
    public typealias SharingStrategy = S

    let _source: Observable<E>

    init(_ source: Observable<E>) {
        self._source = S.share(source)
    }

    .....

    public func asObservable() -> Observable<E> {
        return self._source
    }

    public func asSharedSequence() -> SharedSequence<SharingStrategy, E> {
        return self
    }
}

Its initialization method calls DriverSharingStrategy’s share method:

public struct DriverSharingStrategy: SharingStrategyProtocol {
    
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    
    public static func share<E>(_ source: Observable<E>) -> Observable<E> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}

As you can see, share(replay: 1, scope: .whileConnected) is what ultimately gets applied. In other words, calling asDriver() ensures that events are delivered on the main thread and fixes the sharing strategy.

Next, let’s look at what happens when we call the drive method on a Driver (SharedSequence):

public func drive<O: ObserverType>(_ observer: O) -> Disposable where O.E == E? {
    // Make sure we are running on the main thread
    MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
    return self.asSharedSequence().asObservable().map { $0 as E? }.subscribe(observer)
}

public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
    guard Thread.isMainThread else {
        rxFatalError(errorMessage ?? "Running on background thread.")
    }
}

drive first checks that it is being called on the main thread and raises a fatal error if it is not. It then uses map to convert each element to an optional so that the element type matches the observer, and finally calls subscribe(). It is essentially a thin wrapper around subscribe.
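
The two steps before subscribing can be sketched in plain Swift. This is only a model under stated assumptions: OptionalTextSink and driveModel are hypothetical names invented here, an array stands in for the element stream, and precondition stands in for rxFatalError.

```swift
import Foundation

// Hypothetical stand-in for a UI observer whose element type is optional,
// in the spirit of a label's text binder (String?).
struct OptionalTextSink {
    var received: [String?] = []

    mutating func on(_ value: String?) {
        received.append(value)
    }
}

// Simplified model of drive: check the thread, lift each element to an
// optional so the types line up, then hand the elements to the observer.
func driveModel(_ elements: [String], into sink: inout OptionalTextSink) {
    precondition(Thread.isMainThread, "Running on background thread.")
    for element in elements.map({ $0 as String? }) {
        sink.on(element)
    }
}

var label = OptionalTextSink()
driveModel(["a", "b"], into: &label)
```

The cast to an optional is what lets a sequence of String be bound to an observer that expects String?.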

Single

Single is a sequence that emits exactly one element or one error event. It does not share side effects among subscribers.

public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>

Single is a type alias for PrimitiveSequence. Its events are described by an enum with two cases:

public enum SingleEvent<Element> {
    /// Exactly one element is emitted (the underlying sequence emits `.next(Element)`, then `.completed`)
    case success(Element)

    /// The sequence terminated with an error (the underlying sequence emits `.error(Error)`)
    case error(Swift.Error)
}

We can create a Single sequence using the following method:

func createSingleTest() {
    // Assumed flag (not defined in the original snippet); set to false to emit the error event
    let showSuccess = true
    Single<String>.create { (sob) -> Disposable in
        if showSuccess {
            sob(.success("This is a successful Single event"))
        } else {
            sob(.error(NSError.init(domain: "error single event", code: 10000, userInfo: nil)))
        }
        return Disposables.create()
    }.subscribe { (element) in
        print("==\(element)==")
    }.disposed(by: disposeBag)
}

Console output:

// Success
==success("This is a successful Single event")==
// Error
==error(Error Domain=error single event Code=10000 "(null)")==

The source code is as follows:

public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
   let source = Observable<Element>.create { observer in
       return subscribe { event in
           switch event {
           case .success(let element):
               observer.on(.next(element))
               observer.on(.completed)
           case .error(let error):
               observer.on(.error(error))
           }
       }
   }
   
   return PrimitiveSequence(raw: source)
}

As you can see, Single is a wrapper around Observable with only two kinds of events, success and error. A success event is translated into a next event followed by completed; an error event is forwarded as is.

Let’s look at the subscription method for Single:

public func subscribe(_ observer: @escaping (SingleEvent<Element>) -> Void) -> Disposable {
    var stopped = false
    return self.primitiveSequence.asObservable().subscribe { event in
        if stopped { return }
        stopped = true
        
        switch event {
        case .next(let element):
            observer(.success(element))
        case .error(let error):
            observer(.error(error))
        case .completed:
            rxFatalErrorInDebug("Singles can't emit a completion event")
        }
    }
}

This calls the underlying Observable’s subscribe method and translates each event. A next event becomes success, an error event is passed through as error, and a completed event that arrives first triggers a fatal error in debug builds. The stopped flag ensures that only the first event is handled, so the completed that follows a next is ignored. In other words, Single has no standalone completed event.
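
The effect of the stopped flag can be tried out in plain Swift. The sketch below redeclares simplified Event and SingleEvent enums so that it runs without RxSwift; makeSingleObserver and its onContractViolation parameter are hypothetical names for this illustration, and the violation callback stands in for rxFatalErrorInDebug.

```swift
// Simplified copies of the event shapes discussed above (not the RxSwift types).
enum Event<Element> {
    case next(Element)
    case error(Error)
    case completed
}

enum SingleEvent<Element> {
    case success(Element)
    case error(Error)
}

// Hypothetical helper: wraps a Single-style observer so that only the first
// event gets through, mirroring the `stopped` flag in subscribe.
func makeSingleObserver<Element>(
    _ observer: @escaping (SingleEvent<Element>) -> Void,
    onContractViolation: @escaping () -> Void
) -> (Event<Element>) -> Void {
    var stopped = false
    return { event in
        if stopped { return }
        stopped = true

        switch event {
        case .next(let element):
            observer(.success(element))
        case .error(let error):
            observer(.error(error))
        case .completed:
            // RxSwift calls rxFatalErrorInDebug here; this sketch only reports it.
            onContractViolation()
        }
    }
}

var received: [String] = []
var violations = 0
let on = makeSingleObserver({ (event: SingleEvent<Int>) in
    switch event {
    case .success(let value): received.append("success(\(value))")
    case .error: received.append("error")
    }
}, onContractViolation: { violations += 1 })

// What Single.create emits for .success(42): next, then completed.
on(.next(42))
on(.completed)   // ignored, because stopped is already true
```

After these two calls the observer has seen a single success event and no contract violation has been reported.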

Alternatively, we can create a Single sequence with asSingle():

func asSingleTest() {
    Observable.of("1").asSingle()
        .subscribe(onSuccess: { (element) in
            print("==\(element)==")
        }).disposed(by: disposeBag)
}

Console output:

==1==

If the source emits more than one element, the console reports an error:

Unhandled error happened: Sequence contains more than one element.
 subscription called from:

Let’s look at the source code again:

public func asSingle() -> Single<Element> {
    return PrimitiveSequence(raw: AsSingle(source: self.asObservable()))
}

final class AsSingle<Element>: Producer<Element> {
    fileprivate let _source: Observable<Element>

    init(source: Observable<Element>) {
        self._source = source
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AsSingleSink(observer: observer, cancel: cancel)
        let subscription = self._source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

fileprivate final class AsSingleSink<Observer: ObserverType> : Sink<Observer>, ObserverType {
    typealias Element = Observer.Element

    private var _element: Event<Element>?

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            // _element is nil when the first element arrives; if it is already set,
            // the source has emitted more than one element
            if self._element != nil {
                self.forwardOn(.error(RxError.moreThanOneElement))
                self.dispose()
            }
            self._element = event
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            if let element = self._element {
                self.forwardOn(element)
                self.forwardOn(.completed)
            } else {
                self.forwardOn(.error(RxError.noElements))
            }
            self.dispose()
        }
    }
}

In short, asSingle() wraps an Observable as a sequence that delivers either a single element or an error event.
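
The three outcomes of the sink can be summarized with a small plain-Swift model. It is a sketch under stated assumptions: SingleConversionError and asSingleModel are hypothetical names, and an array stands in for the elements a finite source emits before it completes.

```swift
// Hypothetical error type mirroring the two RxError cases used by AsSingleSink.
enum SingleConversionError: Error, Equatable {
    case moreThanOneElement
    case noElements
}

// Simplified model of AsSingleSink: pass in the elements a finite source would
// emit before completing, and get back what the resulting Single would deliver.
func asSingleModel<Element>(_ elements: [Element]) -> Result<Element, SingleConversionError> {
    var buffered: Element?
    for element in elements {
        if buffered != nil {
            // A second element arrived: the sink forwards an error and stops.
            return .failure(.moreThanOneElement)
        }
        buffered = element
    }
    // The source completed: deliver the buffered element, or fail if there is none.
    guard let element = buffered else {
        return .failure(.noElements)
    }
    return .success(element)
}

let one = asSingleModel(["1"])             // exactly one element
let many = asSingleModel(["1", "2"])       // more than one element
let none = asSingleModel([String]())       // empty source
```

Only the one-element case yields a success; the other two map to the moreThanOneElement and noElements errors seen in the sink.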

Completable

Completable is a sequence that emits no elements, so it can only produce a single completed event or a single error event. It does not share side effects among subscribers.

The definition is as follows:

public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>

It is also a type alias for PrimitiveSequence, just with different generic parameters. Completable defines two kinds of events:

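public enum CompletableEvent {
    /// The sequence terminated with an error (the underlying sequence emits `.error(Error)`)
    case error(Swift.Error)

    /// The sequence completed successfully (the underlying sequence emits `.completed`)
    case completed
}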

We can create the Completable sequence using the following method:

func createCompletableTest() {
    // Set to false to emit the error event instead
    let showCompleted = true
    Completable.create { (cob) -> Disposable in
        if showCompleted {
            cob(.completed)
        } else {
            cob(.error(NSError.init(domain: "error signal", code: 10000, userInfo: nil)))
        }
        return Disposables.create()
    }.subscribe(onCompleted: {
        print("==completed==")
    }, onError: { (error) in
        print("==\(error)==")
    }).disposed(by: disposeBag)
}

Console output:

// .completed
==completed==
// .error
==Error Domain=error signal Code=10000 "(null)"==

The source code is as follows:

public static func create(subscribe: @escaping (@escaping CompletableObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
    let source = Observable<Element>.create { observer in
        return subscribe { event in
            switch event {
            case .error(let error):
                observer.on(.error(error))
            case .completed:
                observer.on(.completed)
            }
        }
    }
        
    return PrimitiveSequence(raw: source)
}

As you can see, Completable is implemented much like Single, with only two kinds of events: completed, which signals completion, and error, which signals failure.

Let’s look at the subscribe method for the Completable again:

public func subscribe(onCompleted: (() -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
    return self.primitiveSequence.subscribe { event in
        switch event {
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        case .completed:
            onCompleted?()
        }
    }
}

For .completed, the onCompleted closure is called; for .error, the onError closure, if one was supplied, is called with the error.

We can also use the asCompletable() method:

func asCompletableTest() {
    Observable<Never>.empty()
        .asCompletable()
        .subscribe(onCompleted: {
            print("==completed==")
        }, onError: { (error) in
            print("==\(error)==")
        })
        .disposed(by: disposeBag)
}

Console output:

==completed==

The source code is as follows:

public func asCompletable()
    -> Completable {
    return PrimitiveSequence(raw: self.asObservable())
}

Maybe

A Maybe sequence emits exactly one event: a single element, a completed event, or an error event.

The definition is as follows:

public typealias Maybe<Element> = PrimitiveSequence<MaybeTrait, Element>

It is also a type alias for PrimitiveSequence, just with different generic parameters. Maybe defines three kinds of events:

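public enum MaybeEvent<Element> {
    /// One element is emitted and the sequence completes (the underlying sequence emits `.next(Element)`, then `.completed`)
    case success(Element)

    /// The sequence terminated with an error (the underlying sequence emits `.error(Error)`)
    case error(Swift.Error)

    /// The sequence completed without emitting an element (the underlying sequence emits `.completed`)
    case completed
}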

We can create the Maybe sequence using the following method:

func createMaybeTest() {
    Maybe<String>.create(subscribe: { (mob) -> Disposable in
        mob(.success("successful event"))
        mob(.completed)
        mob(.error(NSError.init(domain: "error event", code: 10000, userInfo: nil)))
        return Disposables.create()
    }).subscribe({ (element) in
        print("==\(element)==")
    }).disposed(by: disposeBag)
}

Console output:

==success("successful event")==

The source code is as follows:

public static func create(subscribe: @escaping (@escaping MaybeObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
    let source = Observable<Element>.create { observer in
        return subscribe { event in
            switch event {
            case .success(let element):
                observer.on(.next(element))
                observer.on(.completed)
            case .error(let error):
                observer.on(.error(error))
            case .completed:
                observer.on(.completed)
            }
        }
    }
    
    return PrimitiveSequence(raw: source)
}

Maybe is similar to Single but has three kinds of events: success, completed, and error. success sends a next event followed by completed, completed sends only the completed event, and error forwards the error.

Let’s look at Maybe’s subscribe method again:

public func subscribe(_ observer: @escaping (MaybeEvent<Element>) -> Void) -> Disposable {
    var stopped = false
    return self.primitiveSequence.asObservable().subscribe { event in
        if stopped { return }
        stopped = true
        
        switch event {
        case .next(let element):
            observer(.success(element))
        case .error(let error):
            observer(.error(error))
        case .completed:
            observer(.completed)
        }
    }
}

The stopped variable ensures that a Maybe subscriber handles only the first event it receives.

We can also use the asMaybe() method:

func asMaybeTest() {
    Observable<String>.of("A")
        .asMaybe()
        .subscribe(onSuccess: { (element) in
            print("==\(element)==")
        }, onError: { (error) in
            print("==error==")
        }, onCompleted: {
            print("==completed==")
        })
        .disposed(by: disposeBag)
}

Console output:

==A==

The source code is as follows:

public func asMaybe() -> Maybe<Element> {
    return PrimitiveSequence(raw: AsMaybe(source: self.asObservable()))
}

final class AsMaybe<Element>: Producer<Element> {
    fileprivate let _source: Observable<Element>

    init(source: Observable<Element>) {
        self._source = source
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AsMaybeSink(observer: observer, cancel: cancel)
        let subscription = self._source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

fileprivate final class AsMaybeSink<Observer: ObserverType> : Sink<Observer>, ObserverType {
    typealias Element = Observer.Element

    private var _element: Event<Element>?

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if self._element != nil {
                // More than one element is an error
                self.forwardOn(.error(RxError.moreThanOneElement))
                self.dispose()
            }
            self._element = event
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            if let element = self._element {
                self.forwardOn(element)
            }
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

You can see that asMaybe() is similar to asSingle(). The difference lies in how completed is handled: an empty source simply completes instead of producing a noElements error.

Maybe can be seen as a combination of Single and Completable. Note that asMaybe() also emits an error if the source produces more than one element.
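
As a rough plain-Swift model of these outcomes (a sketch, not the RxSwift implementation): MaybeConversionError and asMaybeModel are hypothetical names, an array stands in for the elements a finite source emits before completing, and a nil success value models a bare completed event.

```swift
// Hypothetical error type mirroring the RxError case used by AsMaybeSink.
enum MaybeConversionError: Error, Equatable {
    case moreThanOneElement
}

// Simplified model of AsMaybeSink: zero elements means "completed" (nil),
// one element means "success", and more than one is an error.
func asMaybeModel<Element>(_ elements: [Element]) -> Result<Element?, MaybeConversionError> {
    var buffered: Element?
    for element in elements {
        if buffered != nil {
            return .failure(.moreThanOneElement)
        }
        buffered = element
    }
    return .success(buffered)
}

let empty = asMaybeModel([String]())       // models a completed event
let single = asMaybeModel(["A"])           // models a success event
let multiple = asMaybeModel(["A", "B"])    // models an error event
```

Compared with the Single case, the only change is that an empty source is a valid outcome rather than an error.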

Conclusion

  • Driver: delivers events on the main thread, shares side effects among subscribers, and never emits an error; intended for UI binding.
  • Single: Either emits a single element and completes, or an error event.
  • Completable: a sequence without elements, either a completion event or an error event.
  • Maybe: either emits a single element and completes, an error event, or a completion event.