preface

In the last article we looked at the filter class operator. In this article we will look at the RxJava composite class operator. Combined operators are mainly used to process multiple Observables at the same time, combining them to create new Observables that meet our needs. Let’s take a look at what they have.

Combinatorial operator

Merge

The merge operator merges observation sequences emitted by two Observables into a single sequence. The sequence is sorted according to the emission time of each element of the two sequences, while the elements emitted at the same time point are disordered.

// Merge an Observable that sends letters with an Observable that sends numbers
final String[] words = new String[]{"A"."B"."C"."D"."E"."F"."G"."H"."I"};
// The letter Observable emits every 200ms
Observable<String> wordSequence = Observable.interval(200, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return words[position.intValue()];
            }
        })
        .take(words.length);
// Digital Observable emits every 500ms
Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(4);
Observable.merge(wordSequence, numberSequence)
        .subscribe(new Action1<Serializable>() {
            @Override
            public void call(Serializable serializable) {
                Log.e("rx_test"."The merge."+ serializable.toString()); }});Copy the code

Output result:

A merge: B merge:0Merge: C merge: D merge: E merge1Merge: F merge: G merge:2Merge: H merge: I merge:3Copy the code

Schematic diagram:

The merge operator also has an input parameter, merge(Observable[]), which passes in a collection of multiple Observables. The merge operator also merges the sequences of these observables and emits them.

MergeDelayError

The mergeDelayError operator, similar to the merge function, merges observables. The difference is that the mergeDelayError operator does not stop the merge immediately if an exception occurs during the merge, but emits an exception after all elements have been merged. But the Observable from which the exception occurred does not emit data.

Observable, emitted every 200ms, generates an exception during simulation
Observable<String> wordSequence = Observable.interval(200, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                Long cache = position;
                if (cache == 3) {
                    cache = cache / 0;
                }
                return words[position.intValue()];
            }
        })
        .take(words.length);
// Digital Observable emits every 500ms
Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(4);
Observable.mergeDelayError(wordSequence, numberSequence)
        .subscribe(new Action1<Serializable>() {
            @Override
            public void call(Serializable serializable) {
                Log.e("rx_test"."mergeDelayError:"+ serializable.toString()); }},new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.e("rx_test"."mergeDelayError:"+ throwable.getMessage()); }},new Action0() {
            @Override
            public void call(a) {
                Log.e("rx_test"."MergeDelayError: the onComplete"); }});Copy the code

Output result:

MergeDelayError: A mergeDelayError: B mergeDelayError0MergeDelayError: C mergeDelayError1MergeDelayError:2MergeDelayError:3MergeDelayError: Divide by ZeroCopy the code

As can be seen from the output, wordSequence throws an exception when it is transmitted to C and stops transmitting its remaining data, but the merge does not stop. The anomaly was launched after the merge was complete. Schematic diagram:

Concat

The concat operator merges data emitted by multiple ObserBavles, similar to the merge operator. But the concat operator emits observables in order.

Observable<String> wordSequence = Observable.just("A"."B"."C"."D"."E");
Observable<Integer> numberSequence = Observable.just(1.2.3.4.5);
Observable<String> nameSequence = Observable.just("Sherlock"."Holmes"."Xu"."Lei");
Observable.concat(wordSequence, numberSequence, nameSequence)
        .subscribe(new Action1<Serializable>() {
            @Override
            public void call(Serializable serializable) {
                Log.e("rx_test"."concat:"+ serializable.toString()); }});Copy the code

Output result:

A concat: B concat: C concat: D concat: E concat:1Concat:2Concat:3Concat:4Concat:5Concat: Sherlo concat: Holmes concat: Xu concat: LeiCopy the code

Schematic diagram:

Zip

The zip(Observable, Observable, Func2) operator merges two Observable data items according to the call() method rules in Func2 and fires. Note: If one Of the Observables finishes sending data or has an exception, the other One stops sending data.

Observable<String> wordSequence = Observable.just("A"."B"."C"."D"."E");
Observable<Integer> numberSequence = Observable.just(1.2.3.4.5.6);
Observable.zip(wordSequence, numberSequence, new Func2<String, Integer, String>() {
    @Override
    public String call(String s, Integer integer) {
        return s + integer;
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e("rx_test".Zip: ""+ s); }});Copy the code

Output result:

Zip: A1 zip: B2 zip: C3 zip: D4 zip: E5Copy the code

The output shows that the last 6 of the numberSequence observation sequence is not sent. The combination sequence has stopped sending data because the wordSequence observation sequence has sent all data. Schematic diagram:

StartWith

The startWith operator is used to insert and emit the specified data before the source Observable emits it.

Observable.just(4.5.6.7)
        .startWith(1.2.3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e("rx_test"."startWith:"+ integer); }});Copy the code

Output result:

StartWith:1StartWith:2StartWith:3StartWith:4StartWith:5StartWith:6StartWith:7Copy the code

Schematic diagram:

StartWith has two other inputs:

  • StartWith (Iterable) : Inserts the Iterable data before the data emitted by the source Observable.
  • StartWith (Observable) : Inserts data emitted by another Observable before data emitted by the source Observable.

    SwitchOnNext

    The switchOnNext operator is used to convert a source Observable that emits multiple observables into one Observable, and then emits data transmitted by multiple observables. If the source Observable sends a new small Observable when the small Observable is transmitting data, the data not yet transmitted by the previous small Observable will be discarded and the data transmitted by the new small Observable will be directly transmitted, as shown in the example above.

    “`java

    // Generates an Observable every 500ms

    Observable> observable = Observable.interval(500, TimeUnit.MILLISECONDS)

    .map(new Func1<Long, Observable<Long>>() {@override public Observable<Long> call(Long aLong) {// Generate data every 200 millisecondsreturn Observable.interval(200, TimeUnit.MILLISECONDS)
                      .map(new Func1<Long, Long>() {
                          @Override
                          public Long call(Long aLong) {
                              return aLong * 10;
                          }
                      }).take(5);
              }
      }).take(2);Copy the code

Observable.switchOnNext(observable) .subscribe(new Action1() { @Override public void call(Long aLong) { Log.e(“rx_test”, SwitchOnNext: + aLong); }});

Java switchOnNext: 0 switchOnNext: 10 switchOnNext: 0 switchOnNext: 10 switchOnNext: 20 switchOnNext: 30 switchOnNext: 40Copy the code

The output results show that the first small Observable stops transmitting data when it prints to 10, indicating that when it is transmitted to 10, a new small Observable is created, and the first small Observable is interrupted to transmit data of the new small Observable. Schematic diagram:

CombineLatest

The combineLatest operator, used to combine and emit the data recently emitted by two Observale as the Func2 function.

// reference the merge example
final String[] words = new String[]{"A"."B"."C"."D"."E"."F"."G"."H"."I"};
Observable<String> wordSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return words[position.intValue()];
            }
        })
        .take(words.length);
Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(5);
Observable.combineLatest(wordSequence, numberSequence,
        new Func2<String, Long, String>() {
            @Override
            public String call(String s, Long aLong) {
                return s + aLong;
            }
        })
        .subscribe(new Action1<Serializable>() {
            @Override
            public void call(Serializable serializable) {
                Log.e("rx_test"."combineLatest:"+ serializable.toString()); }});Copy the code

Output result:

A0 combineLatest: B0 combineLatest: C0 combineLatest: C1 combineLatest: D1 combineLatest: E1 combineLatest: E2 combineLatest: F2 combineLatest: F3 combineLatest: G3 combineLatest: H3 combineLatest: H4 combineLatest: I4Copy the code

If the input order of wordSequence and numberSequence were interchanged, the output would also be different:

CombineLatest:0A combineLatest:0B combineLatest:0C combineLatest:1C combineLatest:1D combineLatest:2D combineLatest:2E combineLatest:2FCombineLatest:3FCombineLatest:3G combineLatest:3H combineLatest:4H combineLatest:4ICopy the code

WordSequence fires a character every 300ms, and numberSequence fires a number every 500ms. For those of you who don’t know where the output comes from, the operator is a bit of a puzzle. Let’s take a look at the schematic diagram to make it clear. Schematic diagram:

Join

The Join (Observable, Func1, Func1, Func2) operator, similar to the combineLatest operator, is used to arrange and combine ObservableA and ObservableB emitted data. However, the Join operator can control the life cycle of each data emitted by an Observable. In the life cycle of each transmitted data, it can be merged with the data emitted by another Observable according to certain rules. Let’s take a look at several input parameters of the join.

  • Observable: The target Observable that needs to be combined with the source Observable.
  • Func1: receives data emitted from the source Observable and returns an Observable whose declared period determines the validity period of the emitted data from the source Obsrvable;
  • Func1: receives the data emitted by the target Observable and returns an Observable. The declared period of this Observable determines the validity period of the data emitted by the target Obsrvable.
  • Func2: Receives the data emitted from the source Observable and the target Observable and returns the two data according to self-defined rules.
    // Generate a sequence of letters with a period of 1000ms
    String[] words = new String[]{"A"."B"."C"."D"."E"."F"."G"."H"};
    Observable<String> observableA = Observable.interval(1000, TimeUnit.MILLISECONDS)
          .map(new Func1<Long, String>() {
              @Override
              public String call(Long aLong) {
                  return words[aLong.intValue()];
              }
          }).take(8);
    // produce the sequence of 0,1,2,3,4,5,6,7 with a delay of 500ms and a period of 1000ms
    Observable<Long> observableB = Observable.interval(500.1000, TimeUnit.MILLISECONDS)
          .map(new Func1<Long, Long>() {
              @Override
              public Long call(Long aLong) {
                  return aLong;
              }
          }).take(words.length);
    //join
    observableA.join(observableB,
          new Func1<String, Observable<Long>>() {
              @Override
              public Observable<Long> call(String s) {
                  //ObservableA transmits data valid for 600ms
                  return Observable.timer(600, TimeUnit.MILLISECONDS); }},new Func1<Long, Observable<Long>>() {
              @Override
              public Observable<Long> call(Long aLong) {
                  //ObservableB transmits data valid for 600ms
                  return Observable.timer(600, TimeUnit.MILLISECONDS); }},new Func2<String, Long, String>() {
              @Override
              public String call(String s, Long aLong) {
                  return s + aLong;
              }
          }
    ).subscribe(new Action1<String>() {
      @Override
      public void call(String s) {
          Log.e("rx_test"."join:"+ s); }});Copy the code

    The combination mode of the join operator is similar to the arrangement and combination rule in mathematics. ObservableA is the reference source Observable, which emits data according to its own cycle, and each transmitted data has its validity period. ObservableB combines each data emitted by ObservableB with the data emitted by A and still within the validity period according to the rules of Func2, and the data emitted by B also has its validity period. The results are then beamed to observers for processing.

    Output result:

    Join: A0 join: A1 join: B1 join: B2 join: C2 join: C3 join: D3 join: D4 join: E5 join: F5 join: F6 join: G6 join: G7 join: H7Copy the code

    Schematic diagram:

GroupJoin

The groupJoin operator is similar to the Join operator, but the difference is that the function of the fourth parameter Func2 is different. It wraps a layer of small Observables after the join, so that users can perform filtering and transformation operations again and send them to Observables.

observableA.groupJoin(observableB,
        new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                return Observable.timer(600, TimeUnit.MILLISECONDS); }},new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.timer(600, TimeUnit.MILLISECONDS); }},new Func2<String, Observable<Long>, Observable<String>>() {
            @Override
            public Observable<String> call(final String s, Observable<Long> longObservable) {
                return longObservable.map(new Func1<Long, String>() {
                    @Override
                    public String call(Long aLong) {
                        returns + aLong; }}); } }) .subscribe(new Action1<Observable<String>>() {
            @Override
            public void call(Observable<String> stringObservable) {
                stringObservable.subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("rx_test"."groupJoin:"+ s); }}); }});Copy the code

Output result:

GroupJoin: A0 groupJoin: A1 groupJoin: B1 groupJoin: B2 groupJoin: C2 groupJoin: C3 groupJoin: D3 groupJoin: D4 groupJoin: E4 groupJoin: E5 groupJoin: F5 groupJoin: F6 groupJoin: G6 groupJoin: G7 groupJoin: H7Copy the code

Schematic diagram:

conclusion

That concludes this article on RxJava’s common composite class operators. Through the above four articles on RxJava four types of operator learning, I believe that you have a basic grasp of how to use RxJava. Practice is the only test of truth, and in the next article we will take a look at how RxJava is used in practice. If you have any doubts or suggestions, you can also put forward them in the project Issues of RxJavaDemo on Github. I will reply in time. Attached is the address of RxJavaDemo: RxJavaDemo