Instead of talking about RXJava and Retrofit, I went straight to 2, because 2 is better packaged and more useful.

1. Observer mode

The setOnClickListener process is a subscription. After the subscription relationship is established, when the button is clicked, the listener can respond to the event.

The button here. SetOnClickListener (the listener) looks mean observed subscribed to the observer (magazine subscriptions readers), logically is not in conformity with the daily routine. This is a habit of design patterns, and it’s not a problem. Getting used to this pattern will help you understand the observer pattern.

2. Observer mode in RxJava

  • Observable(ble) words ending in ble generally mean that can… Observable, observable, observable
  • ObserverObserver (er ending words usually mean… , the… People)
  • subscribe: subscription

Observables and observers are created first, and then Observables. Subscribe (Observer) so that events emitted by an Observable are responded to by the Observer. Instead of creating an Observable manually, Retrofit returns it to us, and once we get an Observable, we just need to worry about how to manipulate the data in the Observer. However, to demonstrate this from the beginning, manually create an Observable.

2.1 create observables

Common several ways, not common will not write, because I think this module is not the point.

  • var observable=Observable.create(ObservableOnSubscribe<String> {... })
  • var observable=Observable.just(...)
  • var observable = Observable.fromIterable(mutableListOf(...) )

2.1.1 the create ()

var observable2=Observable.create(object :ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("Hello ")
                emitter.onNext("RxJava ")
                emitter.onNext("GoodBye ")
                emitter.onComplete()            }

        })
Copy the code

ObservableOnSubscribe and ObservableEmitter are strangers to each other, and if we’re talking about source code analysis in detail, it’s a lot of stuff. ObservableOnSubscribe is used to help create observables, and ObservableEmitter is used to emit events (which can be processed in response in the Observer). The Emitter fired three events at once, and then called onComplete() and we’ll talk about that when WE talk about the Observer, and we’ll talk about that as well.

2.1.2 just

var observable=Observable.just("Hello"."RxJava"."GoodBye")
Copy the code

This is the same effect as creating an Observable with Create, calling onNext 3 times and calling onComplete.

2.1.3 fromIterable

var observable = Observable.fromIterable(mutableListOf("Hello"."RxJava"."GoodBye"))
Copy the code

This is the same effect as creating an Observable with Create, calling onNext 3 times and calling onComplete.

2.2 create the Observer

val observer = object : Observer<String> {
            override fun onComplete(a) {
                Log.e("abc"."-----onComplete-----")}override fun onSubscribe(d: Disposable) {
                Log.e("abc"."-----onSubscribe-----")}override fun onNext(t: String) {
                Log.e("abc"."-----onNext-----$t")}override fun onError(e: Throwable) {
                Log.e("abc"."-----onError-----$e")}}/ / subscribe
observable.subscribe(observer)
Copy the code

Log printing:

-----onSubscribe-----
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
-----onComplete-----
Copy the code

As you can see, the subscription relationship is established and onNext is printed according to the launch order of the previous Observable. The parameters are onNext(t: In just and fromIterable cases, we don’t call the Emitter manually, but we still call onNext first and onComplete last

2.3 Consumer and Action

These two words mean consumers (which can be understood as consuming events emitted by the observer) and behavior (which can be understood as responding to the observed behavior). Not all of the four callback methods in the Observer are available, and if only a few of them are needed, then Consumer and Action are needed.

‘onSubscribe’, ‘onNext’, ‘onError’; ‘Consumer’; ‘onComplete’, ‘Action’;

2.3.1 the subscribe (Consumer <? super T> onNext)

observable.subscribe(object :Consumer<String>{
            override fun accept(t: String?). {
                Log.e("abc"."-----onNext-----$t")}})/ / print
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
Copy the code

Just to clarify, if we only pass a single object argument in subscribe, it can only be SUBSCRIBE (Consumer
onNext) (onNext method), cannot be Action or Consumer
onError, Consumer
onSubscribe

== Note that the callback method name in Consumer is ACCEPT, as opposed to onNext

2.3.2 subscribe (Consumer <? super T> onNext, Consumer<? super Throwable> onError)

With two Consumer parameters, responsible for the onNext and onError callbacks.

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?). {
                Log.e("abc"."-----onNext-----$t")}},object : Consumer<Throwable> {
            override fun accept(t: Throwable?). {
                Log.e("abc"."-----onError-----$e")}})Copy the code

If you want one with two consumers but do not subscribe(e.g. Subscribe (Consumer
onNext, Consumer
onSubscribe), ok? The answer is: no

2.3.3 the subscribe (Consumer <? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

With three parameters, responsible for the onNext, onError, and onComplete callbacks.

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?). {
                Log.e("abc"."-----onNext-----$t")}},object : Consumer<Throwable> {
            override fun accept(t: Throwable?). {
                Log.e("abc"."-----onError-----$e")}},object : Action {
            override fun run(a) {
                Log.e("abc"."-----onComplete-----")}})Copy the code

Again, the three parameters can only be combined in this way

== Note that the callback method name in Action is run, as opposed to onComplete

2.3.4 subscribe (Consumer <? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)

This has the same effect as a direct new out of the Observer.

observable2.subscribe(object : Consumer<String> {
            override fun accept(t: String?). {
                Log.e("abc"."-----onNext-----$t")}},object : Consumer<Throwable> {
            override fun accept(t: Throwable?). {
                Log.e("abc"."-----onError-----$e")}},object : Action {
            override fun run(a) {
                Log.e("abc"."-----onComplete-----")}},object : Consumer<Disposable>{
            override fun accept(t: Disposable?). {
                Log.e("abc"."-----onSubscribe-----")}})Copy the code

3. The transformation

In the above example, an Observable sends String data, so it receives String data in the Observer. Data in real life development is diverse, and sometimes the data provided by an Observable is not ideal. In this case, the conversion operator is needed. Again, we’ll just cover the usual ones:

3.1 the map

For example, if we want to convert an upstream Int to a String, we can do this:

Observable.fromIterable(mutableListOf<Int> (1.3.5.7.8))
                .map(object : Function<Int, String> {
                    override fun apply(t: Int): String {
                        return "zb$t"
                    }
                })
                .subscribe(object : Consumer<String> {
                    override fun accept(t: String?). {
                        Log.e("abc"."-- $t-")}})/ / the Log Log
-- zb1 --
-- zb3 --
-- zb5 --
-- zb7 --
-- zb8 --
Copy the code

By using the map operator, Int data becomes a String in Consumer (instead of an Observer for simple viewing of data). Function is used here. Its first generic is the data type emitted in Observable, and the second is the data type we want to transform. The data is converted manually in the Apply method of Function. Diagram: Map changes a circle into a square.

3.2 flatMap

This is similar to a map, but the flatMap returns an Observable, which means that the Function’s second generic type is fixed as an Observable. Suppose there are multiple students, each student has multiple subjects, and each subject has been tested multiple times. Now print all the scores. Map alone won’t do it. Try it

class Course(var name: String, var scores: MutableList<Int>)
class Student(var name: String, var courses: MutableList<Course>)

var stu1Course1 = Course("Sports",mutableListOf(80.81.82))
var stu1Course2 = Course("Fine arts",mutableListOf(63.62.60))
var stu1 = Student("StuA", mutableListOf(stu1Course1, stu1Course2))
var stu2Course1 = Course("Music",mutableListOf(74.77.79))
var stu2Course2 = Course("Greek",mutableListOf(90.90.91))
var stu2 = Student("StuB", mutableListOf(stu2Course1, stu2Course2))

Observable.just(stu1,stu2)
                .map(object :Function<Student,MutableList<Course>>{
                    override fun apply(t: Student): MutableList<Course> {
                        return t.courses
                    }
                })
                .subscribe(object :Consumer<MutableList<Course>>{
                    override fun accept(t: MutableList<Course>? {
                        for (item int!!) {for (i in item.scores){
                                Log.e("abc"."-- - >$i")}}}})Copy the code

You can print through a two-layer for loop, which you can’t do because you can only get the Course set in the map. The flatMap case looks like this:

Observable.just(stu1, stu2)
                .flatMap(object : Function<Student, ObservableSource<Course>> {
                    override fun apply(t: Student): ObservableSource<Course> {
                        return Observable.fromIterable(t.courses)
                    }
                })
                .flatMap(object : Function<Course, ObservableSource<Int> > {override fun apply(t: Course): ObservableSource<Int> {
                        return Observable.fromIterable(t.scores)
                    }

                })
                .subscribe(object : Consumer<Int> {
                    override fun accept(t: Int?). {
                        Log.e("abc"."-- - >$t")}})/ / the log printing
    ---> 80
    ---> 81
    ---> 82
    ---> 63
    ---> 62
    ---> 60
    ---> 74
    ---> 77
    ---> 79
    ---> 90
    ---> 90
    ---> 91
Copy the code

Using flatMap twice, the chain call is clearer than the indent. The flatMap returns an ObservableSource, not the ObservableSource we mentioned earlier. Check the Observable source code to see that it inherits ObservableSource, so this polymorphic usage is ok. An Observable.fromIterable(T.courts) returned in Apply is how we create an Observable. Simply put, map transforms the data emitted by an Observable, and flatMap re-emits every element in the collection/array of data through an Observable. Schematic: faltMap turns a series of round observables into a series of square observables.

Although the picture is ugly, I think the meaning is more clear.

3.3 the filter

Filter means to filter whether events are emitted or not by determining whether they conform to the desired logic. Only events that return true are emitted, and others are discarded. For example, if we only want to look at scores above 80, we can filter them like this:

Observable.just(stu1, stu2)
                .flatMap(object : Function<Student, ObservableSource<Course>> {
                    override fun apply(t: Student): ObservableSource<Course> {
                        return Observable.fromIterable(t.courses)
                    }
                })
                .flatMap(object : Function<Course, ObservableSource<Int> > {override fun apply(t: Course): ObservableSource<Int> {
                        return Observable.fromIterable(t.scores)
                    }

                })
                .filter(object :Predicate<Int> {override fun test(t: Int): Boolean {
                        return t > 80
                    }

                })
                .subscribe(object : Consumer<Int> {
                    override fun accept(t: Int?). {
                        Log.e("abc"."-- - >$t")}})/ / the log printing
    ---> 81
    ---> 82
    ---> 90
    ---> 90
    ---> 91
Copy the code

Notice that it’s not Function, it’s Predicate, the word “based on…” T > 80; t > 80; t > 80

4. Use with Retrofit

The previous three sections cover a lot of work to clarify the entire workflow of RxJava, without touching on thread switching. In real life development, more often than not an Observable is returned to us via Retrofit. Retrofit is a web request framework that is based on OkHttp3 with better encapsulation, which can be greatly improved when combined with RxJava. Again, we’re just looking at how to use it, not the source code interpretation.

4.1 Retrofit makes simple Get requests

implementation 'com. Squareup. Retrofit2: retrofit: 2.6.2'
implementation 'com. Squareup. Retrofit2: converter - gson: 2.6.2'
Copy the code

First we introduce dependencies, and then we request a news data from Zhihu Daily (click to view the data: news-at.zhihu.com/api/4/news/…). :

// ZhEntity
class ZhEntity {
    var date: String? = null
    var stories: MutableList<StoriesBean>? = null
    var top_stories: MutableList<TopStoriesBean>? = null

    class StoriesBean {
        var image_hue: String? = null
        var title: String? = null
        var url: String? = null
        var hint: String? = null
        var ga_prefix: String? = null
        var type: Int = 0
        var id: Int = 0
        var images: MutableList<String>? = null
    }

    class TopStoriesBean {
        var image_hue: String? = null
        var hint: String? = null
        var url: String? = null
        var image: String? = null
        var title: String? = null
        var ga_prefix: String? = null
        var type: Int = 0
        var id: Int = 0}}Copy the code
// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
    @GET("news/latest")
    fun getLatestNews(a): Call<ZhEntity>
}
Copy the code
/ / call
val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://news-at.zhihu.com/api/4/")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val call: Call<ZhEntity> = service.getLatestNews()
        call.enqueue(object : Callback<ZhEntity> {
            override fun onFailure(call: Call<ZhEntity>? , t:Throwable?). {
                Log.e("abc"."-- >$t")}override fun onResponse(call: Call<ZhEntity>? , response:Response<ZhEntity>? {
                Log.e("abc"."-- >${Gson().toJson(response? .body())}")}})Copy the code

ZhEntity is an entity class. ApiService is an interface that defines a method getLatestNews with annotations. @get is a GET request, so you can imagine @post and @get with parameters. This is the subfolder after the request address BaseUrl.

The getLatestNews function returns type Call, which Retrofit defined to request the network. The third piece of code, real object created a Retrofit, addConverterFactory (GsonConverterFactory. The create ()) is the interface returned json type of data into the type of entity class, This thing in implementation ‘com. Squareup. Retrofit2: converter – gson: 2.6.2’ was introduced.

This is followed by a series of calls to the qnQueue operation and so on. You can see that the network request can be completed without Rxjava, and the code is not complicated.

Okay, THAT’s bullshit. To continue, some people say that they do not like the URL to be cut in two, can be modified like this, the effect is exactly the same:

// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
    @GET
    fun getLatestNews(@Url url:String): Call<ZhEntity>
}
Copy the code
/ / call
val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://www.baidu.com")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val call: Call<ZhEntity> = service.getLatestNews("https://news-at.zhihu.com/api/4/news/latest")
        call.enqueue(object : Callback<ZhEntity> {
            override fun onFailure(call: Call<ZhEntity>? , t:Throwable?). {
                Log.e("abc"."-- >$t")}override fun onResponse(call: Call<ZhEntity>? , response:Response<ZhEntity>? {
                Log.e("abc"."-- >${Gson().toJson(response? .body())}")}})Copy the code

BaseUrl is still required, but it doesn’t matter if you set it to anything else because it won’t be requested.

4.2 Retrofit with RxJava

This is the end of the story. I’m sorry for my limited ability to explain complex problems in simple language. First, add a line of support for RxJava when introducing dependencies:

implementation 'com. Squareup. Retrofit2: retrofit: 2.6.2'
implementation 'com. Squareup. Retrofit2: converter - gson: 2.6.2'
implementation 'com. Squareup. Retrofit2: adapter - rxjava2:2.6.2'
Copy the code

GetLatestNews then returns an Observable!

import io.reactivex.Observable
import retrofit2.http.GET

interface ApiService {
    @GET("news/latest")
    fun getLatestNews(a): Observable<ZhEntity>
}
Copy the code

Observable makes it easy to write an Observable without reporting errors:

val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .baseUrl("https://news-at.zhihu.com/api/4/")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val observable = service.getLatestNews()
        observable.subscribeOn(Schedulers.newThread())
                .subscribe(object : Observer<ZhEntity> {
            override fun onComplete(a){}override fun onSubscribe(d: Disposable){}override fun onNext(t: ZhEntity) {
                Log.e("abc"."-- >${Gson().toJson(t)}")}override fun onError(e: Throwable) {
                Log.e("abc"."-- >$e")}})Copy the code

Except for the change in Observable origin, nothing is different from RxJava, which was first described in this article. There is a point to subscribeOn(schedulers.newthread ()).

4.3 Thread Switchover

  • subscribeOnObservable: defines the thread from which an Observable fires events
  • observeOn: Defines the thread (map, flatMap, Observer, etc.) on which the conversion/response event is located, which can be switched multiple times

Thread switching is common. For example, sub-threads request the main thread of network data to update THE UI. Which threads can be selected for subscribeOn and observeOn? How are they used? Let’s start with an example:

Thread(object : Runnable {
            override fun run(a) {
                Log.e("abc"."Thread Current Thread:${Thread.currentThread().name}")
                observable.subscribeOn(Schedulers.newThread())
                        .doOnNext(object :Consumer<ZhEntity>{
                            override fun accept(t: ZhEntity?). {
                                Log.e("abc"."DoOnNext Current thread:${Thread.currentThread().name}")
                            }
                        })
                        .observeOn(Schedulers.io())
                        .flatMap(object :Function<ZhEntity,ObservableSource<ZhEntity.StoriesBean>>{
                            override fun apply(t: ZhEntity): ObservableSource<ZhEntity.StoriesBean> {
                                Log.e("abc"."FlatMap Current thread:${Thread.currentThread().name}")
                                return Observable.fromIterable(t.stories)
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(object : Observer<ZhEntity.StoriesBean> {
                            override fun onComplete(a){}override fun onSubscribe(d: Disposable) {
                                Log.e("abc"."OnSubscribe current thread:${Thread.currentThread().name}")}override fun onNext(t: ZhEntity.StoriesBean) {
                                Log.e("abc"."Observer current thread:${Thread.currentThread().name}")
                                Log.e("abc"."-- >${Gson().toJson(t)}")}override fun onError(e: Throwable) {
                                Log.e("abc"."-- >$e")
                            }
                        })
            }
        }).start()
/ / the log printingThread Current Thread: Thread4 -OnSubscribe Current Thread: Thread4 -Current thread: RxNewThreadScheduler- 1FlatMap Current thread: RxCachedThreadScheduler- 1Observer Current thread: Main Observer Current thread: main ObserverCopy the code

This method is called back every time onNext() is sent, so doOnNext and Subscribe (the method that emits events) are in the same thread. As can be seen from this example:

  1. Observables and observers subscribe to the current Thread (thread-4).
  2. subscribeOnDetermine the thread from which Observable fires the event (that is, the Retrofit request network thread)
  3. For the first time,observeOnDetermine the thread on which the flatMap resides (rxCachedThreadScheduler-1)
  4. Once again,observeOnDetermine the Observer thread (Android main thread main)

So each call to observeOn switches the thread and determines the thread of the next transform/response. Again, setting subscribeOn multiple times only takes effect the first time.

Thread optional values:

Name of the thread instructions
Schedulers.immediate() The default Scheduler, which runs directly on the current thread, is equivalent to not specifying a thread
Schedulers.newThread() Start a new thread and perform operations on the new thread
Schedulers.io() Scheduler used for I/O operations (reading and writing files, reading and writing databases, network information interaction, and so on). The behavior of IO () is similar to that of newThread(), except that the internal implementation of IO () uses an unlimited pool of threads and can reuse idle threads, so in most cases IO () is more efficient than newThread(). Don’t put your calculations in IO () to avoid creating unnecessary threads
Schedulers.computation() The Scheduler used for the calculation. This calculation refers to CPU intensive computing, that is, operations that do not limit performance by operations such as I/O, such as graphics computation. This Scheduler uses a fixed thread pool of CPU cores. Don’t focus on COMPUTATION (), or WAIT time for I/O operations will waste CPU
AndroidSchedulers.mainThread() Android is the main thread

4.4 the Disposable and CompositeDisposable

Override fun onSubscribe(d: override fun onSubscribe(d: override fun onSubscribe)) (Disposable) {}, an Observable calls onSubscribe when it establishes a subscription relationship with an Observer.

4.4.1 the DisPosable

The Disposable dispose() function can be used to unsubscribe so that no Observable emitted events are received:

vardis ? =null
val observable = Observable.fromIterable(mutableListOf("Hello"."RxJava"."GoodBye"))
        val observer = object : Observer<String> {
            override fun onComplete(a){}override fun onSubscribe(d: Disposable) {
                dis=d
                Log.e("abc"."-----onSubscribe-----$d")}override fun onNext(t: String) {
                if (t=="Hello") dis.dispose()
                Log.e("abc"."-----onNext-----$t")}override fun onError(e: Throwable) {
            }
        }
observable.subscribe(observer)
/ / the log printing
-----onNext-----Hello
Copy the code

As you can see, dis.dispose() does not print “RxJava” and “GoodBye” emitted upstream.

4.4.2 CompositeDisposable

CompositeDisposable can be used to manage multiple disposables. Add () to disposables Dispose () or clear() is then called inside onDestroy to dispose of all disposables to prevent memory leaks.

val cDis = CompositeDisposable()
/ /... The code is omitted
override fun onSubscribe(d: Disposable) {
                cDis.add(d)
            }
/ /... The code is omitted
override fun onDestroy(a) {
        super.onDestroy()
        cDis.clear()
    }
Copy the code

Say more, by looking at the RxJava2CallAdapterFactory. The create () source, the dispose () method can take the initiative to disconnect the connection between the observables and the Observer, also can cancel the Retrofit network request, So feel free to use it.