RxJava is extensive and sophisticated, and if you want to get started and get advanced, operators are a starting point. So, we wanted to find a way to write the operators in a nice way, and at the same time quickly verify that the input and output were correct. There are two ideas:
- Using UT to implement each operator allows you to focus on the operator itself without relying on the Android platform.
- For each operator, it is implemented using RX Marbles, or the official MARBLE Diagrams of RxJava.
For example, in the following two images, from RX Marbles and the official Marbles, all we need to do is purposefully and precisely implement the inputs and outputs of both images using UT.
By purposeful and precise I/O, I/O means that the code is implemented strictly in accordance with the meaning expressed in the picture according to each data stream of pinball graph of all operators and the meaning of the operators. This obsessive-compulsive approach is very helpful in understanding operators and the architecture of RxJava.
(1) Preliminary knowledge
We want to focus on the implementation of the operators rather than the tricks of UT, but due to the asynchronous nature of RxJava, many of the operators are thread-specific, so we need to get a head start on how threads are handled in UT.
Let the test thread end last
In thread-specific test code, there is a tricky phenomenon: the test thread completes before the child thread, as follows:
@test public void test_thread_early() {system.out.println (" test_thread_early "); New Thread(new Runnable() {@override public void run() {system.out.println (" thread-start "); OperatorUtils.sleep(3000); System.out.println(" child thread-end "); } }).start(); System.out.println(" test thread -end"); system.out.println (" test thread -end"); }Copy the code
In the above code, the test thread completes execution instantaneously, while the child thread needs to execute 3s. The test thread completes execution earlier than the child thread, so the child thread cannot complete execution. Therefore, the output result is:
Test thread-start Test thread-end Child thread-startCopy the code
Accordingly, let’s look at the example of the RxJava operator, which uses the timer operator to delay 3s sending data:
@test public void test_thread_early_observable() {system.out.println (); " + Thread.currentThread().getName()); // The message source is executed in Schedulers.computation() thread and is executed after 3s, at which time the test thread has completed its execution. Observable.timer(3, timeunit.seconds). Subscribe (num -> {system.out. println("Observable and Subscriber thread: " + Thread.currentThread().getName()); System.out.println(" get subscription data: "+ num); }); System.out.println(" test thread --end"); }Copy the code
As in the above code, the test thread terminates early, and the Schedulers.computation() thread on which the timer operator is based will not complete, so the output is:
Test thread-start, thread: main Test thread-endCopy the code
If you can’t guarantee that all threads finish, you won’t get the expected output. So, how to solve this problem? One of the clumsiest ways to do this is to make the test Thread the last Thread to terminate. Adding logic like Thread.sleep(4000) to the test Thread ensures that the above two pieces of code will output normally. (I don’t want to cover too many testing tips in this article, but if you need more rigorous and powerful threading asynchronous testing, look at third-party frameworks such as Awaitility.)
Use TestScheduler to manipulate the time
In addition to this clumsy approach, RxJava provides TestScheduler, through which time can be manipulated.
For the timer operator mentioned above, through testScheduler. AdvanceTimeBy (3, TimeUnit. SECONDS) can advance the time of 3 s, where test thread and timer operator’s thread can be smoothly completed, the complete code is as follows:
@Test public void test_thread_with_TestScheduler() { TestScheduler testScheduler = Schedulers.test(); System.out.println(" Thread.currentThread().getName()); Observable.timer(3, timeunit.seconds, TestScheduler).subscribe(num -> {system.out.println ("Observable and Subscriber Thread: "+ thread.currentThread ().getName())); System.out.println(" get subscription data: "+ num); }); / / the time ahead of the 3 s testScheduler advanceTimeBy (3, TimeUnit. SECONDS); }Copy the code
Thread handling of aggregate operators
Many aggregation operators, such as merge and ZIP, require different data flows to be constructed in different threads to reflect the sequence of data flows sent and the corresponding output. How to complete the execution of multiple threads? This is done in combination with letting the test thread end at last and using TestScheduler. I’ll explain more in the section on aggregate operators below.
With this knowledge, you can basically implement all of the RxJava operators. Next, we will explain the different types of operators with one or two examples. For the complete code, please go to Github: github.com/geniusmart/…
(2) different types of operator implementation
interval
Interval, as a creation operator, has the ability to send data at intervals, which is the basis for writing other operators, so let’s start with interval.
The picture is very simple. The top down analysis is as follows:
- Operator: because
interval
The operator is created, so the operator is called directly to generate the data stream. According to the API parameters, we need to define the interval, which we set to 100ms. - Input: Executed
Observable.interval()
After that, output is generated at specified intervals0, 1, 2, 3...
(Note: from the pinball graph, you can see that the first data also has an interval). - Output: data consumers, as represented in RxJava
Subscriber
. The output data stream is not drawn in this diagram. To observe the output, we customize the subscribers. -
Implementation idea: The interval is executed by default in the Schedulers.computation() thread and will run longer than the test thread. As described in the “prep” section above, we use TestScheduler to manipulate the time. For example, in order to output 4 data, Interval takes four units of time (400ms), and advancing the time by 400ms produces the desired result. The concrete implementation is as follows:
@Test public void interval() { Observable.interval(100, TimeUnit.MILLISECONDS, mTestScheduler) .subscribe(mList::add); / / time in advance before the 400 ms mTestScheduler. AdvanceTimeBy (400, TimeUnit. MILLISECONDS); assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L)); / / time (400 + 200) before ms mTestScheduler. Early advanceTimeBy (200, TimeUnit. MILLISECONDS); assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L)); }Copy the code
By analogy, range, just and repeat creational operators can be realized in this way all the marbles, such operator implementation code please check: CreatingOperatorsTest. Java
delay
Delay is a tool-type operator that delays the sending of data streams.
Unlike the creation operator interval pinball, Delay has two streams of input and output data, with the operator’s conversion process in between. The input is implemented with the creation operator, such as just, and the output is done by the subscriber.
- Input: Use a simpler one
just
Operator, i.eObservable.just(1, 2, 1)
. - Output: through
delay
Output an input stream consistent with the input after a specified delay. -
The TestScheduler operator is used to manipulate time and verify whether there is a data flow output within or beyond the delay time. The code is as follows:
@Test public void delay() { Observable.just(1, 2, 1) .delay(3000, TimeUnit.SECONDS, mTestScheduler) .subscribe(mList::add); mTestScheduler.advanceTimeBy(2000, TimeUnit.SECONDS); System.out.println("after 2000ms,result = " + mList); assertTrue(mList.isEmpty()); mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS); System.out.println("after 3000ms,result = " + mList); assertEquals(mList, Arrays.asList(1, 2, 1)); }Copy the code
There are many other tool-based operators, such as observeOn and subscribeOn for thread transformation, doOnSubscribe, doOnNext and doOnCompleted for Observable life cycle event listening operators. Delaying delaySubscription of subscription, the operator implementation of this type, please check: UtilityOperatorsTest. Java.
amb
Amb is Conditional Operators. Data flows can be sent only when certain conditions are met, and the conditions amB needs to meet are: the data flow that generates data earliest among multiple data flows can be sent, which is clearly expressed in the pinball diagram.
- Input: There are three data streams that start sending data at different times, as explained by the previous operator, which is used here
just
+delay
Can be implemented. - Output: through
amb
After the change, the data stream that sent the earliest data is output, the second data stream. -
The delay operator is used to delay the data flow by 500s, 200s, and 1000s respectively, and the universal TestScheduler is used to advance the time by 1000s to subscribe to the data flow and verify the output. The code is as follows:
@Test public void amb() { Observable o1 = Observable.just(20, 40, 60) .delay(500, TimeUnit.SECONDS, mTestScheduler); Observable o2 = Observable.just(1, 2, 3) .delay(200, TimeUnit.SECONDS, mTestScheduler); Observable o3 = Observable.just(0, 0, 0) .delay(1000, TimeUnit.SECONDS, mTestScheduler); Observable.amb(o1, o2, o3) .subscribe(mList::add); mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS); assertEquals(mList, Arrays.asList(1, 2, 3)); }Copy the code
By analogy, more conditions for the operator, such as skipUntil, takeUntil, please see ConditionalAndBooleanOperatorsTest. Java.
buffer
A buffer is a conversion operator that can cache a single piece of data and send it in batches as a List.
Send 6 data, cache every 3, and send batch. The code implementation is as follows:
@Test
public void buffer() {
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3)
.subscribe(mList::add);
System.out.println(mList);
List> exceptList = Arrays.asList(Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6));
assertEquals(mList, exceptList);
}Copy the code
FlatMap and concatMap
Next, we compare a set of transformational operators flatMap and concatMap, both of which fully represent the valuable information provided by Marble Diagrams. Here are marble Diagrams for the two operators:
- Input: The two inputs are exactly the same, focusing on the color of the marbles, which represents the order of the data stream.
- Output: After the transformation of the input data flow, each data becomes two copies. In addition, after the flatMap transformation, the green ◇ and blue ◇ are crossed, while concatMap maintains the same order as the input. This detail determines how to implement the two graphs.
- Implementation idea: in
flatMap
或concatMap
After that, three pieces of data become six pieces of data, assuming the input is zeroOne, two, three
, then the output isOne, one, two, two, three, three
, we need to find a way to make the output time difference after transformation, namely according toOne, one, two, three, two, three
On second thought,interval
You can implement this scenario by taking the original input streams 1, 2, and 3 as respectivelyinterval
Time interval variable to simulate crossover output. The concrete implementation is as follows:
@Test public void flatMap() { Observable.just(1, 2, 3) .flatMap((Func1>) num -> Observable.interval(num - 1, Timeunit.seconds, mTestScheduler).take(2).map(value -> num + "◇")).subscribe(mList::add); mTestScheduler.advanceTimeBy(100, TimeUnit.SECONDS); AssertEquals (mList, Arrays. AsList (" 1 is left ", "1 is left", "2 left", "3 left", "2 left", "3 to cardiac")); System.out.println(mList); }Copy the code
In the above code, just change flatMap to concatMap, and the data flow of “1◇”, “1◇”, “2◇”, “2◇”, “3◇”, “3◇” can be obtained, which is exactly the same as the meaning expressed in the pinball diagram. Through this example, we can feel that the pinball graph contains many details of the operator, the precise implementation of pinball graph input and output, you can have a deeper understanding of the operator.
More type conversion operators, such as switchMap, groupBy, Windows, etc., please see TransformingOperatorsTest. Java.
debounce
Debounce is a filtered operator, so the data stream is filtered according to certain rules. The rule is that every Observable that generates a result submits the result to the subscriber if no other result is generated within a specified period of time. Otherwise, the result is ignored.
- Input: The input data stream can be defined as: generated first
1
Is generated after an interval of 500msTwo, three, four, five
, and then at an interval of 500ms6
, the use ofcreate
Operator combinationThread.sleep()
To implement the input. - Output:
debounce
The interval is set to 400ms, and output 1, 5, and 6 in three intervals. The specific code is as follows:
@Test public void debounce() { Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(1); OperatorUtils.sleep(500); subscriber.onNext(2); subscriber.onNext(3); subscriber.onNext(4); subscriber.onNext(5); OperatorUtils.sleep(500); subscriber.onNext(6); subscriber.onCompleted(); } }) .subscribeOn(mTestScheduler) .doOnNext(System.out::println) .debounce(400, TimeUnit.MILLISECONDS) .subscribe(mList::add); / / test thread will time early 10 ms, can guarantee the create operator mTestScheduler smoothly has been completed. The advanceTimeBy (10, TimeUnit. MILLISECONDS); System.out.println(mList); assertEquals(mList, Arrays.asList(1, 5, 6)); }Copy the code
By analogy, in this way can realize the sample, throttleFirst, throttleLast filtering operator, please see the specific code: FilteringOperatorsTest. Java.
merge
Merge is an aggregated operator. Since it is aggregation, more than two data streams are required, and after aggregation, a new data stream is output.
- Input: Two streams of data, with emphasis on the order in which the data is sent.
- Output: according to the order of the input data, intact after the merger, output.
- Implementation idea: both data streams are used
interval
Create, the interval of the first data flow is defined as 5s, and the second data flow sends the first data after the first data flow generates three data, so the interval is set to 18s, as follows:
@Test
public void merge() {
Observable observable1 = Observable.interval(5, TimeUnit.SECONDS, mTestScheduler)
.take(5)
.map(aLong -> (aLong + 1) * 20)
.doOnNext(System.out::println);
Observable observable2 = Observable.interval(18, TimeUnit.SECONDS, mTestScheduler)
.take(2)
.map(aLong -> 1L)
.doOnNext(System.out::println);
Observable.merge(observable1, observable2).subscribe(mList::add);
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList(20L, 40L, 60L, 1L, 80L, 100L, 1L));
}Copy the code
combineLatest
CombineLatest is an aggregative operator whose rule of aggregation is that each piece of data in each data stream is pairwise combined with the most recent data sent by another data stream.
Construct two data streams as shown in the pinball diagram, focusing on creating time difference and multithreading:
- use
create
+Thread.sleep()
To create time differences in data flow. -
Have two data streams send data in different threads, which can be scheduled using the subscribeOn operator.
-
The first data flow is constructed to generate data in the testScheduler.test () thread (which increases the ability to manipulate time) as follows:
Observable observable1 = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { OperatorUtils.logThread("observable1"); subscriber.onNext(1); OperatorUtils.sleep(500); subscriber.onNext(2); OperatorUtils.sleep(1500); subscriber.onNext(3); OperatorUtils.sleep(250); subscriber.onNext(4); OperatorUtils.sleep(500); subscriber.onNext(5); subscriber.onCompleted(); } }).subscribeOn(mTestScheduler).doOnNext(System.out::println);Copy the code
- The second data flow is defined as the thread that produces the data
Schedulers.newThread()
, the code is as follows:Observable observable2 = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { OperatorUtils.logThread("observable2"); OperatorUtils.sleep(250); subscriber.onNext("A"); OperatorUtils.sleep(300); subscriber.onNext("B"); OperatorUtils.sleep(500); subscriber.onNext("C"); OperatorUtils.sleep(100); subscriber.onNext("D"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.newThread()).doOnNext(System.out::println);Copy the code
- The first two points complete the input, and the next step is to perform aggregation transformation and consume the data to generate the output and verify that it is consistent with the output of the pinball graph.
(Func2) (integer, s) -> integer + s).subscribe(mList::add); / / test thread must be ahead of time, make observable1 mTestScheduler. Start sending data smoothly advanceTimeBy (10, TimeUnit. MILLISECONDS); System.out.println(mList); assertEquals(mList, Arrays.asList("1A", "2A", "2B", "2C", "2D", "3D", "4D", "5D"));Copy the code
Similar to Merge and combineLatest, we can implement the converged operators zip, switchOnNext, withLatestFrom, and so on, in turn, and see the differences between them. Convergent operators all code, please go to: CombiningOperatorsTest. Java.
connect
The previous creation operators all create a Cold Observable that starts sending data only when the subscriber subscribes. In contrast, a Hot Observable can just start sending data regardless of whether there are subscribers or not. Publish and connect are operators related to hot Observables.
This pinball map isn’t easy to understand, but if it’s fully implemented, it’s easy to see what a Hot Observable looks like. In this figure, the output has three data streams representing three subscribers, but the subscription times are inconsistent and the data received is inconsistent. In addition, the two operators publish and connect are reflected in this figure.
- Input: The data stream is generated cleanly, using the creation operator described above. Due to the need for time difference, therefore adopted
interval
To generate the data stream, and the interval is defined as 3s. In addition,interval
The resulting data stream is cold. How does it go from cold to hotpublish
Operator to do. - Output: The output information is quite large, we need to take a good look:
- The first step is to make it clear that there are three subscribers, each for a different period of time. Delayed subscriptions are available
delaySubscription
Operators. - The first subscriber subscribes immediately, without delay, before the data stream starts sending data, so he can subscribe to the full data stream.
- There is an operator in the first subscriber’s data stream that cannot be ignored
connect
“He decidedObservable
When to start sending data. As shown in the figure, the time is defined as 2 seconds later. - The second subscriber starts to subscribe after 2 data has been sent, so the subscription time is set to 6 seconds late to subscribe. He will only be able to subscribe to the last data.
- The third subscriber is not much different from the first, which we define as a 1-second delay in subscribing.
- The first step is to make it clear that there are three subscribers, each for a different period of time. Delayed subscriptions are available
The complete code implementation is as follows:
public void connect() { List list1 = new ArrayList<>(); List list2 = new ArrayList<>(); List list3 = new ArrayList<>(); // create data streams 1,2,3, ConnectableObservable = Observable.create(new Observable.OnSubscribe() {@override public void call(Subscriber subscriber) { subscriber.onNext(1); OperatorUtils.sleep(3000); subscriber.onNext(2); OperatorUtils.sleep(3000); subscriber.onNext(3); } }).publish(); System.out.println(" start subscribing data after 1-0s "); / / immediately. Subscribe to the complete data flow connectableObservable doOnNext (num - > System. Out. The println (" Subscriber1 -- -- > "+ num)). The subscribe (list1: : add); // Delay subscription for 6s, Will subscribe to only 3 of data flow connectableObservable. DelaySubscription (6, TimeUnit. SECONDS, Schedulers.newthread ()).doonsubscribe (()->{system.out.println (" subscribe after subscriber2-6s "); }) .doOnNext(num -> System.out.println("Subscriber2-->" + num)) .subscribe(list2::add); // Delay subscription for 1s, Will subscribe to only 3 of data flow connectableObservable. DelaySubscription (1, TimeUnit. SECONDS, Schedulers.newthread ()).doonsubscribe (()->{system.out.println (" subscribe after 3-1s "); }) .doOnNext(num -> System.out.println("Subscriber3-->" + num)) .subscribe(list3::add); // Execute connect() operatorutils.sleep (2000); System.out.println("Observable 2s triggers connect()"); connectableObservable.connect(); assertEquals(list1, Arrays.asList(1, 2, 3)); assertEquals(list2, Collections.singletonList(3)); assertEquals(list3, Arrays.asList(1, 2, 3)); }Copy the code
And so on, can be achieved with other hot observables related operators, such as refCount, replay, cache, etc., please see the specific code ConnectableOperatorsTest. Java.
Other types of operators
In addition to the seven different types of operators described above, operators of error handling types (such as Retry and retryWhen), backpressure types (such as onBackpressureBuffer), Convert types (such as toList and toMap) are not involved. As well as some pinball diagrams that don’t fully explain the details of the operator itself, please check out this article.
(iii) The code of this paper
All code for this article can be viewed at github.com/geniusmart/…
Currently, marble Diagrams have the following operators:
(4) Concluding remarks
It is better to teach a man to fish than to give him fish. This article focuses on a way to learn RxJava and gain a thorough and in-depth understanding of the operators, summarizing the following key points:
- Use the UT implementation, eliminate the Android dependency, and don’t involve too many testing tricks, and focus on the implementation of the operator.
- Purposefully and rigorously implement inputs and outputs. Each operator, reads marble Diagrams and is implemented in code.
- Marble Diagrams Image from RX Marbles, or RxJava official.
- Marble Diagrams have deeper meanings or details that cannot be fully explained, such as
defer
.retryWhen
For more article implementations. For this part of the explanation, please move to another article:Play defer and retryWhen with UT.
Refer to the article
- rxmarbles.com/
- Reactivex. IO/documentati…
- blog.chengyunfeng.com/?p=983