Preface

This is the second article in the source code series. The previous one covered OkHttp, and the next one is planned to cover Retrofit.

What, a simple OkHttp source code analysis?

RxJava became popular a few years ago and is known for its chained calls and powerful operators. Most important of all, it makes switching threads for asynchronous work remarkably simple. However, every coin has two sides. RxJava is also very hard to get started with, and it is common to find teammates who are not fluent in it.

The root of all this is its design philosophy: event-driven programming.

What? You read that and still don’t get it?

It doesn’t matter. I won’t paste a single line of RxJava’s source code, and you will still be able to hand-write an RxJava of your own.

Why not post the source code?

To tell the truth, what I dislike most in technical articles is long stretches of pasted source; articles like that just put me off. The source code is right there, and I can read it myself, okay? So this article refuses to paste large chunks of source and uses only the simplest code to implement RxJava.

The next time an interviewer asks you about RxJava, you can tell them directly: don’t ask, let me hand-write an RxJava for you instead.

Reading this article requires some basic knowledge of RxJava; you should at least know how to use it.

Observer pattern

To understand RxJava, you must first understand one pattern: the Observer pattern.

What is the Observer pattern?

Say you subscribe to a public account. You are the observer, and the public account is the observed. Subscribing simply establishes a link between you and the account; whenever the account publishes a new article, it pushes that article to all subscribers, including you.

Two roles, one relationship.

The observed notifies its subscribed observers of changes. This corresponds to the public account.

The observer actively establishes a subscription relationship with an observed. This corresponds to you.

So how do you do that in code?

A simple implementation of the observer pattern

Create an observer

class Observer {
    fun change() {
        println("I'm an observer.")
    }
}

Create an observed

class Observable {
    // Set of observers
    private val observerList = mutableListOf<Observer>()
    // Subscribe method
    fun subscribe(observer: Observer) {
        observerList.add(observer)
    }
    // Notify all observers
    fun notifyObserver() {
        observerList.forEach {
            it.change()
        }
    }
}

Wait a minute. What exactly is inside this observed?

We’ll talk about what’s inside it in a minute. First, let’s use it:

fun main() {
    // Create an observed
    val observable = Observable()
    // Create an observer
    val observer = Observer()
    // The observer subscribes to the observed
    observable.subscribe(observer)
    // The observed notifies the observer that something has changed
    observable.notifyObserver()
}

The observer pattern may look esoteric, but all it amounts to is a collection of observers held inside the observed; subscribing just adds an observer to that collection.

When there is a new message, the observed iterates over the collection and calls each observer’s method.
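The public-account analogy can be sketched in the same shape. This is only an illustration: the names Subscriber, PublicAccount, inbox, and publish are made up for this example, and the article text is passed along as a plain String.

```kotlin
// Hypothetical names for illustration only: Subscriber, PublicAccount, publish
class Subscriber(val name: String) {
    // Everything this subscriber has been pushed so far
    val inbox = mutableListOf<String>()

    fun change(article: String) {
        inbox.add(article)
        println("$name received: $article")
    }
}

class PublicAccount {
    // The observed keeps a collection of its observers
    private val subscribers = mutableListOf<Subscriber>()

    // Subscribing just adds the observer to the collection
    fun subscribe(subscriber: Subscriber) {
        subscribers.add(subscriber)
    }

    // Publishing walks the collection and calls each observer's method
    fun publish(article: String) {
        subscribers.forEach { it.change(article) }
    }
}

fun main() {
    val account = PublicAccount()
    val you = Subscriber("you")
    val friend = Subscriber("friend")
    account.subscribe(you)
    account.subscribe(friend)
    account.publish("Hand-rolled RxJava")
}
```

A subscriber only receives what is published after it subscribed, because it is simply not in the collection before that.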

The observer pattern in RxJava

The observer pattern in RxJava is simpler. The version above allows a whole group of observers, whereas in RxJava the observer and the observed are one-to-one. How about that? Doesn’t that make it a notch easier?

To better understand the event-driven idea, we can use upstream and downstream in place of observed and observer.

What are upstream and downstream?

Basically, picture two pipes connected in some way, with the water flowing from the upper pipe into the lower one.

That is, the upper pipe is responsible for releasing water and the lower pipe is responsible for receiving it. The water is the event.

This is how we normally use it:

// Create the upper pipe
Observable.create<Int> { emitter ->
    // Let the water flow out
    emitter.onNext(10)
}
    // Connect the two pipes
    .subscribe(
        // Create the lower pipe
        { item ->
            // Print the water that flows down
            println(item)
        }
    )

This is the usual way to use it: create an upstream, the upstream sends events, and the downstream receives them.

There are other ways to use it, such as switching threads or transforming the data along the way.

Observable.create<Int> { emitter ->
    emitter.onNext(10)
}
    .map { it * 2 } // Convert the data
    .subscribeOn(Schedulers.io()) // Change the thread the upstream emits on
    .subscribe { println(it) }

These are the operators RxJava prides itself on; in essence they are operations inserted between upstream and downstream. By the end of this article you will understand what operators really are, and you can even write some of your own.

So without further ado, let’s dissect RxJava thoroughly and see how it actually works. Don’t be afraid: there is no library source code here. I will use only the simplest code to build a working RxJava, so that you thoroughly understand RxJava’s operators and its thread switching.

Hand-rolled RxJava

Starting from the ground up, we know that RxJava first needs two types: an observer and an observed.

Let’s define them.

Define the observed and the observer

// Observed
class MlxObservable{}

// Observer
interface MlxObserver{}

Wait, the observer is not a class but an interface!

Oops, caught cheating. Time to run~

Actually I’m not cheating you. Think about how we normally use RxJava. In Java you create an anonymous inner class; in Kotlin you write an object expression that implements the interface. So yes, the observer is an interface.

Why do it that way?

Because RxJava doesn’t know how you want to react to a change, it makes the observer an interface and lets you implement it yourself.

But RxJava doesn’t know what data I want to send either. By the same logic, shouldn’t the observed also be an interface that we implement ourselves?

Congratulations, you’re already answering your own questions!

Yes, the observed should also be an interface, so that developers decide what data to send. RxJava does just that, but instead of calling the interface Observable, it calls it ObservableOnSubscribe, for the sake of uniformity.

I personally prefer to call ObservableOnSubscribe the real observed.

Define a true observed:

// The real observed
interface MlxObservableOnSubscribe{}

Now we have the observed and the observer. What do we do next?

One thing is certain: empty interfaces are not enough. We need to define methods that determine what each interface does.

So let’s think about the responsibilities of these two interfaces.

Start with the observed. It should be able to send data downstream. What does “being able to send data downstream” mean?

Many people make this sound mysterious, but it simply means holding a reference to the downstream and calling its methods. In short, a callback.

Define upstream methods

Let’s define a method like this:

interface MlxObservableOnSubscribe{
    fun setObserver(observer:MlxObserver)// Set the downstream
}

Leaving aside how the downstream gets handed to the upstream, a developer now only needs to implement the observed interface to receive a reference to the downstream and call its methods.

Define downstream methods

Next, what methods should the downstream have? Those familiar with RxJava know that implementing an observer means overriding four methods: onSubscribe, onNext, onError, and onComplete. So let’s imitate RxJava and define these four methods as well:

interface MlxObserver<T> {
    fun onSubscribe()
    fun onNext(item:T)
    fun onError(e:Throwable)
    fun onComplete()
}

Some readers may not be familiar with this <T>. It is a generic type parameter. The data that onNext delivers could be a String or an Int, and we don’t know which in advance, so we declare a generic as a placeholder. When we use the interface we specify the concrete type, and onNext then receives data of that type.
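To make the placeholder idea concrete, here is a small sketch. Receiver and CollectingReceiver are hypothetical names used only for this illustration; they are not part of the article’s RxJava clone.

```kotlin
// Hypothetical one-method interface: T is a placeholder for the item type
interface Receiver<T> {
    fun onNext(item: T)
}

// A receiver that simply remembers what it was given
class CollectingReceiver<T> : Receiver<T> {
    val items = mutableListOf<T>()
    override fun onNext(item: T) {
        items.add(item)
    }
}

fun main() {
    // Here T is fixed to Int
    val ints = CollectingReceiver<Int>()
    ints.onNext(10)

    // Here T is fixed to String
    val strings = CollectingReceiver<String>()
    strings.onNext("ten")

    // ints.onNext("ten") would not compile, because T is Int for this object
    println(ints.items)
    println(strings.items)
}
```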

Now that the downstream is generic, the upstream needs a matching type parameter:

interface MlxObservableOnSubscribe<T>{
    fun setObserver(observer:MlxObserver<T>)// Set the downstream
}

This way, once you implement the upstream interface, you receive the downstream object and can call its four methods. As you can see, all the talk of upstream and downstream is really just setting a callback. It’s not that hard.
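The “it’s just a callback” point can be shown without any Observable class at all. The sketch below uses two hypothetical stand-in interfaces, Downstream and Upstream, trimmed to two methods to keep it short; runCallbackDemo is likewise a made-up helper.

```kotlin
// Hypothetical stand-in for MlxObserver<T>, trimmed to two methods
interface Downstream<T> {
    fun onNext(item: T)
    fun onComplete()
}

// Hypothetical stand-in for MlxObservableOnSubscribe<T>
interface Upstream<T> {
    fun setObserver(observer: Downstream<T>)
}

// Connects one upstream to one downstream and returns what the downstream saw
fun runCallbackDemo(): List<String> {
    val log = mutableListOf<String>()

    val upstream = object : Upstream<Int> {
        override fun setObserver(observer: Downstream<Int>) {
            // "Sending data" is nothing more than calling methods on the reference we were given
            observer.onNext(1)
            observer.onNext(2)
            observer.onComplete()
        }
    }

    val downstream = object : Downstream<Int> {
        override fun onNext(item: Int) {
            log.add("next $item")
        }
        override fun onComplete() {
            log.add("complete")
        }
    }

    upstream.setObserver(downstream)
    return log
}

fun main() {
    println(runCallbackDemo())
}
```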

Now we have an upstream and a downstream. How do we hand the downstream to the upstream?

Remember the MlxObservable class we defined earlier, the fake observed? We can use it as the link that connects the observed and the observer.

RxJava creates the upstream through a static method, so let’s write a static one as well.

Receiving upstream objects

class MlxObservable{
    // A static method that receives the real observed
    companion object{
        fun <T> create(source:MlxObservableOnSubscribe<T>):MlxObservable{
            return MlxObservable()
        }
    }
}

Look closely at the code: the static method receives a real observed and returns a fake observed.

Why return a fake observed? One reason is that RxJava’s subsequent methods are no longer static, so we need an object to call them on. The more important reason is that RxJava uses the decorator pattern, which makes it easy to extend functionality, including the various operators. I won’t go into what the decorator pattern is or how to implement it, but by the end of this article you should have a feel for it.

Let’s continue. We now return a fake observed, and we need to use it as a platform for handing the downstream to the upstream.

Receiving downstream objects

So let’s give the fake observed a method for that. Note that since create returns an object, this method is an instance method, not a static one.

class MlxObservable<T>{
    ...
    // Receives a downstream object
    fun setObserver(downStream: MlxObserver<T>){}
}

So here is the question: we have the downstream, but where is the upstream?

Right, that’s a problem. The create method does receive an upstream, but we have no variable to store it in. So we need to define one:

private var source:MlxObservableOnSubscribe<T>? = null

It is best to receive the upstream in the constructor:

class MlxObservable<T> constructor() {
    // Upstream object
    private var source:MlxObservableOnSubscribe<T>? = null
    // Secondary constructor, used to receive the upstream object
    constructor(source:MlxObservableOnSubscribe<T>):this() {
        this.source=source
    }
}

In this case, our create method should also be modified:

fun <T> create (emitter:MlxObservableOnSubscribe<T>):MlxObservable<T>{
    return MlxObservable(emitter)
}

Connecting upstream and downstream

Now we have both upstream and downstream. What next? Connect them, of course! By the time we receive the downstream object, the upstream must already exist, so we can connect the two right here, which simply means handing the downstream to the upstream:

class MlxObservable<T>{
    ...
    // Receives a downstream object
    fun setObserver(downStream: MlxObserver<T>){
        source?.setObserver(downStream)
    }
}

With that, our RxJava is done.

What?? Done already? You’re not kidding me, are you?

Don’t believe it? Let’s run it:

MlxObservable.create(object :MlxObservableOnSubscribe<Int> {
    override fun setObserver(emitter: MlxObserver<Int>) {
        println("Upstream sending data :10")
        emitter.onNext(10)
    }
})
    .setObserver(object :MlxObserver<Int> {
        override fun onSubscribe() {
            println("onSubscribe")
        }
        override fun onNext(item: Int) {
            println("Downstream receives data:$item")
        }
        override fun onError(e: Throwable) {}
        override fun onComplete() {}
    })

The result is this:

Upstream sending data :10
Downstream receives data:10

How about that? Doesn’t the code above already look like RxJava? The method names are just a little ugly: where RxJava says subscribe, I say setObserver, but that doesn’t affect how it works. We’ll rename things later; for now I keep these names to make the mechanics easier to follow.

Someone might say: you never called the onSubscribe method. True. By definition this method is called at subscription time, whether or not any data is sent. So we should call it on the downstream before handing the downstream to the upstream. The fake observed calls onSubscribe as soon as it receives the downstream:

class MlxObservable<T>{
    ...
    // Receives a downstream object
    fun setObserver(downStream: MlxObserver<T>){
        downStream.onSubscribe()
        source?.setObserver(downStream)
    }
}

That will do. The simplest version of RxJava is declared complete.
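Putting the fragments so far into one file gives something you can actually run. This is a consolidated sketch, not a copy of the snippets above: it assumes a non-null source passed through the primary constructor (instead of the nullable field plus secondary constructor), and runMinimalDemo is a hypothetical helper that records events in a list so the order is easy to see.

```kotlin
interface MlxObserver<T> {
    fun onSubscribe()
    fun onNext(item: T)
    fun onError(e: Throwable)
    fun onComplete()
}

interface MlxObservableOnSubscribe<T> {
    fun setObserver(observer: MlxObserver<T>)
}

// Assumption: source is taken non-null in the primary constructor to keep the sketch short
class MlxObservable<T>(private val source: MlxObservableOnSubscribe<T>) {

    fun setObserver(downStream: MlxObserver<T>) {
        // Subscription callback first, then hand the downstream to the upstream
        downStream.onSubscribe()
        source.setObserver(downStream)
    }

    companion object {
        fun <T> create(source: MlxObservableOnSubscribe<T>): MlxObservable<T> =
            MlxObservable(source)
    }
}

// Hypothetical helper: runs one chain and returns the events in the order they happened
fun runMinimalDemo(): List<String> {
    val events = mutableListOf<String>()
    MlxObservable.create(object : MlxObservableOnSubscribe<Int> {
        override fun setObserver(observer: MlxObserver<Int>) {
            events.add("upstream sends 10")
            observer.onNext(10)
            observer.onComplete()
        }
    }).setObserver(object : MlxObserver<Int> {
        override fun onSubscribe() { events.add("onSubscribe") }
        override fun onNext(item: Int) { events.add("onNext $item") }
        override fun onError(e: Throwable) { events.add("onError") }
        override fun onComplete() { events.add("onComplete") }
    })
    return events
}

fun main() {
    runMinimalDemo().forEach(::println)
}
```

Note that onSubscribe is recorded before the upstream sends anything, which is exactly the ordering argued for above.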

Here is a simple model

Process steps

  1. Use the fake observed’s static create method to receive a real observed object, which is stored in the fake observed’s source field
  2. Call the fake observed’s setObserver method with a newly created observer object; it immediately calls the observer’s onSubscribe method and then hands the observer to the real observed
  3. The real observed calls the observer’s methods
  4. The observer receives the data

The actual observed in the figure above has two rectangles, which are actually one object, but I just pulled it out to make it better represented.

But wait: RxJava is so powerful, with all kinds of operators and thread switching. This thing of yours can’t do any of that.

That’s right. Let’s build an operator first. Hmm, which one?

Let’s use map, the one I use most, since it is the most representative.

Custom map operator

Let’s keep thinking: if we define the map operator, what does it do?

Obviously the map operator works in the middle, between upstream and downstream. That is exactly what the decorator pattern is for: enhancing an object.

Since it sits between upstream and downstream, can we take the water from upstream, transform it as desired, and then release the transformed water downstream?

In RxJava, map is called on the result of create, so it is clearly an instance method rather than a static one. Let’s define the map method first. Since the result of map must connect to a downstream in exactly the same way as before, map has to return a fake observed object.

With that in mind, we can write code like this:

class MlxObservable<T>{
    fun map():MlxObservable{}
}

However, MlxObservable, the fake observed, is generic. Should the returned one still be of type T?

The answer is no. Why not? Consider what map does: it converts data, for example turning a String from upstream into an Int for downstream. So we can’t reuse T, and since we don’t know the target type in advance, we declare a second type parameter, R.

So the code looks like this:

class MlxObservable<T>{
    fun <R> map():MlxObservable<R>{}
}

Now that we have the method, how do we do the transformation?

RxJava of course doesn’t know how you want to convert the data, so it defines an interface for you to implement, and the return type of your implementation becomes the type the downstream receives.

Don’t understand?

That is, you define the conversion rule yourself, and R is determined by what your rule returns. Java interfaces are cumbersome here; in Kotlin we can use a function parameter instead.

We pass a function into map as the transformation rule:

fun <R> map(func:(T) -> R):MlxObservable<R>{

}

func: (T) -> R is a function-typed parameter, which plays the same role as a single-method interface in Java. The argument func is a function that takes a value of type T and returns a value of type R. It’s that simple.
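To see that the two forms really are interchangeable, here is a small side-by-side sketch. Mapper, convertWithInterface, and convertWithLambda are hypothetical names invented for this comparison; fun interface requires Kotlin 1.4 or later.

```kotlin
// Hypothetical single-method interface, the shape a Java-style API would use
fun interface Mapper<T, R> {
    fun apply(item: T): R
}

// Version 1: the rule arrives as an interface implementation
fun <T, R> convertWithInterface(item: T, mapper: Mapper<T, R>): R = mapper.apply(item)

// Version 2: the rule arrives as a plain function value
fun <T, R> convertWithLambda(item: T, func: (T) -> R): R = func(item)

fun main() {
    // R is inferred as String from what the rule returns
    val viaInterface = convertWithInterface(10, Mapper<Int, String> { "value=$it" })
    val viaLambda = convertWithLambda(10) { "value=$it" }
    println(viaInterface)
    println(viaLambda)
}
```

In both versions the caller never names R explicitly for the lambda form; it falls out of the return type of the rule, which is the point made above.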

OK, now that we have the transformation rule, how do we apply it and send the converted data downstream?

It’s actually very simple. Readers familiar with the decorator pattern probably already know; if you don’t, that’s fine too.

What we need next is a real observed that belongs to map itself, which connects to the upstream and notifies the downstream. It doesn’t matter if that isn’t clear yet. Let’s write it first and you’ll see.

class MlxMapObservable <T, R>():MlxObservableOnSubscribe<R>{}

You might wonder why there are two type parameters. Remember that the map method declared an R in addition to T? T was already taken, and we needed one more type that is not yet known, so we called it R.

T is the type the upstream emits, and R is the type it is converted to.

This class implements the MlxObservableOnSubscribe interface, which makes it a real observed. What is the purpose of that?

map sits between upstream and downstream, so it has to receive from upstream and pass data on to downstream. It therefore acts as a real observed that the downstream observes, and in turn observes the upstream itself. When the upstream sends data, map receives it first, applies the transformation rule, and passes the transformed data to the downstream by calling the downstream’s methods.

Don’t get it?

In the previous model, the downstream was handed directly to the upstream, and the upstream called the downstream’s methods directly.

In the new model, the downstream is handed to map, and map hands an observer of its own to the upstream. The upstream still calls “downstream” methods, but from the upstream’s point of view the downstream is now map rather than the real downstream. So the upstream’s calls land in map, which applies the conversion and then calls the real downstream.

Words only go so far; once it’s written as code you’ll understand immediately.

Since MlxMapObservable implements the MlxObservableOnSubscribe interface, it must implement setObserver:

class MlxMapObservable <T, R>():MlxObservableOnSubscribe<R>{

    override fun setObserver(downStream: MlxObserver<R>){
        // downStream is the real downstream
    }
}

Now that we have the downstream, shouldn’t we also have the upstream and the transformation rule?

Exactly.

Let’s take both of them in the constructor:

class MlxMapObservable <T, R>(
        private val source:MlxObservableOnSubscribe<T>,
        private val func:((T)->R)
    ):MlxObservableOnSubscribe<R>{

    override fun setObserver(downStream: MlxObserver<R>){
        // downStream is the real downstream
    }
}

At this point we have the upstream source, the transformation rule func, and the downstream downStream. Time to put them to work. As mentioned earlier, map should hand itself to the upstream, but MlxMapObservable is an observed, and the upstream accepts an observer.

Therefore we need to define an observer inside map to receive data from upstream.

class MlxMapObservable <T, R>(
    	private val source:MlxObservableOnSubscribe<T>, 
    	private val func:((T)->R)
	):MlxObservableOnSubscribe<R>{

    override fun setObserver(downStream: MlxObserver<R>){
        // downStream is the real downstream
    }
    class MlxMapObserver<T,R>(
        	private val downStream:MlxObserver<R>, 
        	private val func:((T)->R)
    	):MlxObserver<T>{

        override fun onSubscribe() {
            downStream.onSubscribe()// When a subscription is received from upstream, the event is passed downstream
        }

        override fun onNext(item: T) {
            // Apply the conversion rule to get the converted data
            val result=func.invoke(item)
            // The converted data is passed downstream
            downStream.onNext(result)
        }

        override fun onError(e: Throwable) {
            // Pass the error downstream
            downStream.onError(e)
        }

        override fun onComplete() {
            // Complete the event to pass downstream
            downStream.onComplete()
        }

    }
}

We define an observer class, MlxMapObserver, that belongs only to map, and pass the real downStream into its constructor. Every method of the map observer forwards to the real downStream.

Note that onNext is where map applies the func transformation: it converts the data from type T to type R and delivers the R value to the real downStream.

All that remains is to connect to the upstream in setObserver by handing the upstream map’s own observer object:

override fun setObserver(downStream: MlxObserver<R>){
    val map=MlxMapObserver(downStream,func)// Create map's own observer object
    source.setObserver(map)// Hand it to the upstream
}

map is done. Easy, isn’t it?

The code is very simple, though the logic can feel a little roundabout. In short: map creates its own observer object and hands it to the upstream; messages the upstream sends therefore arrive at map; map’s observer processes the data and then passes the result on to the downstream.

With the map classes in place, we return to the map method. We know it needs to return a fake observed, and a fake observed needs a real observed. MlxMapObservable is that real observed, so we pass the upstream and the transformation rule to a new MlxMapObservable and wrap it in a new fake observed:

fun <R> map(func:(T) -> R):MlxObservable<R>{
    // source is the upstream real observed
    val map=MlxMapObservable(this.source!!,func)
    return MlxObservable(map)
}

Let’s try it out and see whether it behaves like RxJava:

MlxObservable.create(object :MlxObservableOnSubscribe<Int> {
    override fun setObserver(emitter: MlxObserver<Int>) {
        println("Upstream sending data :10")
        emitter.onNext(10)
    }
})
    .map{ item->
        "Here is the data from the map operator:$item"
    }
    .setObserver(object :MlxObserver<String>{
        override fun onSubscribe() {}
        override fun onNext(item: String) {
            println("Downstream receives data:$item")
        }
        override fun onError(e: Throwable) {}
        override fun onComplete() {}
    })

Here is the result:

Upstream sending data :10
Downstream receives data:Here is the data from the map operator:10

What do you think? It behaves exactly the same.

Maybe some of you still didn’t follow. It doesn’t matter. Let’s look at a schematic of the map model.

That is, the map method returns a new fake observed that contains the real observed constructed by map.

In other words, the map operator constructs an object that has both an observed and an observer: the observed side receives the downstream, and the observer side observes the upstream.

Does that make sense? The same approach used for map can be used to implement other operators.
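As an example of “other operators”, here is what a filter might look like when built the same way. This is a sketch under assumptions: MlxFilterObservable, MlxFilterObserver, filter, and runFilterDemo are names I made up by analogy with map, and MlxObservable is reduced to a non-null source in its constructor so the whole thing fits in one file.

```kotlin
interface MlxObserver<T> {
    fun onSubscribe()
    fun onNext(item: T)
    fun onError(e: Throwable)
    fun onComplete()
}

interface MlxObservableOnSubscribe<T> {
    fun setObserver(observer: MlxObserver<T>)
}

// Hypothetical operator class, modeled on MlxMapObservable
class MlxFilterObservable<T>(
    private val source: MlxObservableOnSubscribe<T>,
    private val predicate: (T) -> Boolean
) : MlxObservableOnSubscribe<T> {

    override fun setObserver(observer: MlxObserver<T>) {
        // Hand the upstream our own observer, which wraps the real downstream
        source.setObserver(MlxFilterObserver(observer, predicate))
    }

    class MlxFilterObserver<T>(
        private val downStream: MlxObserver<T>,
        private val predicate: (T) -> Boolean
    ) : MlxObserver<T> {
        override fun onSubscribe() = downStream.onSubscribe()
        override fun onNext(item: T) {
            // Only items that pass the rule continue downstream
            if (predicate(item)) downStream.onNext(item)
        }
        override fun onError(e: Throwable) = downStream.onError(e)
        override fun onComplete() = downStream.onComplete()
    }
}

// Simplified fake observed: non-null source, only what this sketch needs
class MlxObservable<T>(private val source: MlxObservableOnSubscribe<T>) {
    fun filter(predicate: (T) -> Boolean): MlxObservable<T> =
        MlxObservable(MlxFilterObservable(source, predicate))

    fun setObserver(downStream: MlxObserver<T>) {
        downStream.onSubscribe()
        source.setObserver(downStream)
    }
}

// Hypothetical helper: emits 1..6 and returns what survives the filter
fun runFilterDemo(): List<Int> {
    val received = mutableListOf<Int>()
    val upstream = object : MlxObservableOnSubscribe<Int> {
        override fun setObserver(observer: MlxObserver<Int>) {
            for (i in 1..6) observer.onNext(i)
            observer.onComplete()
        }
    }
    MlxObservable(upstream)
        .filter { it % 2 == 0 }
        .setObserver(object : MlxObserver<Int> {
            override fun onSubscribe() {}
            override fun onNext(item: Int) { received.add(item) }
            override fun onError(e: Throwable) {}
            override fun onComplete() {}
        })
    return received
}

fun main() {
    println(runFilterDemo())
}
```

The only line that differs in spirit from map is onNext: map transforms and forwards, filter decides whether to forward at all.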

Let’s move on to the deepest and most difficult part of RxJava: how it switches threads.

RxJava switches threads

First, RxJava switches threads with two methods, one specifying the upstream thread and one the downstream thread.

Which method’s thread does the upstream switch change? Those who have used it know: it is the method implemented in the real observed that we construct. So where is that method called?

That’s right, it is called by the fake observed: the setObserver method of MlxObservable calls the upstream’s setObserver method.

So if we want to change the thread on which the upstream’s setObserver runs, the change has to happen on the fake observed side. The create method that builds the first fake observed is already fixed, so we follow the idea of the map operator: construct our own observed that connects to the upstream just as map does, and change threads while doing so.

What? Still don’t understand?

Does map connect to the upstream?

Yes.

How does map connect to the upstream?

map constructs a real observed of its own, which calls the upstream’s setObserver method and hands map’s observer to it.

So, as you can see, map calls the upstream’s setObserver method. As discussed, the only thing we need to change is the thread on which that call runs. We can therefore define a map-like operator that makes the call on another thread.

OK, let’s see how it looks in code.

We define an operator that changes the upstream thread. Since we are imitating RxJava, we borrow its method name as well: subscribeOn.

Changing upstream threads

Define a class, modeled after map, to implement the real observed interface, and define an observer object of its own in order to properly pass data downstream.

class MlxSubscribeObservable <T>(
    val source:MlxObservableOnSubscribe<T>):MlxObservableOnSubscribe<T>{

    override fun setObserver(downStream: MlxObserver<T>){
        val observer=MlxSubscribeObserver(downStream)
    }

    class MlxSubscribeObserver<T>(val downStream:MlxObserver<T>):MlxObserver<T>{
        ...
    }
}

Define a member method modeled after map:

fun subscribeOn():MlxObservable<T>{
    val subscribe=MlxSubscribeObservable(this.source!!)
    return MlxObservable(subscribe)
}

All right, that all looks fine. But it doesn’t change the upstream thread yet; the setObserver method doesn’t even connect to the upstream.

True. First we need to consider that creating and destroying threads is expensive. We can’t just create a new Thread every time, which would be a large performance overhead, so we need thread reuse, and the first thing that comes to mind for thread reuse is a thread pool. RxJava does just that.

That raises a question: RxJava offers several kinds of threads to switch to, such as IO, computation, and the Android main thread.

In fact, except for the Android main thread, all of them come from thread pools; they differ only in pool strategy. For example, IO uses the CachedThreadScheduler thread pool, which is a cached pool, that is, a pool whose threads can be reused. That’s easy to understand. Thread pools, thread synchronization, and so on deserve a series of their own, which I plan to write later.
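For readers who haven’t used a cached thread pool, here is a minimal JVM sketch of submitting work to one. It uses only java.util.concurrent; runOnPool is a hypothetical helper. Whether consecutive tasks land on the same worker thread depends on timing, so the sketch prints the thread names rather than promising reuse.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs taskCount small tasks one after another on a cached pool
// and returns the name of the thread each task ran on
fun runOnPool(taskCount: Int): List<String> {
    val pool = Executors.newCachedThreadPool()
    val names = mutableListOf<String>()
    try {
        repeat(taskCount) {
            // get() waits for the task to finish, so a worker is usually idle
            // (and therefore reusable) by the time the next task is submitted
            val name = pool.submit(Callable { Thread.currentThread().name }).get()
            names.add(name)
        }
    } finally {
        pool.shutdown()
        pool.awaitTermination(1, TimeUnit.SECONDS)
    }
    return names
}

fun main() {
    // Thread names come from the default factory and look like pool-N-thread-M
    runOnPool(3).forEach(::println)
}
```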

So to handle these different kinds of threads, we need a class that manages them. RxJava has one, called Schedulers, so we’ll call ours Schedulers too.

Define thread pool management classes

class Schedulers() {}

A bare class won’t do: to manage thread pools you first need to have one. So we define a thread pool object inside it. There are many kinds of pools, but we will define only an IO thread pool.

class Schedulers() {
    private var IOThreadPool = Executors.newCachedThreadPool() // IO thread pool
}

And the main thread? We’ll get to it in a moment. Take it easy.

Wait, RxJava seems to expose its IO scheduler through a static method, and yours is an instance member. That’s different.

Well, in RxJava that is just a strategy. We can do something similar without being as complex as RxJava, simply using an Int to represent the strategy.

class Schedulers() {
    private var IOThreadPool = Executors.newCachedThreadPool() // IO thread pool
    companion object{
        // Define a thread-safe singleton; it is not private because other classes call Schedulers.INSTANCE
        val INSTANCE: Schedulers by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
            Schedulers()
        }
        const val IO=0 // Define the IO thread policy
        const val MAIN=1 // Define the main thread policy
    }
}

We have defined a thread-safe singleton. How by lazy implements it is something interested readers can look up themselves; in essence it is double-checked locking.
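For comparison, here is the lazy version next to a hand-written double-checked one. Both classes are hypothetical (LazySchedulers and ManualSchedulers are names for this sketch only), and the manual version shows the general idiom rather than what the Kotlin standard library does internally.

```kotlin
// Singleton via by lazy, the form used in the article
class LazySchedulers private constructor() {
    companion object {
        val INSTANCE: LazySchedulers by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
            LazySchedulers()
        }
    }
}

// The same guarantee written out by hand as double-checked locking
class ManualSchedulers private constructor() {
    companion object {
        // @Volatile makes the published instance visible to other threads
        @Volatile
        private var instance: ManualSchedulers? = null

        fun getInstance(): ManualSchedulers {
            // First check, without taking the lock
            instance?.let { return it }
            return synchronized(this) {
                // Second check, under the lock, so only one thread ever constructs
                instance ?: ManualSchedulers().also { instance = it }
            }
        }
    }
}

fun main() {
    println(LazySchedulers.INSTANCE === LazySchedulers.INSTANCE)
    println(ManualSchedulers.getInstance() === ManualSchedulers.getInstance())
}
```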

Everything is ready; all we lack is the east wind.

Now that the pieces are in place, we can get to the task.

What task?

Running work on the specified thread, of course, so we need a method that switches threads. Switching the upstream thread means calling the upstream’s setObserver with the downstream on the specified thread. And there you have the parameters: to make the switch you need an upstream, a downstream, and the target thread.

OK, define a method like this:

fun <T> submitSubscribeWork(
    source: MlxObservableOnSubscribe<T>, // The upstream
    downStream: MlxObserver<T>, // The downstream
    thread:Int // The specified thread
) {
    when(thread){
        IO->{
            IOThreadPool.submit {
                // A pool thread performs the upstream-downstream connection
                source.setObserver(downStream)
            }
        }
        MAIN->{

        }
    }
}

The code is simple enough. Given the upstream and the downstream, it connects them on a pool thread, that is, it hands the downstream to the upstream there. The MAIN branch is left empty because the main thread isn’t implemented yet; we’ll get to it shortly. Now that we have the thread pool and the method, let’s go back to the thread-switching operator and submit the work to the thread pool management class.

class MlxSubscribeObservable <T>(
    private val source:MlxObservableOnSubscribe<T>,
    private val thread:Int // Note the newly added target thread
    ):MlxObservableOnSubscribe<T>{

    override fun setObserver(downStream: MlxObserver<T>){
        val observer=MlxSubscribeObserver(downStream)
        // Submit the task to the specified thread, which then connects upstream and downstream
        Schedulers.INSTANCE.submitSubscribeWork(source,downStream,thread)
    }

    class MlxSubscribeObserver<T>(val downStream:MlxObserver<T>):MlxObserver<T>{
        ...
    }
}
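The IO half of this can be tried on a plain JVM, without Android. The sketch below is a simplification under assumptions: IoScheduler is a hypothetical object that replaces the Schedulers singleton and handles only the IO case, its pool threads are deliberately named "mlx-io" so they are easy to recognize, and the inner MlxSubscribeObserver wrapper is left out because it is elided above.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

interface MlxObserver<T> {
    fun onSubscribe()
    fun onNext(item: T)
    fun onError(e: Throwable)
    fun onComplete()
}

interface MlxObservableOnSubscribe<T> {
    fun setObserver(observer: MlxObserver<T>)
}

// Hypothetical stand-in for Schedulers, IO case only
object IoScheduler {
    // Daemon threads with a fixed name, so the demo exits cleanly and is easy to read
    private val ioThreadPool = Executors.newCachedThreadPool { r ->
        Thread(r, "mlx-io").apply { isDaemon = true }
    }

    fun <T> submitSubscribeWork(source: MlxObservableOnSubscribe<T>, downStream: MlxObserver<T>) {
        // The upstream-downstream connection itself runs on a pool thread
        ioThreadPool.execute { source.setObserver(downStream) }
    }
}

class MlxSubscribeObservable<T>(
    private val source: MlxObservableOnSubscribe<T>
) : MlxObservableOnSubscribe<T> {
    override fun setObserver(observer: MlxObserver<T>) {
        IoScheduler.submitSubscribeWork(source, observer)
    }
}

// Hypothetical helper: returns (thread the upstream emitted on, thread the downstream received on)
fun runSubscribeOnDemo(): Pair<String, String> {
    var upstreamThread = ""
    var downstreamThread = ""
    val done = CountDownLatch(1)

    val upstream = object : MlxObservableOnSubscribe<Int> {
        override fun setObserver(observer: MlxObserver<Int>) {
            upstreamThread = Thread.currentThread().name
            observer.onNext(10)
            observer.onComplete()
        }
    }

    MlxSubscribeObservable(upstream).setObserver(object : MlxObserver<Int> {
        override fun onSubscribe() {}
        override fun onNext(item: Int) { downstreamThread = Thread.currentThread().name }
        override fun onError(e: Throwable) {}
        override fun onComplete() { done.countDown() }
    })

    // Wait for the pool thread to finish before reading the recorded names
    done.await(2, TimeUnit.SECONDS)
    return upstreamThread to downstreamThread
}

fun main() {
    val (up, down) = runSubscribeOnDemo()
    println("Upstream thread: $up")
    println("Downstream thread: $down")
}
```

Because nothing moves the downstream calls elsewhere, the downstream runs on whatever thread the upstream emits on.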

But here is another problem: we can’t only use pool threads. What if I want to emit on the main thread?

So let’s handle the main thread.

Speaking of the main thread, one thing has to be mentioned: Handler. If you don’t know Handler, I suggest reading my earlier article on it. Many people say it is the best article about Handler. Hee hee, yes, “many people” means me.

When we switch threads on the Android platform, the one thing we can never get around is Handler. Coroutines? On Android, coroutines also wrap Handler. I could walk you through implementing a coroutine by hand some other time; I won’t go into that today.

So we need a Handler, and since this is for the main thread, we construct it with the main thread’s Looper:

class Schedulers(){
    ...
    private var handler=Handler(Looper.getMainLooper()){ message->
        // This runs on the main thread
        return@Handler true
    }
}

Let’s think about how, once we receive a thread-switching task, the code that connects upstream and downstream can be handed to the handler. The answer is the message’s callback.

fun <T> submitSubscribeWork(
    source: MlxObservableOnSubscribe<T>, 
    downStream: MlxObserver<T>,
    thread:Int) 
{
    when(thread){
        IO->{
            ...
        }
        MAIN->{
            val message=Message.obtain(handler){
                // Connect upstream and downstream
                source.setObserver(downStream)
            }
            handler.sendMessage(message)
        }
    }
}

The callback is a Runnable that connects upstream and downstream. When the handler receives the message on the main thread, all it needs to do is run it:

class Schedulers(){
    ...
    private var handler=Handler(Looper.getMainLooper()){ message->
        // This runs on the main thread
        message.callback.run()
        return@Handler true
    }
}

OK, let’s try it and see whether it works as expected:

MlxObservable.create(MlxObservableOnSubscribeJava<Int> {
    Log.i("zzz", "Upstream thread:${Thread.currentThread().name}")
    it.onNext(10)
})
    .subscribeOn(Schedulers.IO())
    .subscribe(object : MlxObserver<Int> {
        override fun onNext(item: Int) {
            Log.i("zzz", "Downstream thread:${Thread.currentThread().name}")
        }
        ...
    })

OK, perfect. The upstream thread switched successfully.

But why did the downstream thread switch as well? Well, RxJava behaves the same way: since you didn't specify a downstream thread, upstream and downstream run on the same thread by default.

Just as we did for the upstream thread, let's write the real observed object, the observer, and the method for switching the downstream thread.

Changing downstream threads

First is the downstream observed object:

class MlxObserverObservable<T>(
    private val source: MlxObservableOnSubscribe<T>,
    private val thread: Int
) : MlxObservableOnSubscribe<T> {

    override fun setObserver(downStream: MlxObserver<T>) {
        // Wrap the real downstream so that every callback is re-posted to the target thread
        val observer = MlxObserverObserver(downStream, thread)
        source.subscribe(observer)
    }

    class MlxObserverObserver<T>(
        val downStream: MlxObserver<T>,
        val thread: Int
    ) : MlxObserver<T> {

        override fun onSubscribe() {
            Schedulers.INSTANCE.submitObserverWork({
                downStream.onSubscribe()
            }, thread)
        }

        override fun onNext(item: T) {
            Schedulers.INSTANCE.submitObserverWork({
                downStream.onNext(item)
            }, thread)
        }

        override fun onError(e: Throwable) {
            Schedulers.INSTANCE.submitObserverWork({
                downStream.onError(e)
            }, thread)
        }

        override fun onComplete() {
            Schedulers.INSTANCE.submitObserverWork({
                downStream.onComplete()
            }, thread)
        }
    }
}

Never mind my terrible naming. Unlike switching the upstream thread, switching the downstream thread doesn't need a reference to the upstream object, so submitObserverWork takes only two parameters: the work to run for the downstream, and the target thread.

Let's take a look at this method; it is almost identical to the one above:

class Schedulers {
    fun submitObserverWork(function: () -> Unit, thread: Int) {
        when (thread) {
            IO -> {
                IOThreadPool?.submit {
                    function.invoke() // Call the function that was passed in
                }
            }
            MAIN -> {
                handler?.let {
                    val m = Message.obtain(it) {
                        function.invoke() // Call the function that was passed in
                    }
                    it.sendMessage(m)
                }
            }
        }
    }
}

The only difference is that the work is passed in as a function type (a higher-order function parameter) instead of an interface.
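If you want to play with this idea off-device, here is a minimal JVM-only sketch of the same submitObserverWork shape. It is not the Schedulers class from this article: SketchSchedulers, SKETCH_IO, SKETCH_MAIN and threadNameFor are names made up for illustration, and a single-thread executor only stands in for the Android main thread and its Handler.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical thread constants, mirroring the article's IO and MAIN.
const val SKETCH_IO = 0
const val SKETCH_MAIN = 1

// Hypothetical JVM-only stand-in for the article's Schedulers class.
object SketchSchedulers {
    // Daemon threads, so the JVM can exit without an explicit shutdown.
    private val ioPool: ExecutorService = Executors.newCachedThreadPool { r ->
        Thread(r, "sketch-io").apply { isDaemon = true }
    }

    // Assumption: one dedicated thread plays the role of the Android main thread.
    private val mainLike: ExecutorService = Executors.newSingleThreadExecutor { r ->
        Thread(r, "sketch-main").apply { isDaemon = true }
    }

    // Same shape as submitObserverWork: a piece of work plus a target thread.
    fun submitObserverWork(function: () -> Unit, thread: Int) {
        when (thread) {
            SKETCH_IO -> ioPool.execute { function() }
            SKETCH_MAIN -> mainLike.execute { function() }
        }
    }
}

// Submits one piece of work and returns the name of the thread it ran on.
fun threadNameFor(thread: Int): String {
    val latch = CountDownLatch(1)
    var name = ""
    SketchSchedulers.submitObserverWork({
        name = Thread.currentThread().name
        latch.countDown()
    }, thread)
    latch.await(2, TimeUnit.SECONDS)
    return name
}
```

Calling threadNameFor(SKETCH_MAIN) hands a function to the scheduler and gets back the name of the stand-in main thread. That is the whole trick: the caller only describes the work, and the scheduler decides where it runs.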

OK, enough talk. Practice is the only real proof, so let's see whether it works. Here I quietly renamed setObserver to subscribe; after all, it looks a bit classier ~~

MlxObservable.create(MlxObservableOnSubscribeJava<Int> {
    Log.i("zzz", "Upstream thread:${Thread.currentThread().name}")
    it.onNext(10)
})
    .subscribeOn(Schedulers.IO())
    .observerOn(Schedulers.mainThread())
    .subscribe(object : MlxObserver<Int> {
        override fun onNext(item: Int) {
            Log.i("zzz", "Downstream thread:${Thread.currentThread().name}")
        }
        ...
    })

How about that!!

Upstream and downstream threads both switched successfully. Perfect.

Still can't see it?

Look back at the flow above and you'll get the idea.

Some problems with RxJava threads

So you can see that if several fake observed objects set the upstream thread, that is, if the upstream thread is set more than once, only the first call takes effect. The others do switch threads, but nobody downstream ever gets to see it.

Why is that?

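Say it is set three times, by calls 1, 2 and 3 in the order they are written. Subscription travels from the bottom of the chain upward, so 3 switches the thread first, then 2 switches it again, then 1 switches it once more. The upstream code ends up running on the thread chosen by 1, so only the first call has any visible effect.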
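The bottom-up order can be checked with a small JVM-only sketch. Assumptions: SketchObserver, SketchSource, subscribeOnThread and runThreeSubscribeOns are hypothetical names, not the Mlx classes from this article, and each switch simply starts a new named thread instead of going through a real scheduler.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical minimal types, not the article's Mlx* classes.
fun interface SketchObserver<T> { fun onNext(item: T) }
fun interface SketchSource<T> { fun subscribe(downStream: SketchObserver<T>) }

// Upstream switch: subscribing to the wrapped source happens on a new named thread.
fun <T> SketchSource<T>.subscribeOnThread(
    name: String,
    hops: MutableList<String>
): SketchSource<T> {
    val upstream = this
    return SketchSource<T> { downStream ->
        Thread(Runnable {
            hops.add(name)                 // record the hop on the new thread
            upstream.subscribe(downStream) // then subscribe one level further up
        }, name).start()
    }
}

// Returns the thread the source ran on, plus the order in which the hops happened.
fun runThreeSubscribeOns(): Pair<String, List<String>> {
    val hops = Collections.synchronizedList(mutableListOf<String>())
    val latch = CountDownLatch(1)
    var sourceThread = ""
    val source = SketchSource<Int> { downStream ->
        sourceThread = Thread.currentThread().name
        downStream.onNext(10)
    }
    source
        .subscribeOnThread("first", hops)  // written first, closest to the source
        .subscribeOnThread("second", hops)
        .subscribeOnThread("third", hops)  // written last, closest to subscribe()
        .subscribe { latch.countDown() }
    latch.await(2, TimeUnit.SECONDS)
    return Pair(sourceThread, hops.toList())
}
```

In this sketch the hops are recorded as third, second, first, and the source runs on the thread named "first". The later two switches really happen, but each is immediately overridden by the one above it.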

For the downstream it is the other way around: however many times you switch, only the last one takes effect.

That only holds when there is a single downstream, though. With multiple downstreams, each switch takes effect for the downstream that follows it.

What are multiple downstreams? Hey, that's doOnNext.

With multiple downstreams, written like this, each doOnNext runs on the thread set just above it:

.observerOn(Schedulers.mainThread())
.doOnNext {  } // The first downstream
.observerOn(Schedulers.IO())
.doOnNext {  } // The second downstream
.observerOn(Schedulers.mainThread())

The doOnNext operator is also very simple, and you can implement it yourself. If you have no idea where to start, take a look at my implementation. I've only implemented a handful of operators; time was limited.
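To give a rough idea, here is a JVM-only sketch of doOnNext combined with downstream switching. It is not the implementation from my project: DemoObserver, DemoSource, observeOnThread and runTwoDownstreams are hypothetical names, and single-thread executors with made-up names stand in for the main thread and the IO pool.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical minimal types, not the article's Mlx* classes.
fun interface DemoObserver<T> { fun onNext(item: T) }
fun interface DemoSource<T> { fun subscribe(downStream: DemoObserver<T>) }

// Downstream switch: every onNext is re-posted to a single named daemon thread.
fun <T> DemoSource<T>.observeOnThread(name: String): DemoSource<T> {
    val upstream = this
    val executor = Executors.newSingleThreadExecutor { r ->
        Thread(r, name).apply { isDaemon = true }
    }
    return DemoSource<T> { downStream ->
        upstream.subscribe(DemoObserver<T> { item ->
            executor.execute { downStream.onNext(item) }
        })
    }
}

// doOnNext: run a side effect on the current thread, then pass the item on unchanged.
fun <T> DemoSource<T>.doOnNext(action: (T) -> Unit): DemoSource<T> {
    val upstream = this
    return DemoSource<T> { downStream ->
        upstream.subscribe(DemoObserver<T> { item ->
            action(item)
            downStream.onNext(item)
        })
    }
}

// Returns the thread names seen by the two doOnNext calls and the final observer.
fun runTwoDownstreams(): List<String> {
    val seen = Collections.synchronizedList(mutableListOf<String>())
    val latch = CountDownLatch(1)
    val source = DemoSource<Int> { downStream -> downStream.onNext(10) }
    source
        .observeOnThread("main-like")
        .doOnNext { seen.add(Thread.currentThread().name) } // the first downstream
        .observeOnThread("io-like")
        .doOnNext { seen.add(Thread.currentThread().name) } // the second downstream
        .observeOnThread("main-like-2")
        .subscribe {
            seen.add(Thread.currentThread().name)           // the final observer
            latch.countDown()
        }
    latch.await(2, TimeUnit.SECONDS)
    return seen.toList()
}
```

Each doOnNext here is just one more observer wrapped around the next one down, which is why it counts as its own downstream and picks up the thread chosen by the switch directly above it.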

This is the project address for this article. If you are interested, take a look:

MlxRxJava

Feel free to follow me: a kitten who likes custom Views and digging into Android technology ~