How to use RxJava

  1. An Observables is created to send data
  2. Convert these Observables into the data format you want using the Observable operation
  3. The observer then makes a concrete response to the data, responsive programming
  4. Error handling

2. Create an Observables

  1. Data streams can be created using just, range, create, etc. Some of the most important apis are described below
  2. For reference: github.com/ReactiveX/R…

1, just

  1. Create an existing data and send it downstream for consumption
  2. The value ranges from 1 to 9
Observable.just("world")
        .doOnNext(v -> System.out.println(v))
        .subscribe(v -> System.out.println("hello " + v));
Copy the code

2, the from

  1. Creates a list of existing data sources, unlike Just, which contains multiple data sources
  2. From includes fromArray, fromIterator, and fromCallable
Observable.fromIterable(Lists.newArrayList(1, 2, 3)) .subscribe(v -> System.out.println("hello " + v)); Observable.fromCallable(() -> "world").subscribe(System.out::println); Completable.fromrunnable (() -> system.out.println ("111")).subscribe(() -> system.out.println ("done"), error -> error.printStackTrace());Copy the code

3, create

  1. When a user defines a function to create a running data stream, such as receiving a client request, the data source is an endless stream
  2. Data flow downstream through onNext in Emitter, onComplete indicates that data has been sent, and onError indicates that there is an anomaly in sending data
  3. This back pressure thing
ObservableOnSubscribe handler = emitter -> {// onNext sends data to the emitter. OnNext ("create"); emitter.onNext("a source"); // Emitter. OnComplete (); // Emitters. OnError (new RuntimeException("error")); }; // Subscribe the first action is to handle the upstream stream, the second action is to handle the exception, Observable.create(handler).subscribe(system.out ::println, error -> System.out.println("error:" + error), () -> System.out.println("done!" ));Copy the code

4, defer

  1. Create a Callable to generate the data stream Observable that is actually called when the user subscribes
  2. The result printed in the following example is different, with a difference of one second
Observable<Long> observable = Observable.defer(() -> {
    long time = System.currentTimeMillis();
    return Observable.just(time);
});

observable.subscribe(time -> System.out.println(time));

Thread.sleep(1000);

observable.subscribe(time -> System.out.println(time));
Copy the code

5, the range

  1. Create a series of values, range creates an int, rangeLong creates a long
String greeting = "Hello World!" ; Observable<Integer> indexes = Observable.range(0, greeting.length()); Observable<Character> characters = indexes .map(index -> greeting.charAt(index)); characters.subscribe(character -> System.out.print(character), error -> error.printStackTrace(), () -> System.out.println());Copy the code

6, the interval

  1. Similar to a scheduled task, it is executed at regular intervals, returning longs starting at 0 and +1 for each execution
Observable.interval(1, TimeUnit.SECONDS).subscribe(time -> { System.out.println(System.currentTimeMillis()); if (time % 2 == 0) { System.out.println("Tick"); } else { System.out.println("Tock"); }}); Thread.sleep(2000);Copy the code

7, the timer

  1. This is similar to interval, but executed once
Observable<Long> eggTimer = Observable.timer(5, TimeUnit.MINUTES); eggTimer.blockingSubscribe(v -> System.out.println("Egg is ready!" ));Copy the code

Three, operation conversion Observables

  1. Operations are similar to those of SKIP, take, Merge, map, and flatMap
  2. Reference: github.com/ReactiveX/R…
Observable.range(1, 50).skip(10).subscribe(system.out ::println);Copy the code

Fourth, error handling

  1. This is the logic for handling error, as in the example above
  2. Specific error handling API reference:
ObservableOnSubscribe handler = emitter -> { emitter.onNext("create"); emitter.onNext("a source"); emitter.onComplete(); OnError emitter. OnError (new RuntimeException("error")); }; Observable.create(handler).subscribe(system.out ::println,).subscribe(system.out ::println,) error -> System.out.println("error:" + error), () -> System.out.println("done!" ));Copy the code