Observable sequences (Observable)

From the previous article analyzing the RxSwift source code, we know that one of the main ideas in RxSwift is that everything is a sequence. The sequence here is the observable sequence, that is, the Observable, and the code that reacts to its events is the observer. So whenever you use RxSwift you are dealing with Observables and observers. Here’s a refresher on their definitions.

This article mainly explains how to create, subscribe to, and dispose of Observables. It does not go into the implementation or the source code; if you want an analysis of the implementation, please refer to my previous article: sequence core logic.

Definitions of observer and Observable

An observer listens for events and responds to them. In other words, anything that responds to an event is an observer. For example:

  • When we tap a button, a prompt box pops up. Here “popping up the prompt box” is the observer.
  • When we request remote JSON data, we print it out. Here “printing the JSON data” is the observer.

In RxSwift, an Observable is commonly called an “observable sequence”. It forms a data stream or event stream, and all events generated by operations are delivered through an Observable.

An Observable emits three types of events, modeled by the Event enumeration with three members: next, error, and completed. Everything an Observable sends is an Event:

  • A next event is emitted when new data appears in an Observable; it carries the new data object.

  • A completed event is emitted when the Observable finishes normally; no more data is generated after it.

  • An error event is emitted when the data stream encounters an error; it also ends the Observable.
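As a small sketch of the three event types above, the following code subscribes with a single closure and switches over each Event. The received array is only a helper made up for this illustration; it is not part of RxSwift.

```swift
import RxSwift

// Collect a description of each event in the order it arrives.
var received: [String] = []

_ = Observable.of(1, 2).subscribe { (event: Event<Int>) in
    switch event {
    case .next(let value):
        // Carries a new element.
        received.append("next(\(value))")
    case .error(let error):
        // Terminates the sequence with an error.
        received.append("error(\(error))")
    case .completed:
        // Terminates the sequence normally.
        received.append("completed")
    }
}

print(received)
```

The sequence created by of() delivers its two next events and then a single completed event; the error branch is never reached in this sketch.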

Observable creation, subscription, and destruction

Creating observers and Observables

Creating an observer in the subscribe method

  1. The most direct way to create an observer is in an Observable’s subscribe method, where you describe how to respond when an event occurs.
  2. In the following example, the observer is built from the onNext, onError, and onCompleted closures.
  3. Example 1:

Code:

let observable = Observable.of("A", "B", "C")
          
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
})

Running results:

A
B
C
completed

Creating an observer in the bind method

  1. Example 2: In the following code, we create an Observable that emits an index every second and display that index on the label.

Code:

import UIKit
import RxSwift
import RxCocoa

class ViewController: UIViewController {

    @IBOutlet weak var label: UILabel!

    let disposeBag = DisposeBag()

    override func viewDidLoad() {
        super.viewDidLoad()

        // Observable (emits an index every 1 second)
        // On RxSwift 5 and later, pass .seconds(1) instead of 1
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

        observable
            .map { "Current index: \($0)" }
            .bind { [weak self] (text) in
                // Display the index on the label
                self?.label.text = text
            }
            .disposed(by: disposeBag)
    }
}

Creating an observer with AnyObserver

AnyObserver can be used to describe any kind of observer.
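To make this concrete, here is a minimal sketch showing that an AnyObserver is just a wrapper around one event-handling closure. Events are pushed into it by hand, without any Observable; the handled array is a helper invented for this illustration.

```swift
import RxSwift

var handled: [String] = []

// An AnyObserver is built from a single closure that receives every event.
let observer = AnyObserver<String> { event in
    switch event {
    case .next(let text):
        handled.append(text)
    case .error:
        handled.append("error")
    case .completed:
        handled.append("completed")
    }
}

// Events can be sent to the observer directly.
observer.onNext("A")
observer.onNext("B")
observer.onCompleted()

print(handled)
```

In real code the events normally come from an Observable through subscribe or bind(to:), as in the examples that follow.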

  1. Using AnyObserver with the subscribe method

    Example 1 above can be changed to the following code:

// Observer
let observer: AnyObserver<String> = AnyObserver { (event) in
    switch event {
    case .next(let data):
        print(data)
    case .error(let error):
        print(error)
    case .completed:
        print("completed")
    }
}

let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)

  2. Using AnyObserver with the bind(to:) method

    AnyObserver can also be used with bind(to:), an Observable’s data-binding method. Example 2 above can be changed to the following code:

import UIKit
import RxSwift
import RxCocoa

class ViewController: UIViewController {

    @IBOutlet weak var label: UILabel!

    let disposeBag = DisposeBag()

    override func viewDidLoad() {
        super.viewDidLoad()

        // Observer
        let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
            switch event {
            case .next(let text):
                // Display the index on the label
                self?.label.text = text
            default:
                break
            }
        }

        // Observable (emits an index every 1 second)
        // On RxSwift 5 and later, pass .seconds(1) instead of 1
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

        observable
            .map { "Current index: \($0)" }
            .bind(to: observer)
            .disposed(by: disposeBag)
    }
}

Creating an observer with Binder

  1. Compared with AnyObserver, Binder is aimed at specific scenarios. Binder has the following two characteristics:
  • It does not handle error events.
  • It ensures that the binding is executed on the given Scheduler (MainScheduler by default).

Once an error event occurs, fatalError is executed in the debug environment, and the error message is printed in the release environment.

The label text display in Example 2 above is a typical UI observer. It only handles next events, and UI updates must be performed on the main thread. So the better solution in that case is to use Binder, and switching to Binder is easy.

The code modified to use Binder is as follows:

import UIKit
import RxSwift
import RxCocoa

class ViewController: UIViewController {

    @IBOutlet weak var label: UILabel!

    let disposeBag = DisposeBag()

    override func viewDidLoad() {
        super.viewDidLoad()

        // Observer
        let observer: Binder<String> = Binder(label) { (view, text) in
            // Display the index on the label
            view.text = text
        }

        // Observable (emits an index every 1 second)
        // On RxSwift 5 and later, pass .seconds(1) instead of 1
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

        observable
            .map { "Current index: \($0)" }
            .bind(to: observer)
            .disposed(by: disposeBag)
    }
}

Creating an Observable with factory methods

The Observable extension provides a series of factory methods for creating observable sequences. The main ones are:

  • never(): Creates a sequence that emits no events and never terminates.
  • empty(): Creates an empty sequence that emits only a completed event.
  • error(): Creates a sequence that emits no elements and terminates immediately with an error event.
  • just(): Creates a sequence that contains exactly one element.
  • of(): Creates a sequence from a variable number of elements.
  • from(): Creates a sequence from a collection such as an Array, Dictionary, or Set.
  • range(): Creates a sequence that emits a range of consecutive integers and then terminates.
  • repeatElement(): Creates a sequence that emits the given element repeatedly and never terminates (combine it with take(n) to emit it n times).
  • generate(): Creates a sequence that emits state values as long as the supplied condition is true.
  • deferred(): Creates a new sequence for each observer that subscribes (a one-to-one relationship).
  • do(): Runs the handlers passed to do for the corresponding events before the subscriber’s own handlers run.
  • create(): Defines a custom sequence from a closure you supply.
  • timer(): Creates a timer-based Observable sequence.
  • interval(): Creates a sequence that emits at a fixed interval; under the hood it wraps a timer.

Examples of the factory methods above follow.

Creating a sequence with never

The never() method creates an Observable sequence that never emits an event and never terminates. Example 4: Code:

let disposeBag = DisposeBag()
let neverSequence = Observable<String>.never()
neverSequence.subscribe { _ in
    print("This will never be printed")
}.disposed(by: disposeBag)

Result:

Nothing is output.

Creating a sequence with empty

The empty() method creates an empty Observable sequence that emits only a completed event.

  • Source definition:
 public static func empty() -> Observable<E> {
        return EmptyProducer<E>()
    }
    
final private class EmptyProducer<Element>: Producer<Element> {
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.completed)
        return Disposables.create()
    }
}

The empty() method returns an EmptyProducer. Its subscribe() method sends only a .completed event, so a sequence created by empty() never delivers a next event; subscribers receive only completed.

  • The sequence diagram is as follows:

Example 5: Code:

let disposeBag = DisposeBag()
Observable<Int>.empty().subscribe { event in
    print(event)
}.disposed(by: disposeBag)

Result:

completed

Creating a sequence with error

The error() method creates an Observable that emits no elements and only sends an error event. Example 6: Code:

// TestError is a minimal error type defined for this example
enum TestError: Error {
    case test
}

let disposeBag = DisposeBag()
Observable<Int>.error(TestError.test)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Results:

error(test)

Creating a sequence with just

The just() method creates a sequence from a single value passed in.

  • Source definition:
 public static func just(_ element: E) -> Observable<E> {
        return Just(element: element)
    }

The just() method creates a single-element sequence, so it cannot deliver a group of values as separate elements; it handles one value at a time. just() builds the sequence from the argument passed in and sends two events to the subscriber: first a .next event carrying the element, then a .completed event.

  • Sequence diagram:

Example 7: Code:

let disposeBag = DisposeBag()
Observable.just("1").subscribe { event in
    print(event)
}.disposed(by: disposeBag)

Result:

next(1)
completed

Creating a sequence with of

The of() method accepts a variable number of arguments (which must all be of the same type). Example 8: Code:

let disposeBag = DisposeBag()
Observable.of("1", "2", "3", "4").subscribe(onNext: { element in
    print(element)
}).disposed(by: disposeBag)

Result:

1
2
3
4

Creating a sequence with from

The from() method takes an array argument.

Example 9: Code:

let disposeBag = DisposeBag()
Observable.from(["1", "2", "3", "4"])
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

Result:

1
2
3
4

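The factory list earlier mentions that from() also accepts other collections such as Set and Dictionary. The following is a minimal sketch of that; the variable names are chosen only for this illustration. Because a Set has no defined order, the collected values are sorted before printing.

```swift
import RxSwift

// from() with a Set: every member is emitted once, in unspecified order.
let numbers: Set<Int> = [3, 1, 2]
var fromSet: [Int] = []
_ = Observable.from(numbers).subscribe(onNext: { fromSet.append($0) })

// from() with a Dictionary: each element is a (key, value) pair.
let scores: [String: Int] = ["a": 1]
var fromDictionary: [String] = []
_ = Observable.from(scores).subscribe(onNext: { pair in
    fromDictionary.append("\(pair.key)=\(pair.value)")
})

print(fromSet.sorted())
print(fromDictionary)
```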
Creating a sequence with range

The range() method creates an Observable sequence of consecutive integers from a start value and a count, and then completes. Example 10: Code:

let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Result:

next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
next(7)
next(8)
next(9)
next(10)
completed

Creating a sequence with repeatElement

The repeatElement() method creates an Observable sequence that emits events for the given element indefinitely (never terminating). Example 11: Code:

let disposeBag = DisposeBag()
Observable.repeatElement("1")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

Result:

1
1
1

Creating a sequence with generate

The generate() method creates an Observable sequence that keeps emitting values as long as the supplied condition is true. Example 12: Code:

let disposeBag = DisposeBag()
Observable.generate(
    initialState: 0,
    condition: { $0 < 3 },
    iterate: { $0 + 1 }
)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

Result:

0
1
2

Creating a sequence with deferred

The deferred() method creates an Observable factory: it takes a closure and defers creation of the Observable sequence until subscription, so the actual sequence object is instantiated inside that closure, once per subscriber.

Example 14: Code:

let disposeBag = DisposeBag()
var count = 1

let deferredSequence = Observable<String>.deferred {
    // Runs once for every subscription
    print("Creating \(count)")
    count += 1

    return Observable.create { observer in
        print("Emitting...")
        observer.onNext("1")
        observer.onNext("2")
        observer.onNext("3")
        return Disposables.create()
    }
}

deferredSequence
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

deferredSequence
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

Result:

Creating 1
Emitting...
1
2
3
Creating 2
Emitting...
1
2
3

Intercepting events with do

Example 15: Code:

let disposeBag = DisposeBag()
Observable.of("1", "2", "3", "4")
    .do(onNext: { print("Intercepted:", $0) },
        onError: { print("Intercepted error:", $0) },
        onCompleted: { print("Completed") })
    .subscribe(onNext: { print($0) },
               onCompleted: { print("End") })
    .disposed(by: disposeBag)

Result:

Intercepted: 1
1
Intercepted: 2
2
Intercepted: 3
3
Intercepted: 4
4
Completed
End

Creating a sequence with create

The create() method takes a closure and runs it for each incoming subscription.

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

The argument of create() is a closure, and the sequence is built from that closure; inside it you decide which events to emit. Disposable is a protocol with a single method, dispose(), which is used to release resources. The create() method returns an AnonymousObservable (an anonymous observable sequence).
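To illustrate the Disposable returned from the closure, here is a minimal sketch. The log array and the "cleanup" entry are made up for this example; the point is only that the closure passed to Disposables.create runs when the subscription ends.

```swift
import RxSwift

var log: [String] = []

let sequence = Observable<String>.create { observer in
    observer.onNext("A")
    observer.onCompleted()
    // The returned Disposable runs its closure when the subscription ends.
    return Disposables.create {
        log.append("cleanup")
    }
}

_ = sequence.subscribe(
    onNext: { log.append($0) },
    onCompleted: { log.append("completed") }
)

print(log)
```

Here the sequence completes by itself, so the cleanup closure runs after the completed event has been delivered; it would also run if the subscription were disposed manually.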

The sequence diagram is as follows:

Example 16: Code:

let disposeBag = DisposeBag()
let myJust = { (element: String) -> Observable<String> in
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}
myJust("1").subscribe { print($0) }.disposed(by: disposeBag)

Result:

next(1)
completed

Creating a sequence with timer

The timer() method can be used in two ways. One is to create an Observable sequence that emits a single element after a set delay. The other is to create an Observable sequence that, after a set delay, emits an element at every period.

Example 17: Code:

// On RxSwift 5 and later, pass .seconds(5) and .seconds(1) instead of 5 and 1

// Way 1: emit a single element (0) after 5 seconds
let observable1 = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
observable1.subscribe { event in
    print(event)
}

// Way 2: after a 5-second delay, emit an element every 1 second
let observable2 = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable2.subscribe { event in
    print(event)
}

Creating a sequence with interval

The interval() method creates an Observable that emits an index at the specified interval, and it keeps emitting indefinitely.

Example 18: Code:

// On RxSwift 5 and later, pass .seconds(1) instead of 1
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

Subscribing to an Observable

Once we have an Observable, we use the subscribe() method to subscribe to it and receive its events.

There are two ways to subscribe to an Observable:

  1. We subscribe using subscribe() with a single closure. The closure receives each emitted event; to get the data out of an event, use event.element. For example:
let observable = Observable.of("A", "B", "C")
observable.subscribe { event in
    print(event)
    //print(event.element)
}

Result:

next(A)
next(B)
next(C)
completed

  2. RxSwift offers another subscribe method, which classifies events and handles each type through a separate closure. The data carried by an event is unpacked and passed directly as the closure argument, which is convenient to use. The four callback parameters of subscribe(), onNext, onError, onCompleted, and onDisposed, all have default values, so they are optional. We can handle just onNext and leave the rest out.

Example 19:

let observable = Observable.of("A", "B", "C")
         
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
}, onDisposed: {
    print("disposed")
})
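For the first form of subscribe() described above, event.element is an optional: it holds a value only for next events. The following minimal sketch shows one way to unpack it; the elements and sawCompleted variables are helpers invented for this illustration.

```swift
import RxSwift

var elements: [String] = []
var sawCompleted = false

_ = Observable.of("A", "B", "C").subscribe { (event: Event<String>) in
    if let element = event.element {
        // Only next events carry an element.
        elements.append(element)
    } else if event.isCompleted {
        sawCompleted = true
    }
}

print(elements, sawCompleted)
```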

Disposing of Observables

An Observable is not activated as soon as it is created; it starts emitting events only once something subscribes to it. Once activated, it stays active until it emits an .error or .completed event. To end a subscription manually, use dispose().
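The activation behavior can be checked with a minimal sketch. It assumes a sequence built with create() whose closure increments a counter; runCount and the other names are invented for this illustration.

```swift
import RxSwift

var runCount = 0

let sequence = Observable<Int>.create { observer in
    // Side effect: counts how often the closure runs.
    runCount += 1
    observer.onNext(runCount)
    observer.onCompleted()
    return Disposables.create()
}

// Creating the Observable has not run the closure yet.
let beforeSubscribe = runCount

// Each subscription activates the sequence once.
_ = sequence.subscribe()
_ = sequence.subscribe()
let afterSubscribe = runCount

print(beforeSubscribe, afterSubscribe)
```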

  • dispose()
  1. Using this method we can manually cancel a subscription.
  2. When a subscription is no longer needed, we can call dispose() to end it and prevent memory leaks.
  3. Once a subscription is disposed, it no longer receives events, even if the Observable emits again.

Example 20: Code:

let observable = Observable.of("A", "B", "C")

// Subscribe and keep the returned subscription
let subscription = observable.subscribe { event in
    print(event)
}

// Call the dispose() method of the subscription
subscription.dispose()
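A sequence created with of() finishes before dispose() is even reached, so the effect of disposing is easier to see with a source that keeps emitting. The sketch below uses a PublishSubject purely as such a source; that choice, like the variable names, is an assumption made for this illustration.

```swift
import RxSwift

// A PublishSubject lets us emit events by hand, before and after disposal.
let subject = PublishSubject<String>()
var received: [String] = []

let subscription = subject.subscribe(onNext: { received.append($0) })

subject.onNext("A")        // delivered to the subscriber
subscription.dispose()     // cancel the subscription manually
subject.onNext("B")        // not delivered: the subscription is gone

print(received)
```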
  • Besides calling dispose() directly, we more often use an object called DisposeBag to manage the disposal of multiple subscriptions:
  1. We can think of a DisposeBag as a garbage bag into which we put all the subscriptions we have used.
  2. When the DisposeBag is about to be deallocated, it calls dispose() on every subscription it holds.

Example 21: Code:

let disposeBag = DisposeBag()

// The first Observable and its subscription
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
    print(event)
}.disposed(by: disposeBag)

// The second Observable and its subscription
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
    print(event)
}.disposed(by: disposeBag)
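To see the bag dispose its contents when it goes away, here is a minimal sketch. It keeps the bag in an optional variable so that it can be released explicitly; disposedCount and the other names are invented for this illustration.

```swift
import RxSwift

var disposedCount = 0
var bag: DisposeBag? = DisposeBag()

// never() keeps both subscriptions alive until they are disposed.
Observable<Int>.never()
    .subscribe(onDisposed: { disposedCount += 1 })
    .disposed(by: bag!)
Observable<String>.never()
    .subscribe(onDisposed: { disposedCount += 1 })
    .disposed(by: bag!)

let whileBagAlive = disposedCount

// Releasing the bag disposes every subscription it holds.
bag = nil
let afterBagReleased = disposedCount

print(whileBagAlive, afterBagReleased)
```

In a view controller the bag is usually a stored property, so the same cleanup happens automatically when the controller is deallocated.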
