1. How to use RxJava
- An Observable is created to emit data
- Operators convert the Observable's data into the format you want
- The observer then reacts to the data as it arrives; this is reactive programming
- Error handling
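The four steps above can be sketched in one small pipeline. This is a minimal illustration, assuming RxJava 2 package names (`io.reactivex`; RxJava 3 keeps the same classes under `io.reactivex.rxjava3.core`). The class name `PipelineSketch` and the `seen` list are hypothetical, introduced only for the example.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {
    // Runs the four steps and returns everything the observer saw.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Observable.just(1, 2, 3)                       // 1. create an Observable
                .map(n -> "item " + n)                 // 2. convert the data with an operator
                .subscribe(
                        seen::add,                             // 3. react to each value
                        error -> seen.add("error: " + error),  // 4. handle errors
                        () -> seen.add("done"));               // completion callback
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because `just` emits synchronously on the calling thread, the list is fully populated by the time `subscribe` returns.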
2. Create an Observable
- Data streams can be created with just, range, create, and so on. The most important APIs are described below
- For reference: github.com/ReactiveX/R…
1, just
- Wraps existing data and emits it downstream for consumption
- just has overloads that accept from 1 to 10 values
Observable.just("world")
.doOnNext(v -> System.out.println(v))
.subscribe(v -> System.out.println("hello " + v));
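The example above passes a single value. As a small sketch of the multi-value overloads, assuming RxJava 2 (`io.reactivex`), the following collects three values into a list; the class name `JustSketch` is hypothetical.

```java
import io.reactivex.Observable;
import java.util.List;

public class JustSketch {
    // Emits three fixed values in order and gathers them into a list.
    static List<String> run() {
        return Observable.just("a", "b", "c")
                .toList()        // Single<List<String>>
                .blockingGet();  // block until the stream completes
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```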
2, from
- Creates a stream from an existing data source; unlike just, the source can hold multiple items
- The from family includes fromArray, fromIterable, and fromCallable
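// Lists.newArrayList comes from Guava (com.google.common.collect.Lists)
Observable.fromIterable(Lists.newArrayList(1, 2, 3))
    .subscribe(v -> System.out.println("hello " + v));
// fromCallable computes its single value when an observer subscribes
Observable.fromCallable(() -> "world").subscribe(System.out::println);
// fromRunnable emits no value, only a completion (or an error)
Completable.fromRunnable(() -> System.out.println("111"))
    .subscribe(() -> System.out.println("done"), error -> error.printStackTrace());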
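fromArray is listed above but not shown. A minimal sketch, assuming RxJava 2 (`io.reactivex`); the class name `FromArraySketch` is hypothetical. Note that the array must hold objects: passing an `int[]` would produce a stream with one item, the array itself.

```java
import io.reactivex.Observable;
import java.util.List;

public class FromArraySketch {
    // Emits each element of the array, multiplies it by 10, and collects the results.
    static List<Integer> run() {
        Integer[] numbers = {1, 2, 3};
        return Observable.fromArray(numbers)
                .map(n -> n * 10)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```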
3, create
- Lets the user define a function that produces the data stream as it runs; for example, when handling incoming client requests the data source is an endless stream
- Data flows downstream through the emitter's onNext; onComplete signals that all data has been sent, and onError signals that an error occurred while sending
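- Backpressure matters here: Observable.create applies no backpressure, so a source that must respect downstream demand would use Flowable.create with a BackpressureStrategy instead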
ObservableOnSubscribe<String> handler = emitter -> {
    // onNext sends data downstream
    emitter.onNext("create");
    emitter.onNext("a source");
    // onComplete signals that all data has been sent
    emitter.onComplete();
    // onError would signal a failure instead of completion:
    // emitter.onError(new RuntimeException("error"));
};
// subscribe: the first action handles each item, the second handles errors, the third runs on completion
Observable.create(handler).subscribe(System.out::println,
    error -> System.out.println("error:" + error),
    () -> System.out.println("done!"));
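Since backpressure comes up in this section, here is a hedged sketch of the backpressure-aware counterpart of create. It assumes RxJava 2 (`io.reactivex.Flowable`, `io.reactivex.BackpressureStrategy`); the choice of `BUFFER` and the class name `FlowableCreateSketch` are illustrative assumptions, not something the text above prescribes.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.List;

public class FlowableCreateSketch {
    // Emits 0..4 through an emitter; BUFFER queues items the consumer has not requested yet.
    static List<Integer> run() {
        Flowable<Integer> source = Flowable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
        return source.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Other strategies (`DROP`, `LATEST`, `ERROR`, `MISSING`) trade memory for data loss or failure; which one fits depends on the source.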
4, defer
- Takes a Callable that builds the Observable; the Callable is only invoked when an observer subscribes, so every subscription gets a freshly built stream
- The two subscriptions in the following example print different timestamps, about one second apart
Observable<Long> observable = Observable.defer(() -> {
    long time = System.currentTimeMillis();
    return Observable.just(time);
});
observable.subscribe(time -> System.out.println(time));
Thread.sleep(1000);
observable.subscribe(time -> System.out.println(time));
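To make the difference from just concrete, the following sketch replaces the timestamp with a counter so the result is deterministic. It assumes RxJava 2 (`io.reactivex`); the class name `DeferSketch` and the counter are hypothetical.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferSketch {
    // Returns {eager first, eager second, lazy first, lazy second}.
    static int[] run() {
        AtomicInteger counter = new AtomicInteger();
        // just evaluates its argument once, when the Observable is assembled.
        Observable<Integer> eager = Observable.just(counter.incrementAndGet());
        // defer re-runs the factory for every subscriber.
        Observable<Integer> lazy =
                Observable.defer(() -> Observable.just(counter.incrementAndGet()));
        int e1 = eager.blockingFirst();
        int e2 = eager.blockingFirst();
        int l1 = lazy.blockingFirst();
        int l2 = lazy.blockingFirst();
        return new int[] {e1, e2, l1, l2};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The eager stream replays the same value to both subscribers, while the deferred one increments the counter on each subscription.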
5, range
- Emits a sequence of consecutive values: range emits Integer values and rangeLong emits Long values
String greeting = "Hello World!";
Observable<Integer> indexes = Observable.range(0, greeting.length());
Observable<Character> characters = indexes
    .map(index -> greeting.charAt(index));
characters.subscribe(character -> System.out.print(character),
    error -> error.printStackTrace(),
    () -> System.out.println());
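One detail worth illustrating: the second argument of range and rangeLong is a count, not an end value. A minimal sketch, assuming RxJava 2 (`io.reactivex`); the class name `RangeSketch` is hypothetical.

```java
import io.reactivex.Observable;
import java.util.List;

public class RangeSketch {
    // range(start, count): three Integer values starting at 5.
    static List<Integer> ints() {
        return Observable.range(5, 3).toList().blockingGet();
    }

    // rangeLong(start, count): three Long values starting at 10.
    static List<Long> longs() {
        return Observable.rangeLong(10L, 3L).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(ints());
        System.out.println(longs());
    }
}
```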
6, interval
- Similar to a scheduled task: it fires at a fixed interval, emitting a Long that starts at 0 and increases by 1 on each tick
Observable.interval(1, TimeUnit.SECONDS).subscribe(time -> {
    System.out.println(System.currentTimeMillis());
    if (time % 2 == 0) {
        System.out.println("Tick");
    } else {
        System.out.println("Tock");
    }
});
Thread.sleep(2000);
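The example above needs Thread.sleep because interval emits on a background scheduler by default, so the main thread would otherwise exit first. As an alternative sketch, assuming RxJava 2 (`io.reactivex`), the stream can be bounded with take and consumed with a blocking call; the 10 ms period and the class name `IntervalSketch` are arbitrary choices for illustration.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalSketch {
    // Collects the first three ticks; blockingGet waits until take(3) completes the stream.
    static List<Long> run() {
        return Observable.interval(10, TimeUnit.MILLISECONDS)
                .take(3)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```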
7, timer
- Similar to interval, but fires only once, after the given delay
Observable<Long> eggTimer = Observable.timer(5, TimeUnit.MINUTES);
eggTimer.blockingSubscribe(v -> System.out.println("Egg is ready!"));
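A shorter delay makes the single emission easy to observe. This sketch assumes RxJava 2 (`io.reactivex`); the 50 ms delay and the class name `TimerSketch` are illustrative.

```java
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class TimerSketch {
    // Waits for the one value that timer emits after the delay, then returns it.
    static long run() {
        return Observable.timer(50, TimeUnit.MILLISECONDS).blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("timer emitted " + run());
    }
}
```

The emitted value is always 0; the useful information is the moment it arrives, not the value itself.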
3. Transforming Observables with operators
- Operators include skip, take, merge, map, flatMap, and others
- Reference: github.com/ReactiveX/R…
Observable.range(1, 50).skip(10).subscribe(System.out::println);
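The other operators named above can be sketched the same way. This assumes RxJava 2 (`io.reactivex`); the class and method names are hypothetical. All sources here are synchronous, which is why the flatMap and merge orderings are predictable; with asynchronous sources the items may interleave.

```java
import io.reactivex.Observable;
import java.util.List;

public class OperatorSketch {
    // skip drops the first 10 items, take keeps the next 3.
    static List<Integer> skipTake() {
        return Observable.range(1, 50).skip(10).take(3).toList().blockingGet();
    }

    // map transforms each item one-to-one.
    static List<String> mapped() {
        return Observable.just("a", "b").map(s -> s.toUpperCase()).toList().blockingGet();
    }

    // flatMap turns each item into its own Observable and flattens the results.
    static List<Integer> flatMapped() {
        return Observable.just(1, 2)
                .flatMap(n -> Observable.just(n, n * 10))
                .toList()
                .blockingGet();
    }

    // merge combines several Observables into one stream.
    static List<Integer> merged() {
        return Observable.merge(Observable.just(1, 2), Observable.just(3, 4))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(skipTake());
        System.out.println(mapped());
        System.out.println(flatMapped());
        System.out.println(merged());
    }
}
```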
4. Error handling
- Errors are handled in the error callback passed to subscribe, as in the create example above
- Specific error handling API reference:
ObservableOnSubscribe<String> handler = emitter -> {
    emitter.onNext("create");
    emitter.onNext("a source");
    // A stream ends with exactly one terminal event, so onComplete is left out here:
    // emitter.onComplete();
    // onError ends the stream with a failure
    emitter.onError(new RuntimeException("error"));
};
// The second argument of subscribe receives the error
Observable.create(handler).subscribe(System.out::println,
    error -> System.out.println("error:" + error),
    () -> System.out.println("done!"));
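Besides the error callback, errors can also be handled inside the chain with operators. The following is a hedged sketch of two of them, onErrorReturnItem and retry, assuming RxJava 2 (`io.reactivex`); the class name, the fallback value, and the attempt counter are hypothetical.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ErrorOperatorSketch {
    // onErrorReturnItem replaces the error with a fallback item and completes normally.
    static List<String> recover() {
        return Observable.<String>create(emitter -> {
            emitter.onNext("create");
            emitter.onError(new RuntimeException("error"));
        }).onErrorReturnItem("fallback").toList().blockingGet();
    }

    // retry(1) resubscribes once after a failure; the second attempt succeeds here.
    static List<String> retryOnce() {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.<String>create(emitter -> {
            if (attempts.incrementAndGet() == 1) {
                emitter.onError(new RuntimeException("first attempt fails"));
            } else {
                emitter.onNext("ok");
                emitter.onComplete();
            }
        }).retry(1).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(recover());
        System.out.println(retryOnce());
    }
}
```

With these operators the downstream observer never sees the error, so its error callback is not invoked.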