Rxjava is more and more popular in the industry, but although popular but difficult to understand, xin deficit in front of the big gods open, and the experience of the record handed down. More famous parabola, HI big head ghost and so on, at that time read the gods of the article, feel quite a lot, the deepest experience is: although the Martial arts, but I am not enough internal force, learn the secret book or feel half understand, get this trick, but can not understand the essence. However, after a period of exploration, I also gradually have some experience on learning RXJava, so I wrote it down as my own notes, but also hope to give some learning ideas to children who want to learn RXJava. This article is mainly divided into two parts :(1) how to learn rxjava (2) rxjava practice case

How to learn RXJava, I think we can learn from the following aspects:

(1) Benefits of using RXJava

As the project features increase, the code volume increases and the complexity increases, learning RXJava helps us:

The above two points can standardize the coding habits of the team to improve efficiency, and it is also convenient for us to locate and solve problems. I would like to say one more word about how to improve the efficiency of coding. Personally, I think it is very important to set the coding specification and write the code architecture well. At present, some MVP and Flux modes have emerged to help us set the project specification well.

(2) Understand the basic elements of RXJava

The basic three elements of RXJava are as follows:

  • Observable: The observed
  • Subscriber: observer
  • OnSubscribe: an interface class that connects the observed to the observer. OnSubscribe is a local Observable variable

Official is not easy to understand the concept of the above 😅, how simple to understand this “auspicious sambo” above, I am so understand, a concrete example: build a car, build the car process can be divided into a series of continuous process, is simply: step 1 (chassis), step 2 (plus the wheel) and so on, then: observables: Processing step one, step two, and so on corresponding to “factory one”, “Factory two”, and so on. OnSubscribe: “Workshop 1” corresponding to “Factory 1”, “Workshop 2” corresponding to “Factory 2” Subscriber: the place to obtain the finished product of the car.

We can break down a task into subtasks that are executed sequentially, and this kind of organization is where RXJava comes in handy.

(3) RXJava operator use

In section (1) and (2), I introduced the concept of RxJava to give you a sense of what rxJava is, but to really learn rxJava you have to start with operators. So what are operators? In simple terms, operators are Observable operations, such as create, transform, filter, etc. It should be emphasized that an Observable gets a new Observable after operating on its operators. Each operator created simply creates a subtask, which will be covered in the source code analysis later.

Here’s an example of code that creates the Create operator: the code results in a simple output of “0,1,2” followed by “hello rxjava execute complete.”

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribe(new Subscriber() { @Override public void onCompleted() { Log.i(Log.TAG,"hello rxjava execute complete"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i(Log.TAG,""+integer); }});Copy the code

Observable implements subscriptions through subscribe and Subscriber methods.

  Observable.subscribe(Subscriber subscriber)Copy the code

In the example, the Observable uses the create operator to create a container that outputs “0,1,2.” that is, it creates an Observable. The real data processing in this container is in the Call (Subscriber) method of the interface variable OnSubscribe in the Observable, The subscriber in the call method is the subscriber object to which the Observable subscribes. The final output is actually the onNext(T),onCompleted() and onError(Throwable E) methods of the Subscriber callback (error handling).

Here’s another example of code that creates the from operator: the code results in a simple output of “1,2,3”

The Integer [] item = {1, 2, 3}; Observable.from(item).subscribe(new Action1() { @Override public void call(Integer integer) { Log.i(Log.TAG, "" + integer); }});Copy the code

From is passed in an Iterable or array, and then the call method iterates through the output of the Iterable or array. Look carefully, subscribe is passed in an Action1 object, but no subscriber object is found. It’s still an Observable subscribing to a Subscriber, but it’s an ActionSubscriber.

public final Subscription subscribe(final Action1 onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }

    Action1 onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
    Action0 onCompleted = Actions.empty();
    return subscribe(new ActionSubscriber(onNext, onError, onCompleted));
}Copy the code

Sometimes we only need to listen on Subscriber’s onNext(T) method without onCompleted() and onError(Throwable E) method, so we can directly pass the Action1 object. In fact, you can also implement those three methods by passing Action. 😊

The Integer [] item = {1, 2, 3}; Observable.from(item).subscribe(new Action1() { @Override public void call(Integer integer) { Log.i(Log.TAG, "form " + integer); } }, new Action1() { @Override public void call(Throwable throwable) { Log.e(Log.TAG, "error: "+throwable.getMessage()); } }, new Action0() { @Override public void call() { Log.i(Log.TAG,"from complete"); }});Copy the code

The creation operators also include Interval, Range, and so on, which are not described here.

A more complex example is the transform operator, which is very important in RXJava and one of the more commonly used operators. Take a concrete example of the map transformation operator: the code implements the numbers “0,1,2”, printing true if they are even numbers and false otherwise.

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).map(new Func1() { @Override public Boolean call(Integer integer) { return (integer%2)==0; } }).subscribe(new Action1() { @Override public void call(Boolean aBoolean) { Log.i("map",String.valueOf(aBoolean)); }});Copy the code

The output of this code is:




The output

From the above example, map converts Integer numbers into Boolean. Isn’t that powerful 😀? In our example we saw a Func1 class, and Action1, and actually Actionx and Func1x are used differently: Funcx handles intermediate transformations, encapsulating a return value method with Actionx representing output and no return value, and is often used instead of onNext(T),onCompleted(), or onError(Throwable e) methods, as mentioned above.

Stay tuned for more operators in my next series of articles…

Since maps are so useful, let’s dig a little deeper. How does a map “transform”? I plan to explain the map transformation process in the following steps (PS: the illustrated code is RXJava version 1.1.6) :

  1. Find the starting point for code execution
  2. The relationship between operator map and “Auspicious Three treasures”
  3. How does the map operator implement the transformation operation

1. Find the starting point of your code’s processing logic

To find the trigger point of the code, simply find where the code begins to execute. For example, to Create the symbol Create:

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribe(new Subscriber() { @Override public void onCompleted() { Log.i(Log.TAG,"hello rxjava execute complete"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i(Log.TAG,""+integer); }});Copy the code

We all know that we start by calling a for loop on call(Subscriber), but here’s the question: where is the call method called? The Observable creates an Observeble object using the static create method. The Observable transfers Subscriber to the object using the Subscribe method, as follows:

  public final Subscription subscribe(Subscriber subscriber) {
    return Observable.subscribe(subscriber, this);
}Copy the code

We continue tracking the code observable. subscribe(subscriber, this) method…

Static Subscription (Subscriber, Observable Observable) {// omit other code, // Allow the hook to intercept and/or decorate hook. OnSubscribeStart (Observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); }Copy the code

Let’s ignore hook for the moment. Here we can see the call method. This is the starting point of the real processing logic, and the relationship between Observable and Subscriber is also established. The approximate process of the example is: The code calls the call method of onSubscribe –> executes the for loop –> sends the result of the incoming Subscriber through call(Subscriber) –> subscribes to the desired result in the onNext method of the Subscriber.

2. The relationship between operator Map and “Auspicious Three Treasures”

According to the above case, we find that completing an operation with the create operator involves a call to the “auspicious three treasure”, denoted by Observable0, Subscriber0, OnSubscribe0, What is the connection between create and Map? Map operation also involves “three auspicious treasures”, which can be temporarily represented by Observable1, Subscriber1 and OnSubscribe1.

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).map(new Func1() { @Override public Boolean call(Integer integer) { return (integer%2)==0; } }).subscribe(new Action1() { @Override public void call(Boolean aBoolean) { Log.i("map",String.valueOf(aBoolean)); }});Copy the code

Observable1, Observable1, Observable1, Observable1, Observable1, Observable1, Observable1, Observable1, Observable1, Observable1

  public final  Observable map(Func1 func) {
    return lift(new OperatorMap(func));
}

  public final  Observable lift(final Operator operator) {
    return new Observable(new OnSubscribeLift(onSubscribe, operator));
}Copy the code

So what we see here is that we have an OnSubscribeLift object, OnSubscribe1, and the thing to notice here is that when you build the OnSubscribeLift object you pass in onSubscribe0, So OnSubscribe1 you can call OnSubscribe0. Now that Observable1 and OnSubscribe1 are found, and then Subscriber1 is left, we can go back to the Map method, find the OperatorMap class and trace it in, and we can find an inner class called MapSubscriber, and needless to say, MapSubscriber is Subscriber1☺️. So far, all Observable1 (returned by map method), OnSubscribe1 (OnSubscribeLift) and Subscriber1 (MapSubscriber) involved in Map have been found. So far, it can be summarized as follows: The Observable0 operation generates a new “auspicious threefold” through the operator Map, that is, Observable1, OnSubscribe1, Subscriber1.

3. Implement the MAP operation

By analyzing the relationship between MAP and “Three auspicious treasures”, we get that there are currently two groups of “three auspicious treasures”.

  1. By using the operator Crate for Observable0, OnSubscribe0, Subscriber0.
  2. Observable1, OnSubscribe1, and Subscriber1 are generated by the operator.

The point here is that Observable0 and Subscriber0, and Observable1 and Subscriber1 do not yet have a subscription relationship.

By analyzing map transformation operations, we first find the starting point of processing the code logic, that is, find the call method corresponding to OnSubscribe. Then is the call method corresponding to OnSubscribe0 or OnSubscribe1? Let’s look at the example code again.

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).map(new Func1() { @Override public Boolean call(Integer integer) { return (integer%2)==0; } }).subscribe(new Action1() { @Override public void call(Boolean aBoolean) { Log.i("map",String.valueOf(aBoolean)); }});Copy the code

Observable0 subscribes to the Map method and Observable1 subscribes to the Subscribe 0 method, so the first code to execute is the OnSubscribe1 call method. Call method for OnSubscribeLift. Let’s look at the corresponding code for OnSubscribeLift:

public final class OnSubscribeLift implements OnSubscribe { static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); final OnSubscribe parent; final Operator operator; public OnSubscribeLift(OnSubscribe parent, Operator operator) { this.parent = parent; this.operator = operator; } @override public void call(Subscriber o) {// Reserve only the key code Subscriber st = hook.onlift (operator).call(o); parent.call(st); }}Copy the code

In the call method of OnSubscribeLift, we first locate the hook. OnLift (operator). Call (o) code, which corresponds to the OperatorMap call method, as follows:

public Subscriber call(final Subscriber o) {
    MapSubscriber parent = new MapSubscriber(o, transformer);
    o.add(parent);
    return parent;
}Copy the code

You can obviously see that this method returns the Subscriber1 (MapSubscriber) object, st is the Subscriber1, and then locate the code parent. Call (st). This code implements both Observable0 and Subscriber1 (MapSubscriber) subscribes via the OnSubscribe0 call method, continuing with the parent.call(st) method: OnSubscribe0 call method, so we’re finally back to the for loop. 😆

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); }})Copy the code

Call subscriber.onNext(I) in the for loop, actually the subscriber is subscriber1 (MapSubscriber), let’s look at the onNext method of MapSubscriber…

static final class MapSubscriber extends Subscriber { final Subscriber actual; final Func1 mapper; public MapSubscriber(Subscriber actual, Func1 mapper) { this.actual = actual; this.mapper = mapper; } @override public void onNext(T T) {R result; result = mapper.call(t); actual.onNext(result); }}Copy the code

Result = mapper.call(t); result = mapper.call(t) OnNext (result) sends the converted result through actual.onNext(result), which is actually called Subscriber0, as shown in the MapSubscriber constructor. Finally, summarize the whole process. 😊

  1. Observable1 subscribes to Subscriber0, that is, calls the Call method OnSubscribe1(OnSubscribeLift) to start the logical processing;
  2. In the call method of OnSubscribe1, OnSubscribe0 calls Call (T T) to realize the subscription of Observable0 and Subscriber1(MapSubscriber). Because the call method is called, the for loop is actually called in the example.
  3. In the for loop, the onNext method of Subscriber1(MapSubscriber) is called to send data;
  4. In the onNext method of Subscriber1(MapSubscriber), data conversion of operator Map is achieved through result = mapper.call(t).
  5. Finally, the onNext method of Subscriber0(ActionSubscriber) is called in the onNext method of Subscriber1(MapSubscriber) to call the result back to Subscriber0(ActionSubscriber).

Here is another picture to illustrate the relationship between the two groups of “three auspicious treasures” :




The diagram

Of course, operators can not only these, but the implementation principle can refer to the above map analysis steps, find the starting point, find the corresponding “auspicious three treasures”, and then according to their own understanding, I believe that we will understand the internal principle of the operator, more operators can operate my operator series of articles.

(3) RXJava thread scheduling

In addition to the use of operators, thread scheduling is a cool feature of RXJava. In simple terms, thread scheduling is to specify the operator operation on the thread task, which is implemented through the Schedulers class.

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()).subscribe(new Action1() { @Override public void call(Integer integer) { Log.i("test",String.valueOf(integer)); }});Copy the code

Schedulers.io() is a scheduler that assigns the Create operation to a child thread. SubscribeOn’s incoming scheduler is variable. The types and functions of the specific scheduler can be seen in the following chart:




Type of scheduler

With all the schedulers mentioned above, it’s easy to guess how RXJava implements thread scheduling by using thread pools. So how does that work with thread pools? In fact, back to the “auspicious three treasures”, the subscribeOn method returns a new Observable. Thread scheduling occurs in the call method of OnSubscribe, which is not expanded here. Are there other ways to implement thread scheduling besides the subscribeOn method? The e answer is yes, through the observeOn method can implement thread scheduling, also see the example.

Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()).map(new Func1() { @Override public Boolean call(Integer integer) { return (integer%2)==0; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() { @Override public void call(Boolean aBoolean) { Log.i("map",String.valueOf(aBoolean)); }});Copy the code

We locate observeOn (AndroidSchedulers mainThread ()) the code, first introduce AndroidSchedulers. MainThread (), AndroidSchedulers. MainThread () is a scheduler, it’s different from the scheduler above is that it is special for Android said above scheduling threads on the main thread, that is, the UI thread, SubscribeOn method above can also be introduced to AndroidSchedulers. MainThread () the scheduler. Both the subscribeOn and observeOn methods implement thread scheduling, so what’s the difference?

(1) subscribeOn method only needs to be called once in the whole process, even if called many times, only the first subscribeOn method is effective; (2) subscribeOn will be effective for the operations before and after it, specifically for the call method of create and the call method of map operator; ObserveOn only works for subsequent operations, which specify the Call (Boolean aBoolean) method to work on the UI thread;

Take a look at the illustration diagram of parabola when subscribeOn and observeOn are mixed:




To quote the parabolic god again on the above illustration.

There are five actions on events in the diagram. As can be seen from the figure, ① and ② are affected by the first subscribeOn() and run in the red thread; ③ and ④ are affected by the first observeOn() and run on the green thread; ⑤ is affected by the second onserveOn() and runs on the purple thread; While for the second subscribeOn(), the thread is truncated by the first subscribeOn() in the notification process, so it has no impact on the whole process. This answers the previous question: When multiple subscribeOn() are used, only the first subscribeOn() plays a role.

Finally,, a summary of operators and thread scheduling. The beauty of RXJava is the ability to divide a large function into a series of small operations that can be executed in a thread pool specified by the scheduler.

With all that said, let’s use some examples from Android projects to demonstrate the effects of RXJava.

(1) The child thread is time-consuming, and the main thread updates the UI

Very common functions in the project, first paste the implementation code:

Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { Logger. I (" Perform time-consuming operation...." ); try { Thread.sleep(5000); Subscriber. OnNext (" Time-consuming operation completed..." ); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() { @Override public  void call(String s) { Logger.i(s); }});Copy the code

Then paste the output result:




The implementation function is mainly used through the combination of subscribeOn and observeOn, and the red box in the result is the corresponding worker thread.

(2) Imitation of AsnycTack implementation

AsyncTask is easy to implement (1), and it takes a step further. The onPreExecute() method allows me to do some preparatory operations before performing time-consuming tasks, such as: Display the “chrysanthemum” turn in the load, and finally hide the “chrysanthemum” in the onPostExecute method, as shown in the following figure.




How does rXJava implement these functions? Continue to post code..

@OnClick(R.id.btn2) void onButton2Click(){ unBindSubscription(); subscription= Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { Logger.i("doInBackground()...." ); try { Thread.sleep(5000); Subscriber. OnNext (" Time-consuming operation completed..." ); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); Logger.i("onPreExecute() ...." ); } }).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() { @Override public void call(String s) { Logger.i("onPostExecute .... ".concat(s)); progressBar.setVisibility(View.GONE); }}); }Copy the code

The doOnSubscribe method is similar to the onPostExecute method that implements AsyncTask, but with the following note: DoOnSubscribe is run on the thread that is affected by the most recent subscribeOn method, and by default is run on the thread that subscribes (). The rest of the code is relatively simple, so you can see for yourself.

(3) Unsubscribe

What if we want to stop a task in RXJava? The process of subscribing to an Observable is the process of processing the task, so stopping the task is the unsubscribing operation. How do I unsubscribe? As mentioned in the AsyncTask example, the answer is to call the Subscription unsubscribe method. What is Subscription? This is a code name for a task to be executed.

Subscription subscription=Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber) { for(int i=0; i<3; i++){ subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribe(new Action1() { @Override public void call(Integer integer) { } });Copy the code

It is clear that the entire task process returns a Subscription😑. Here’s another question, why unsubscribe? Some say it’s not nonsense. It’s all been said. Stop the mission!! Yes, that’s one of the reasons. According to my own understanding, I summarize the following:

The first point: Stop tasks, which I think is better than AsyncTask, because we all know that stopping events in doInbackgroud is hard to control, but RxJava simply breaks the logic in doInbackgroud into small pieces of logic. If you unsubscribe, The logic will never be executed again, which is why RXJava is so much better than AsyncTask. The second point is to prevent memory leaks. It is easy to understand that rXJava can refer to an Activity Context in its processing logic. Third point: in some cases we want to prevent tasks from repeating, so we can unsubscribe, stop the execution of the previous task, and then execute the new task.

(4) Filter input with rxBinding

Rxbinding: a new feature from JakeWharton designed to prevent user errors such as typing in a segmentation search, clicking on a string, and so on, as shown in the image below.




Post the key code again:

RxTextView.textChanges(editText).debounce(500, TimeUnit.MILLISECONDS).observeOn(Schedulers.io()).map(new Func1>() { @Override public List call(CharSequence charSequence) { try { if(TextUtils.isEmpty(charSequence)){ return null; } List stringList =new ArrayList(); String key=charSequence.toString(); for (String num: baseDatas){ if(num.contains(key)){ stringList.add(num); } } return stringList; }catch (Exception e){ e.printStackTrace(); } return null; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1>() { @Override public void call(List list) { updateData(list); } }, new Action1() { @Override public void call(Throwable throwable) { Logger.e(Log.getStackTraceString(throwable)); }});Copy the code

The code is relatively simple, but it needs to be noted that: if the above is page 1, the code needs to be called again when page 2 is opened on page 1, and then page 2 is closed and returned to page 1, so the code is usually executed in the onStart method.

Finally, to summarize: in this article, through “learning rxjava benefits”, introducing and sharing learning rxjava, must first understand its three elements, followed by operation in accordance with the study and thread scheduling, at last, some project examples of actual combat and share some experience with, but no matter how much or practice more code, believe that you will have the harvest. Due to my limited level, there may be a lot of deficiencies in the article, I hope you can understand and put forward, and finally hope that this article can give those who want to learn RXJava partners some help 😊.

Appendix: Article Demo

Acknowledgments: Awesome RxJava for Android developers