• Concept:

    • Creating the Observable and producing events

      • Other ways to create an Observable:

        • Method 1: just(T...): emits the arguments passed in, in order
        • Method 2: fromArray(T[]) / fromIterable(Iterable)
    • Creating the Observer and defining how it responds to events

      • Method 1: Use the Observer interface
      • Method 2: Use the Subscriber abstract class to create an Observer object
      • The difference between Method 1 and Method 2
    • Connecting the Observer to the Observable by subscribing

    • RxJava chain operations

    • Using Consumer as an example: implementing a simple observer pattern

Concept:

RxJava is a library for composing asynchronous and event-based programs by using observable sequences on the Java VM.

Creating the Observable and producing events

    private void creatObservable() {
        // Uses io.reactivex.Observable, ObservableOnSubscribe and ObservableEmitter (RxJava 2).
        // 1. create() is the basic RxJava method for building an event sequence.
        //    It takes an ObservableOnSubscribe object whose subscribe() method is called
        //    automatically when the Observable is subscribed to, so the events are
        //    triggered in the order defined here. The observer's callback methods are
        //    then called in turn: the Observable calls the observer's callbacks and
        //    passes events to it, which is the observer pattern.
        observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // 2. The ObservableEmitter produces events and notifies the observer.
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        });
    }

Other ways to create an Observable:

Method 1: just(T...): emits the arguments passed in, in order

    observable1 = Observable.just("A", "B", "C");
    // Calls, in order: onNext("A"); onNext("B"); onNext("C"); onComplete();

Method 2: fromArray(T[]) / fromIterable(Iterable)

fromArray(T[]) / fromIterable(Iterable<? extends T>) (named from(...) in RxJava 1.x): emits the elements of the array or Iterable one by one

    String[] words = {"A", "B", "C"};
    Observable<String> observable = Observable.fromArray(words);
    // Calls, in order: onNext("A"); onNext("B"); onNext("C"); onComplete();

Creating the Observer and defining how it responds to events

Method 1: Use the Observer interface

    private void creatObserver() {
        // Uses io.reactivex.Observer and io.reactivex.disposables.Disposable (RxJava 2).
        // 1. Create the Observer object.
        observer = new Observer<Integer>() {
            // 2. Override the callback methods to respond to the corresponding events.

            // Called before the observer receives any event.
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Connecting via subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Responding to Next event " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Responding to Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Responding to Complete event");
            }
        };
    }

Method 2: Use the Subscriber abstract class to create an Observer object

    /**
     * Method 2: Subscriber is an abstract class built into RxJava that implements
     * Observer and extends the Observer interface.
     * Note: this approach comes from RxJava 1.0 and was dropped in 2.0 and later.
     * The callbacks below have the onSubscribe(Subscription) / onComplete() shape of
     * org.reactivestreams.Subscriber; in RxJava 1.x the corresponding callbacks are
     * onStart() and onCompleted().
     */
    private void creatObserverBySubscriber() {
        subscriber = new Subscriber<Integer>() {
            // Override the callback methods to respond to the corresponding events.

            // Called before the observer receives any event.
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "Connecting via subscribe");
            }

            // Called when the Observable produces a Next event and the observer receives it.
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Responding to Next event " + value);
            }

            @Override
            public void onError(Throwable t) {
                Log.d(TAG, "Responding to Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Responding to Complete event");
            }
        };
    }

The difference between Method 1 and Method 2

  1. Both are used in exactly the same way: in the RxJava 1.x subscribe process, an Observer is always converted to a Subscriber before it is used.
  2. Subscriber adds onStart(), which is called before any event has been handled and can be used for initialization.
  3. Subscriber adds unsubscribe(), which cancels the subscription. After it is called, the observer no longer receives or responds to events.
  4. Before calling unsubscribe(), use isUnsubscribed() to check the state, that is, whether the Observable still holds a reference to the Subscriber. If that reference is not released in time, a memory leak can occur.
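
To make the list concrete, here is a minimal plain-Java sketch of what the extra methods do. SketchSubscriber and SubscriberExtrasDemo are hypothetical names made up for this illustration; they only mirror the method names onStart(), unsubscribe() and isUnsubscribed() and are not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration; not the RxJava 1.x Subscriber class.
abstract class SketchSubscriber<T> {
    private boolean unsubscribed;

    // Optional hook that runs before any event is delivered.
    void onStart() { }

    abstract void onNext(T value);

    final void unsubscribe() { unsubscribed = true; }

    final boolean isUnsubscribed() { return unsubscribed; }
}

final class SubscriberExtrasDemo {
    // Delivers "A", "B", "C" and unsubscribes after "B"; returns what happened, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchSubscriber<String> subscriber = new SketchSubscriber<String>() {
            @Override void onStart() { log.add("start"); }
            @Override void onNext(String value) { log.add("next " + value); }
        };

        subscriber.onStart(); // initialization happens before the first event
        for (String value : new String[] {"A", "B", "C"}) {
            if (subscriber.isUnsubscribed()) {
                break; // an unsubscribed observer receives nothing more
            }
            subscriber.onNext(value);
            // Check the state first, then cancel the subscription.
            if (value.equals("B") && !subscriber.isUnsubscribed()) {
                subscriber.unsubscribe();
            }
        }
        log.add("unsubscribed " + subscriber.isUnsubscribed());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch "C" is never delivered, because the loop checks isUnsubscribed() before each event.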

Connecting the Observer to the Observable by subscribing

    observable.subscribe(observer);
    // or: observable.subscribe((Observer<? super Integer>) subscriber);

    // Note: the method outlined below is Observable.subscribe(Subscriber) from RxJava 1.0.
    // public Subscription subscribe(Subscriber subscriber) {
    //     subscriber.onStart();
    //     // onStart() is the method overridden in the Subscriber abstract class when the
    //     // observer was created; it does initialization work.
    //     onSubscribe.call(subscriber);
    //     // This call invokes the corresponding observer methods, so the observer responds
    //     // to the events produced by the Observable. The Observable calls the observer's
    //     // callbacks and passes events to it, which is the observer pattern.
    //     // It also shows that creating an Observable only defines the events; they are
    //     // actually sent when it is subscribed to, that is, when subscribe() runs.
    // }
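
The point that events are only sent once subscribe() runs can be sketched in plain Java. The types below (SketchObserver, SketchOnSubscribe, SketchObservable, LazyEmissionDemo) are hypothetical stand-ins written for this illustration, loosely following the onSubscribe.call(subscriber) shape shown above; they are not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types for illustration; not RxJava classes.
interface SketchObserver<T> {
    void onNext(T value);
    void onComplete();
}

interface SketchOnSubscribe<T> {
    void call(SketchObserver<T> observer);
}

final class SketchObservable<T> {
    private final SketchOnSubscribe<T> onSubscribe;

    private SketchObservable(SketchOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Only stores the emission logic; nothing is emitted yet.
    static <T> SketchObservable<T> create(SketchOnSubscribe<T> onSubscribe) {
        return new SketchObservable<>(onSubscribe);
    }

    // Running the stored logic here is what actually sends the events.
    void subscribe(SketchObserver<T> observer) {
        onSubscribe.call(observer);
    }
}

final class LazyEmissionDemo {
    // Returns the order in which creation, emission and delivery happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchObservable<Integer> source = SketchObservable.create(observer -> {
            log.add("emitting");
            observer.onNext(1);
            observer.onNext(2);
            observer.onComplete();
        });
        log.add("created"); // the emission logic has not run at this point

        source.subscribe(new SketchObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("next " + value); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the returned log, "created" comes before "emitting": the emission logic stays idle until subscribe() is called.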

RxJava chain operations

    private void creatRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. Create the Observable and produce events.
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(11);
                emitter.onNext(22);
                emitter.onNext(33);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. Connect the observer to the Observable by subscribing.
            // 3. Create the observer and define how it responds to events.

            // Keep the Disposable so the connection can be cut later.
            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Connecting via subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Responding to Next event " + value);
                // dispose() cuts the connection between the observer and the Observable,
                // so the observer stops receiving the Observable's events.
                if (value == 22) {
                    // Disconnect after receiving the second event.
                    mDisposable.dispose();
                    Log.d(TAG, "Already disposed: " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Responding to Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Responding to Complete event");
            }
        });
    }
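
The effect of dispose() in the chain above can be approximated without the library. The following plain-Java sketch uses hypothetical types (SketchDisposable, SketchDisposableObserver, DisposeDemo) and assumes a producer that simply stops delivering once the handle is disposed; RxJava's internal mechanism may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not RxJava's Disposable or Observer.
final class SketchDisposable {
    private boolean disposed;

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

interface SketchDisposableObserver {
    void onSubscribe(SketchDisposable d);
    void onNext(int value);
    void onComplete();
}

final class DisposeDemo {
    // Hands the observer a handle first, then delivers values until the handle is disposed.
    static void emit(int[] values, SketchDisposableObserver observer) {
        SketchDisposable handle = new SketchDisposable();
        observer.onSubscribe(handle);
        for (int value : values) {
            if (handle.isDisposed()) {
                return; // connection cut: no more events, no completion
            }
            observer.onNext(value);
        }
        if (!handle.isDisposed()) {
            observer.onComplete();
        }
    }

    // Same scenario as the chain above: dispose after receiving 22.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        emit(new int[] {11, 22, 33}, new SketchDisposableObserver() {
            private SketchDisposable handle;

            @Override public void onSubscribe(SketchDisposable d) { handle = d; }

            @Override public void onNext(int value) {
                log.add("next " + value);
                if (value == 22) {
                    handle.dispose();
                    log.add("disposed " + handle.isDisposed());
                }
            }

            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the observer sees 11 and 22, then nothing else: neither 33 nor the completion callback reaches it.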

Using Consumer as an example: implementing a simple observer pattern

    /**
     * Using Consumer as an example.
     */
    private void creatRxJavaConsumer() {
        // Uses io.reactivex.functions.Consumer (RxJava 2).
        Observable.just("ConsumerHelloRxjava").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // s is the emitted value, "ConsumerHelloRxjava".
                System.out.println(s);
            }
        });
    }
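
One way to picture a Consumer-only subscription is as a full observer whose other callbacks do nothing. The plain-Java sketch below assumes exactly that and uses java.util.function.Consumer as a stand-in for RxJava's Consumer; FullObserver, fromConsumer and ConsumerSubscribeDemo are hypothetical names, and the real library's handling of errors is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ConsumerSubscribeDemo {
    // Hypothetical full observer with the three callbacks used in this article.
    interface FullObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Wraps a single callback into a full observer; the other callbacks are no-ops.
    static <T> FullObserver<T> fromConsumer(Consumer<T> consumer) {
        return new FullObserver<T>() {
            @Override public void onNext(T value) { consumer.accept(value); }
            @Override public void onError(Throwable e) { /* ignored in this sketch */ }
            @Override public void onComplete() { /* ignored in this sketch */ }
        };
    }

    // Delivers one value followed by completion, like just("ConsumerHelloRxjava").
    static List<String> run() {
        List<String> received = new ArrayList<>();
        FullObserver<String> observer = fromConsumer((String s) -> received.add(s));
        observer.onNext("ConsumerHelloRxjava");
        observer.onComplete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the Next event reaches the consumer, which is why this form is convenient when completion and error handling are not needed.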