preface

RxKotlin: a library that uses observable sequences on the JVM to compose asynchronous, event-based programs.

I understand rxKotlin is a library to achieve asynchronous operations, Android development process will use a lot of asynchronous operations, this responsive programming way can make the program readable, clear thinking, so that developers can do better code maintenance.


Why is RxKotlin and the Kotlin language recommended

Kotlin is a jVA-BASED programming language developed by JetBrains. In Google IO 2017, Kotlin became the official Language for Android development.

In our daily Android development, too many businesses require asynchronous operations, either starting a new thread or cutting back to the main thread. After the huge business, looking at the business code he wrote before, easy to face meng force +_+, if write comments estimate can better point, if did not write comments at that time, estimate will instantly explode 💥.

Here are some examples I wrote in Java and Kotlin: Read PNG images from a folder and do something with them on the UI. The first post was written by me in Java.. I might have written this before… Sometimes writing code just flies off the screen

final List<File> folders = new ArrayList<>(); new Thread(){ @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { Bitmap bitmap = BitmapFactory.decodeFile(file.getAbsolutePath()); RunOnUiThread (new Runnable() {@override public void run() {// handle bitmap}}); } } } } }.start();Copy the code

Next is code for the same function in rxKotlin

Observable.from(folders) .flatMap { Observable.from(it.listFiles()) } .filter { it.name.endsWith(".png") } .map { BitmapFactory.decodeFile(it.absolutePath) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe {// handle bitmap}Copy the code

How’s that? Does it feel like it becomes logically clear all of a sudden? That’s why I’m a big fan of Kotlin and responsive programming. One may learn a skill, he can rely on that skill for a lifetime of food, but if he constantly update or strengthen the skill, he may soon be able to eat meat.


The principle of analytic

1. Core: Observer mode

The asynchronous implementation of the library is implemented through the extended observer pattern. What is the observer model? Let me give you a simple example.


Observer mode for button click events

Here, Button acts as an observed object, and the observer OnClickListenner listens on its click event. When the button is clicked, an OnClick message event is fired and passed to the OnClickListener. To define the click event in observer mode, I would say: The observer OnClickListener subscribes to the observed Button via setOnClickListener(). Once the Button is clicked, it changes back to OnClick. In other words, there is the following correspondence: Button –> Observed OnclickListener –> Observer setOnClickListener() –> subscribe OnClick() –> event

2. Implementation method

If you are interested in rxJava, you can go to some articles to see their introduction and usage. There are many articles that introduce their basic usage.

1) Create an Observer

Create an observer observer that determines what action to take when the event arrives.

var observer = object : Observer<String> { override fun onNext(t: String?) { Log.e("name",t) } override fun onError(e: Throwable?) { Log.e("error",e.toString()) } override fun onCompleted() { Log.e("status","completed!!" )}}Copy the code

It has three drop methods that define what to do in response to an event when it arrives. As each event arrives, it would normally be responded to in the onNext() callback. If all events are completed, onCompleted() is called.

2) create observables

Observable is observed and determines what events are triggered and the rules by which the events are triggered.

  var observable = Observable.create<String> {
            it.onStart()
            it.onNext("jiangyu")
            it.onNext("jy")
            it.onNext("god")
            it.onCompleted()
        }
Copy the code

Here, I create three events and send “Jiangyu “,”jy” and” God “in sequence, and then call onCompleted() to mark the completion of the event.

3) Subscribe

Given the observer and the observed, use subscribe to connect the two. That is:

observable.subscribe(observer)
Copy the code

See, there’s a logical inversion here, maybe a little bit different from the thinking. Obviously the logic here is that the observed subscribes to the observer, so why go against the conventional wisdom? Here’s how the streaming API works

3. How API works

I’m going to start with a screenshot of what I just did


Run a screenshot

Think carefully about the observer and observed that you just created and see what they did. I want to summarize: 1. First the observer defines the event that occurs and the order in which the event occurs (the order in which the string is sent here) 2. As each event arrives, the observer processes the incoming event (in this case, printing a log)

The above code and logic analysis can be written as an equivalent code fragment (except that onCompleted was not overwritten) to facilitate understanding of the whole process:

   Observable
                .just("jiangyu", "jy", "god")
                .subscribe {
                    Log.e("name",it)
                }
Copy the code

This is a more concise way of writing the API, and there are various interpretations on the web of this streaming operation, which some say is like a transmitter, firing events one by one and then processing them.

To further demonstrate how this works, I have altered the code as follows:

 Observable
                .just("jiangyu", "jy", "god","000")
                .filter {
                    Log.e("start with j", it)
                    it.startsWith("j")
                }
                .map {
                    Log.e("name", it)
                    it.toUpperCase()
                }
                .subscribe {
                    Log.e("after map name", it.toString())
                }
Copy the code


The test results

It turns out that not all elements perform filter filtering and then map mapping, but that events or elements on the stream move vertically along the chain. This is exactly what was said earlier, that this is like a transmitter, firing off events one by one and processing them one by one.


The thread of control

Finally, the principle of the multi-observer mode is explained, and now it is time to get down to business! Let’s explore how rxKotlin reactive programming is used in Andorid. The most important thing in Android development is switching between threads. Therefore, knowing how Scheduler (thread controller) works and how to use it, I think it is possible to use rxKotlin to control thunder and Lightning in Android development ⚡️

Here are some of the schedulers that come with common apis:

  • Schedulers.newthread (): New threads are always enabled and operations are performed
  • Schedulers.io() is used to read and write files, make network requests, etc. It is similar to newThread(), but uses an unlimited thread pool and can reuse idle threads.
  • AndroidSchedulers. MainThread () : the specified in the main thread to run

So how do we control threads? Let’s start with the following code:

@GET("getUsers") fun getUsers(@Query("token") token: String):Array<User> val retrofit = Retrofit .Builder() .client(OkHttpClient()) .addConverterFactory(GsonConverterFactory.create()) .build() val userService = retrofit.create(UserService::class.java) Observable. From (userservice.getUsers ("token"))// IO thread.subscribeon (schedulers.io ()).observeon (schedulers.io ()).filter 18}. {it. Age < observeOn (AndroidSchedulers. MainThread ()). The subscribe {/ / in the UI on the main thread operation}Copy the code

This is combined with using the Retrofit networking framework to give an example of a thread switch chestnut, which is probably a bit rotten…. The main operation is to obtain user information first, then filter to get users younger than 18 years old, and finally complete the operation on the UI. I have simply marked the scope of subsribeOn and observeOn:


scope

A brief summary can be divided into the following three points:

  • SubscribeOn controls the thread on which its previous statements are located
  • ObserveOn controls the thread on which the statements that follow it go to the next observeOn
  • ObserveOn has no effect if you add subscribeOn after it

Switching between threads is easy. What are the powerful principles behind this? Let’s take our time. First, let’s generalize and explain what I think is the key operation in rxKotlin – transformations.

map & flatMap

In fact, I’ve used a lot of the previous code, but personally I think these two brothers are quite important in rxKotlin.

We can think of it as a kind of mapping, but not quite. I think a better explanation is the operation of changing event objects, that is, from one event object to another.

Observable. Just (user1, user2, user3).map {it. Name}. Subscribe {// printCopy the code

This makes sense. I pass in three users, and for each user I return their name through a map, and then print it out.

FlatMap: This concept is a bit tricky to understand and I understand it as a “flatMap” operation. What is a “smooth” operation? Here’s another chestnut 🌰

// data class User(val name: String, val age: Int, val courses: Array<Course>) Observable .from(arrayOf(user1,user2,user3)) .flatMap { Observable.from(it.courses) } .filter { it.classRoom.equals("528") }Copy the code

Once again, the three users are taken as input and processed one by one. For each user, its courses are retrieved in the flatMap operation and instead of being sent, they are activated. It initializes a new Observable, which distributes the courses corresponding to each user again. Hard to understand…… Experience with heart 😄

To better understand how these operations work, let’s take a look at the MAP API:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
Copy the code

The map passes in a function that converts T to R and returns an Observable

using the lift() method. Yes, a function is passed in, which is a higher-order function, that is, the argument passed in or returned is a function. Then let’s look at the LIFT () API:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); }}}); }Copy the code

Observable

returns an OnSubscribe

listen on the subscibe of the Observable

. Only when the subscribe event occurs, Trigger the action inside the call() method in the code above. This puzzle is best explained by thinking back to why events are processed in a vertical direction. Intermediate operations, such as map, act as a template that defines what transformations an event will perform, but do not perform them immediately until they listen to the SUBSCRIBE () method.


Thread switching works in much the same way, using the same core lift() method. For details, see the source code in the API.


conclusion

Kotlin language and rxKotlin, rxJava I am also at the beginning of the stage, also can be considered to find some of the highlights of these new things, research and then research out some of the things summarized and shared. I hope I can understand its principle more deeply in the future projects and learning process.

By the way, whether it’s jumping right into the first paragraph, or accidentally slipping into the middle of a sleep… You relentless ️ bottom left corner ❤️ Thank you 😄😄😄😄😄😄😄😄