Rxjava

// RxJava 1.x core plus the Android scheduler bindings.
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
Copy the code

Functional programming

Asynchronous operations

Android

  • AsyncTask
  • Handler
  • RxJava

RxJava

RxJava implements asynchronous operations through an extended observer pattern.

Four roles

Observable: the observed source, which emits data.

Observer: the observer, which receives the data.

Subscriber: an Observer implementation that provides the callbacks.

Subject: both an Observer and an Observable.

Subscribe

subscribeOn(): the execution thread; it specifies which thread the Observable runs on. Typically a background thread is specified, such as the IO thread Schedulers.io(). In Kotlin, with the import used below, this can be written as io().

observeOn(): the callback thread; it specifies on which thread the emitted data is delivered to the Observer. Generally the main thread is specified: AndroidSchedulers.mainThread().

Scenario: callbacks for various API requests

package com.example.recycleviewdemo

import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import rx.Observable
import rx.Scheduler
import rx.Subscriber
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import rx.schedulers.Schedulers.io

/**
 * Demo activity: emits two strings from the IO scheduler and logs every
 * subscriber callback after the stream has been moved to the main thread.
 */
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        subscriber()
    }

    /**
     * Creates an [Observable] that emits "Fage" and "HaiLiting" and then completes,
     * and subscribes a logging [Subscriber] to it.
     */
    private fun subscriber() {
        val subscriber = object : Subscriber<String>() {
            override fun onStart() {
                Log.d(TAG, "onStart")
            }

            override fun onNext(t: String?) {
                Log.d(TAG, "onNext$t")
            }

            override fun onCompleted() {
                Log.d(TAG, "onCompleted")
            }

            override fun onError(e: Throwable?) {
                // Attach the throwable so the failure cause is visible in logcat.
                Log.d(TAG, "onError", e)
            }
        }

        val observable = Observable.create(Observable.OnSubscribe<String> { emitter ->
            // Emit through the subscriber handed to OnSubscribe. Calling the outer
            // `subscriber` directly would skip the observeOn stage and run the
            // callbacks on the IO thread instead of the main thread.
            emitter.onNext("Fage")
            emitter.onNext("HaiLiting")
            emitter.onCompleted()
        })
            .subscribeOn(io())
            .observeOn(AndroidSchedulers.mainThread())

        observable.subscribe(subscriber)
    }

    companion object {
        const val TAG = "RxJava"
    }
}
Copy the code

The operator

interval: emits data at a fixed interval. Scenario: API polling

/** * interval callback */
    private fun intervalTest() {
        // Emits an increasing counter every 3 seconds; each value is logged together
        // with the name of the thread that runs the logging code.
        Observable.interval(3, TimeUnit.SECONDS)
            .subscribe { tick ->
                // Hop to the UI thread before logging.
                runOnUiThread {
                    Log.d(TAG, "Result$tick")
                    Log.d(TAG, Thread.currentThread().name)
                }
            }
    }
Copy the code

range: creates an Observable that emits a sequence of integers in a specified range; it can be used instead of a for loop. Scenario: operations that would otherwise require a for loop

/** * integer sequence range loop * can be used instead of for loop */
    private fun intervalRange() {
        // range(start, count): emits 60 consecutive integers starting at 0 (0..59).
        Observable.range(0, 60)
            .subscribe { value ->
                runOnUiThread {
                    Log.d(TAG, "Result$value")
                }
            }
    }

flatMap: transforms each element of a data set and outputs a new set

/** * collection transforms * to add something to each element */
    private fun flatMap() {
        val host = "http://www.baidu.com/"
        val list = listOf("1", "2", "3", "4")

        // Each element is mapped to an Observable that emits the host with the
        // element appended; flatMap flattens those into a single stream.
        Observable.from(list)
            .flatMap { t -> Observable.just(host + t) }
            .cast(String::class.java)
            .subscribe { url ->
                Log.d(TAG, "Flapmap=$url")
            }
    }

filter: the filtering operator

/** * Filter data */
    private fun filterTest() {
        // Wraps every bean from getData() in its own Observable and logs the age
        // of those whose age is greater than 26.
        for (bean in getData()) {
            Observable.just(bean)
                .filter { candidate -> candidate.age > 26 }
                .subscribe { match ->
                    Log.d(TAG, "bean=${match.age}")
                }
        }
    }

Combining operators. Scenario: multiple API requests issued at the same time

merge: combines API requests without guaranteeing the order of the results

    @Override
    public void getPlaceAndWeatherData(String place) {
        mMainView.showProgress();
        PlaceRepository repository = new PlaceRepository();
        Context context = BaseApplication.getInstance();
        Observable placeObservable = repository.getPlaceList(context);
        Observable weatherObservable =  ServiceManager.getInstance().getApiService().getWeatherInfo(place, Constants.BAIDU_AK);
        Observable.merge(placeObservable, weatherObservable)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted(a) {
                        mMainView.hideProgress();
                    }

                    @Override
                    public void onError(Throwable e) {
                        mLogger.error(e.getMessage(), e);
                        mMainView.hideProgress();
                    }

                    @Override
                    public void onNext(Object obj) {
                        if (obj instanceof List) {
                            mMainView.setupPlaceData((List<Place>) obj);
                        } else if (obj instanceofWeatherResponse) { mMainView.setupWeatherData((WeatherResponse) obj); }}}); }Copy the code

concat: combines multiple API requests and delivers their results sequentially, in the order in which the Observables are passed

    @Override
    public void getPlaceAndWeatherData(String place) {
        mMainView.showProgress();
        PlaceRepository repository = new PlaceRepository();
        Context context = BaseApplication.getInstance();
        Observable placeObservable = repository.getPlaceList(context);
        Observable weatherObservable =  ServiceManager.getInstance().getApiService().getWeatherInfo(place, Constants.BAIDU_AK);
        Observable.concat(placeObservable, weatherObservable)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted(a) {
                        mMainView.hideProgress();
                    }

                    @Override
                    public void onError(Throwable e) {
                        mLogger.error(e.getMessage(), e);
                        mMainView.hideProgress();
                    }

                    @Override
                    public void onNext(Object obj) {
                        if (obj instanceof List) {
                            mMainView.setupPlaceData((List<Place>) obj);
                        } else if (obj instanceofWeatherResponse) { mMainView.setupWeatherData((WeatherResponse) obj); }}}); }Copy the code

zip: merges API results. Scenario: several API requests are issued at the same time and a single merged result is returned

    /**
     * RxJava zip: create multiple Observables and combine them with zip.
     *
     * Combining function: Func2<T1, T2, R> (Func3, Func4, ... for more sources)
     * - T1: the type of the first source
     * - T2: the type of the second source
     * - ...: the type of the N-th source
     * - R: the type of the merged result
     */
    private fun zip() {
        val observable1 = Observable.just(1, 2, 3)
        val observable2 = Observable.just(3, 4, 5)

        // Pairs the n-th items of both sources and emits their sum: 4, 6, 8.
        Observable.zip(observable1, observable2) { t1, t2 -> t1 + t2 }
            .subscribe { sum ->
                Log.d(TAG, sum.toString())
                Log.d(TAG, Thread.currentThread().name)
            }
    }
Copy the code

Common Android Scenarios

RxJava + RxAndroid + Okhttp + Retrofit for network requests

// RxJava 1.x and its Android bindings.
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
// Retrofit with the Gson converter and the RxJava 1.x call adapter.
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
implementation 'com.squareup.retrofit2:converter-gson:2.9.0'
implementation 'com.squareup.retrofit2:adapter-rxjava:2.9.0'
Copy the code
  • interface
/**
 * Retrofit declaration of the place-search endpoint.
 */
interface PlaceService {

    /**
     * Searches places matching [query].
     *
     * @param query sent as the `query` URL parameter
     * @return an [Observable] emitting the parsed [PlaceResponse]
     */
    // The URL must not contain whitespace: "place? token" would corrupt the query string.
    @GET("v2/place?token=aioxxxxFsuuj&lang=zh_CN")
    fun searchPlaces(@Query("query") query: String): Observable<PlaceResponse>
}
Copy the code
  • Used in an Activity
  • Start by writing a BasePresenter to manage RxJava subscription requests
package com.example.recycleviewdemo

import rx.Subscription
import rx.subscriptions.CompositeSubscription

/**
 * Base class for presenters: collects RxJava subscriptions so they can all be
 * cancelled with a single call.
 */
open class BasePresenter {

    private val compositeSubscription = CompositeSubscription()

    /** Registers [sub] so that it is cancelled by [unSubscribe]. */
    fun addSubscription(sub: Subscription) {
        compositeSubscription.add(sub)
    }

    /** Unsubscribes every subscription registered through [addSubscription]. */
    fun unSubscribe() {
        compositeSubscription.unsubscribe()
    }
}
Copy the code
  • Then write a BaseActivity and call the unsubscribe method in the onDestroy method
package com.example.recycleviewdemo

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import com.orhanobut.logger.AndroidLogAdapter
import com.orhanobut.logger.Logger

/**
 * Base activity that wires up a [BasePresenter] and cancels its subscriptions
 * when the activity is destroyed.
 */
abstract class BaseActivity : AppCompatActivity() {

    private var mPresenter: BasePresenter? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Set the layout first so that its views exist when the init hooks run.
        setContentView(getContentViewId())
        initPresenter()
        initView()

        Logger.addLogAdapter(AndroidLogAdapter())
    }

    /** Hook for subclasses to create their presenter and register it with [setPresenter]. */
    protected abstract fun initPresenter()

    /** Hook for subclasses to set up their views. */
    protected abstract fun initView()

    /** Returns the layout resource id passed to `setContentView`. */
    protected abstract fun getContentViewId(): Int

    /** Stores the presenter whose subscriptions are cancelled in [onDestroy]. */
    fun setPresenter(presenter: BasePresenter) {
        mPresenter = presenter
    }

    override fun onDestroy() {
        super.onDestroy()
        mPresenter?.unSubscribe()
        Logger.t("fage").d("Remove request")
    }
}
  • Next, write a network request management class
package com.example.recycleviewdemo

import retrofit2.Retrofit
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory
import rx.Subscriber
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers

/**
 * Presenter that performs the place-search request and registers the resulting
 * subscription with [BasePresenter] so it can be cancelled later.
 */
class PlacePresenter : BasePresenter() {

    // Built once on first use instead of on every request.
    private val placeService: PlaceService by lazy {
        Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build()
            .create(PlaceService::class.java)
    }

    /**
     * Searches for "Hangzhou" on the IO scheduler and logs the outcome on the main thread.
     */
    fun getPlaceInfoFormation() {
        // NOTE(review): no import for `Logger` is visible in this file; confirm which
        // Logger these calls resolve to and that it provides these (tag, value) overloads.
        val subscription = placeService.searchPlaces("Hangzhou")
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Subscriber<PlaceResponse>() {
                override fun onNext(t: PlaceResponse) {
                    Logger.json(TAG, t)
                }

                override fun onCompleted() {
                    Logger.d(TAG, "onCompleted")
                }

                override fun onError(e: Throwable) {
                    Logger.d(TAG, "onError")
                }
            })
        addSubscription(subscription)
    }

    companion object {
        const val TAG = "IpPresenter"
        private const val BASE_URL = "https://api.caiyunapp.com/"
    }
}
Copy the code
  • Next, MainActivity uses the Presenter setting and initiates the request
/**
 * Activity that registers a [PlacePresenter] with [BaseActivity] and starts the
 * place request as soon as the presenter is initialised.
 */
class MainActivity : BaseActivity() {

    private val mPresenter: PlacePresenter by lazy { PlacePresenter() }

    override fun getContentViewId(): Int = R.layout.activity_main

    override fun initPresenter() {
        setPresenter(mPresenter)
        mPresenter.getPlaceInfoFormation()
    }

    // BaseActivity declares initView() abstract, so it must be overridden even
    // though this screen has no views to set up.
    override fun initView() {
    }

    companion object {
        const val TAG = "RxJava"
    }
}
Copy the code
  • As you can see, the request is cancelled successfully when we exit the app

Communicate across threads, activities, and fragments using RxBus

  • Write an RxBus singleton
package com.example.rxjavademo.rxbus

import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.SerializedSubject
import rx.subjects.Subject

/** * RxBus Kotlin version */
object RxBusKotlin {
    // A plain PublishSubject is not thread safe; it is wrapped in a
    // SerializedSubject so that events may be posted from several threads.
    private val subject: Subject<Any, Any> = SerializedSubject(PublishSubject.create<Any>())

    /**
     * Sends an event to all current subscribers.
     *
     * @param obj the event to publish
     */
    fun post(obj: Any) {
        subject.onNext(obj)
    }

    /**
     * Returns a stream that only emits events of the given type.
     *
     * @param eventType class of the events to receive
     * @return an [Observable] of the posted events that are instances of [eventType]
     */
    fun <T> toObservable(eventType: Class<T>): Observable<T> = subject.ofType(eventType)
}
  • Listening for events
/** * Rxbus listening */
    private fun rxbus() {
        // Shows the name of every MessageEvent posted on the bus in textView.
        // NOTE(review): the callback runs on whichever thread posts the event;
        // confirm that events are only posted from the main thread.
        val subscription = RxBusKotlin.toObservable(MessageEvent::class.java)
            .subscribe { event ->
                textView.text = (event as MessageEvent).name
            }
        // Registered so that it is unsubscribed together with the activity.
        addSubscription(subscription)
    }
Copy the code
  • Sending a message
// Publish a MessageEvent on the bus; subscribers obtained through
// RxBusKotlin.toObservable(MessageEvent::class.java) receive it.
RxBusKotlin.post(MessageEvent("Oh, my hair has changed."))
Copy the code
  • Remember to unsubscribe
package com.example.rxjavademo

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import com.orhanobut.logger.AndroidLogAdapter
import com.orhanobut.logger.Logger
import rx.Subscription
import rx.subscriptions.CompositeSubscription

/**
 * Base activity that cancels both the presenter's subscriptions and the
 * subscriptions registered directly on the activity when it is destroyed.
 */
abstract class BaseActivity : AppCompatActivity() {

    private var mPresenter: BasePresenter? = null

    // Created eagerly so that addSubscription() can never hit a null holder.
    private val mRxCompositeSubscription = CompositeSubscription()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Set the layout first so that its views exist when the init hooks run.
        setContentView(getContentViewId())
        initPresenter()
        initView()

        Logger.addLogAdapter(AndroidLogAdapter())
    }

    /** Returns the layout resource id passed to `setContentView`. */
    protected abstract fun getContentViewId(): Int

    /** Hook for subclasses to create their presenter and register it with [setPresenter]. */
    protected abstract fun initPresenter()

    /** Hook for subclasses to set up their views. */
    protected abstract fun initView()

    /** Stores the presenter whose subscriptions are cancelled in [onDestroy]. */
    fun setPresenter(presenter: BasePresenter) {
        mPresenter = presenter
    }

    /** Registers [sub] so that it is unsubscribed in [onDestroy]. */
    fun addSubscription(sub: Subscription) {
        mRxCompositeSubscription.add(sub)
    }

    override fun onDestroy() {
        super.onDestroy()
        mPresenter?.unSubscribe()
        mRxCompositeSubscription.unsubscribe()
        Logger.t("fage").d("Remove request")
    }
}