Rxjava
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
Copy the code
Functional programming
Asynchronous operations
Android
- AsyncTask
- Handler
- RxJava
RxJava
RxJava implements asynchronous operations through an extended observer pattern.
Four roles
Observable: the observed object; it emits the data
Observer: the observer; it receives the data
Subscriber: an Observer that implements the callbacks (and can also unsubscribe)
Subject: both an Observer and an Observable
Subscribe
subscribeOn(): the execution thread. It specifies which thread the Observable runs on. Typically a background thread is specified, such as the IO thread Schedulers.io(). In Kotlin, with `import rx.schedulers.Schedulers.io`, this can be written as io().
observeOn(): the callback thread. It specifies which thread the emitted data is delivered to the Observer on. Generally the main thread is specified: AndroidSchedulers.mainThread().
Scenario: Various interfaces request callback
package com.example.recycleviewdemo
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import rx.Observable
import rx.Scheduler
import rx.Subscriber
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import rx.schedulers.Schedulers.io
/**
 * Demo activity that connects a [Subscriber] to a hand-built [Observable].
 *
 * The source is subscribed on the IO scheduler and its callbacks are observed
 * on the Android main thread.
 */
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        subscriber()
    }

    /** Builds the subscriber and the observable, then subscribes one to the other. */
    private fun subscriber() {
        val subscriber = object : Subscriber<String>() {
            override fun onStart() {
                Log.d(TAG, "onStart")
            }

            override fun onNext(t: String?) {
                Log.d(TAG, "onNext$t")
            }

            override fun onCompleted() {
                Log.d(TAG, "onCompleted")
            }

            override fun onError(e: Throwable?) {
                // Attach the throwable so the failure reason is not lost.
                Log.d(TAG, "onError", e)
            }
        }

        // Emit through the subscriber handed to OnSubscribe (`emitter`), not the
        // outer `subscriber`: only the former is wired through subscribeOn/observeOn,
        // so calling the outer one directly would bypass the main-thread delivery.
        val observable = Observable.create(Observable.OnSubscribe<String> { emitter ->
            emitter.onNext("Fage")
            emitter.onNext("HaiLiting")
            emitter.onCompleted()
        })
            .subscribeOn(io())
            .observeOn(AndroidSchedulers.mainThread())

        observable.subscribe(subscriber)
    }

    companion object {
        const val TAG = "RxJava"
    }
}
Copy the code
Operators
interval: emits data at a fixed time interval. Scenario: interface polling
/**
 * Interval callback: every 3 seconds, logs the emitted counter and the name of
 * the thread the log statement runs on (the UI thread, via runOnUiThread).
 *
 * NOTE(review): the Subscription returned by subscribe() is discarded here, so
 * nothing in this function ever stops the interval; consider keeping it and
 * unsubscribing when the screen is destroyed.
 */
private fun intervalTest() {
    Observable.interval(3, TimeUnit.SECONDS)
        .subscribe { tick ->
            runOnUiThread {
                Log.d(TAG, "Result$tick")
                Log.d(TAG, Thread.currentThread().name)
            }
        }
}
Copy the code
range: creates an Observable that emits a sequence of integers in a specified range; it can be used instead of a for loop. Scenario: operations that would otherwise require a for loop
/**
 * Integer sequence over a range; can be used instead of a for loop.
 *
 * `Observable.range(start, count)` takes two arguments: starting at 0 and
 * emitting 60 values, each value is logged on the UI thread.
 */
private fun intervalRange() {
    Observable.range(0, 60)
        .subscribe { value ->
            runOnUiThread {
                Log.d(TAG, "Result$value")
            }
        }
}
flatMap: transforms each element of a data set into an Observable and merges the outputs into a new sequence
/**
 * Collection transform: prefixes every element of the list with [host] and
 * logs each resulting URL.
 */
private fun flatMap() {
    val host = "http://www.baidu.com/"
    val list = listOf("1", "2", "3", "4")
    Observable.from(list)
        // Each element becomes its own single-item Observable; flatMap merges them.
        // The result is already an Observable of String, so no cast is needed.
        .flatMap { path -> Observable.just(host + path) }
        .subscribe { url ->
            Log.d(TAG, "Flapmap=$url")
        }
}
filter: the filtering operator
/**
 * Filters the data: logs the age of every bean whose age is greater than 26.
 *
 * The whole data set is fed into one Observable instead of creating a separate
 * Observable per element inside a for loop.
 * NOTE(review): assumes getData() returns an Iterable or an array, which is
 * what Observable.from accepts; confirm against its declaration.
 */
private fun filterTest() {
    Observable.from(getData())
        .filter { bean -> bean.age > 26 }
        .subscribe { bean ->
            Log.d(TAG, "bean=${bean.age}")
        }
}
Combining operators. Scenario: multiple interfaces are requested at the same time
merge: requests interfaces with no guaranteed ordering of the results
@Override
public void getPlaceAndWeatherData(String place) {
mMainView.showProgress();
PlaceRepository repository = new PlaceRepository();
Context context = BaseApplication.getInstance();
Observable placeObservable = repository.getPlaceList(context);
Observable weatherObservable = ServiceManager.getInstance().getApiService().getWeatherInfo(place, Constants.BAIDU_AK);
Observable.merge(placeObservable, weatherObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted(a) {
mMainView.hideProgress();
}
@Override
public void onError(Throwable e) {
mLogger.error(e.getMessage(), e);
mMainView.hideProgress();
}
@Override
public void onNext(Object obj) {
if (obj instanceof List) {
mMainView.setupPlaceData((List<Place>) obj);
} else if (obj instanceofWeatherResponse) { mMainView.setupWeatherData((WeatherResponse) obj); }}}); }Copy the code
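concat: combines multiple interface requests and returns their results sequentially, in the order the sources are passed in (the next source is subscribed to only after the previous one completes)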
@Override
public void getPlaceAndWeatherData(String place) {
mMainView.showProgress();
PlaceRepository repository = new PlaceRepository();
Context context = BaseApplication.getInstance();
Observable placeObservable = repository.getPlaceList(context);
Observable weatherObservable = ServiceManager.getInstance().getApiService().getWeatherInfo(place, Constants.BAIDU_AK);
Observable.concat(placeObservable, weatherObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted(a) {
mMainView.hideProgress();
}
@Override
public void onError(Throwable e) {
mLogger.error(e.getMessage(), e);
mMainView.hideProgress();
}
@Override
public void onNext(Object obj) {
if (obj instanceof List) {
mMainView.setupPlaceData((List<Place>) obj);
} else if (obj instanceofWeatherResponse) { mMainView.setupWeatherData((WeatherResponse) obj); }}}); }Copy the code
zip: merges interface results. Scenario: several interface requests are issued at the same time and their results are combined into a single returned result
/**
 * RxJava zip: creates several observables and combines their items pairwise.
 *
 * The combiner plays the role of `Func2<T1, T2, R>`, where T1 is the type of
 * the first source, T2 the type of the second (and so on up to the n-th
 * source), and R is the type of the merged result.
 *
 * Here the two sources emit 1, 2, 3 and 3, 4, 5; each pair is added, and the
 * sum is logged together with the current thread name.
 */
private fun zip() {
    val observable1 = Observable.just(1, 2, 3)
    val observable2 = Observable.just(3, 4, 5)
    Observable.zip(observable1, observable2) { t1, t2 -> t1 + t2 }
        .subscribe { sum ->
            Log.d(TAG, sum.toString())
            Log.d(TAG, Thread.currentThread().name)
        }
}
Copy the code
Common Android Scenarios
RxJava + RxAndroid + Okhttp + Retrofit for network requests
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
implementation 'com.squareup.retrofit2:converter-gson:2.9.0'
implementation 'com.squareup.retrofit2:adapter-rxjava:2.9.0'
Copy the code
- interface
/** Retrofit service definition for the place search endpoint. */
interface PlaceService {

    /**
     * Searches places matching [query].
     *
     * The token and language are fixed in the relative URL; the stray space
     * after `?` has been removed so the query string is well-formed.
     *
     * @param query sent as the `query` URL parameter
     * @return an Observable emitting the parsed [PlaceResponse]
     */
    @GET("v2/place?token=aioxxxxFsuuj&lang=zh_CN")
    fun searchPlaces(@Query("query") query: String): Observable<PlaceResponse>
}
Copy the code
- Used in an Activity
- Start by writing a BasePresenter to manage RxJava subscription requests
package com.example.recycleviewdemo
import rx.Subscription
import rx.subscriptions.CompositeSubscription
/**
 * Base presenter that collects RxJava subscriptions so they can all be
 * released together.
 */
open class BasePresenter {

    private val compositeSubscription = CompositeSubscription()

    /** Registers [sub] so that it is released by [unSubscribe]. */
    fun addSubscription(sub: Subscription) {
        compositeSubscription.add(sub)
    }

    /** Unsubscribes every subscription registered through [addSubscription]. */
    fun unSubscribe() {
        compositeSubscription.unsubscribe()
    }
}
Copy the code
- Then write a BaseActivity and call the unsubscribe method in the onDestroy method
package com.example.recycleviewdemo
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import com.orhanobut.logger.AndroidLogAdapter
import com.orhanobut.logger.Logger
/**
 * Base activity that owns an optional [BasePresenter] and releases the
 * presenter's subscriptions when the activity is destroyed.
 */
abstract class BaseActivity : AppCompatActivity() {

    private var mPresenter: BasePresenter? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // Set the layout before the init hooks run, so initView() works on an
        // already-inflated content view.
        setContentView(getContentViewId())
        // Register the log adapter before the presenter can start logging.
        Logger.addLogAdapter(AndroidLogAdapter())
        initPresenter()
        initView()
    }

    /** Hook for subclasses to create their presenter and pass it to [setPresenter]. */
    protected abstract fun initPresenter()

    /** Hook for subclasses to set up their views. */
    protected abstract fun initView()

    /** Returns the layout resource id passed to setContentView. */
    protected abstract fun getContentViewId(): Int

    /** Stores the presenter whose subscriptions are released in [onDestroy]. */
    fun setPresenter(presenter: BasePresenter) {
        mPresenter = presenter
    }

    override fun onDestroy() {
        super.onDestroy()
        mPresenter?.unSubscribe()
        Logger.t("fage").d("Remove request")
    }
}
- Next, write a network request management class
package com.example.recycleviewdemo
import com.orhanobut.logger.Logger
import retrofit2.Retrofit
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory
import rx.Subscriber
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
/**
 * Presenter that issues the place search request and registers the resulting
 * subscription with [BasePresenter] so it can be released later.
 */
class PlacePresenter : BasePresenter() {

    /** Retrofit service, built once on first use instead of on every request. */
    private val placeService: PlaceService by lazy {
        Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build()
            .create(PlaceService::class.java)
    }

    /**
     * Searches for "Hangzhou" on the IO scheduler, observes the result on the
     * Android main thread, and logs every callback under [TAG].
     */
    fun getPlaceInfoFormation() {
        val subscription = placeService.searchPlaces("Hangzhou")
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Subscriber<PlaceResponse>() {
                override fun onNext(t: PlaceResponse) {
                    // Logger.t(tag) sets the tag; the original passed TAG as the message.
                    Logger.t(TAG).d(t)
                }

                override fun onCompleted() {
                    Logger.t(TAG).d("onCompleted")
                }

                override fun onError(e: Throwable) {
                    // Log the throwable itself so the failure reason is visible.
                    Logger.t(TAG).e(e, "onError")
                }
            })
        addSubscription(subscription)
    }

    companion object {
        const val TAG = "IpPresenter"
        private const val BASE_URL = "https://api.caiyunapp.com/"
    }
}
Copy the code
- Next, MainActivity uses the Presenter setting and initiates the request
/**
 * Activity that registers a [PlacePresenter] with [BaseActivity] and starts
 * the place request as soon as the presenter is initialised.
 */
class MainActivity : BaseActivity() {

    private val mPresenter: PlacePresenter by lazy {
        PlacePresenter()
    }

    override fun getContentViewId(): Int = R.layout.activity_main

    override fun initPresenter() {
        setPresenter(mPresenter)
        mPresenter.getPlaceInfoFormation()
    }

    // BaseActivity declares initView() abstract, so it must be implemented
    // even though this screen has no view setup of its own.
    override fun initView() {
    }

    companion object {
        const val TAG = "RxJava"
    }
}
Copy the code
- As you can see, the request is successfully cancelled (unsubscribed) when we exit the app
Communicate across threads, activities, and fragments using RxBus
- Write an RxBus singleton
package com.example.rxjavademo.rxbus
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.SerializedSubject
import rx.subjects.Subject
/**
 * RxBus, Kotlin version: a singleton event bus backed by an RxJava subject.
 */
object RxBusKotlin {

    /**
     * PublishSubject is not thread-safe, so it is wrapped in a
     * SerializedSubject to make posting from several threads safe.
     */
    private val subject: Subject<Any, Any> = SerializedSubject(PublishSubject.create<Any>())

    /**
     * Sends an event to the bus.
     *
     * @param obj the event object
     */
    fun post(obj: Any) {
        subject.onNext(obj)
    }

    /**
     * Returns an Observable that only emits events of the specified type.
     *
     * @param eventType class of the events to receive
     * @param T the event type
     * @return an Observable of the events that are instances of [eventType]
     */
    fun <T> toObservable(eventType: Class<T>): Observable<T> = subject.ofType(eventType)
}
- Listening for events
/**
 * RxBus listening: shows the name of every received MessageEvent in the text
 * view and registers the subscription so it can be released later.
 */
private fun rxbus() {
    addSubscription(
        RxBusKotlin.toObservable(MessageEvent::class.java)
            // Events can be posted from any thread, but the view may only be
            // touched on the main thread.
            .observeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
            .subscribe { event ->
                textView.text = (event as MessageEvent).name
            }
    )
}
Copy the code
- Sending a message
// Publish a MessageEvent on the bus; subscribers listening for MessageEvent receive it.
RxBusKotlin.post(MessageEvent("Oh, my hair has changed."))
Copy the code
- Remember to unsubscribe
package com.example.rxjavademo
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import com.orhanobut.logger.AndroidLogAdapter
import com.orhanobut.logger.Logger
import rx.Subscription
import rx.subscriptions.CompositeSubscription
/**
 * Base activity that tracks both an optional [BasePresenter] and its own
 * RxJava subscriptions, and releases all of them when the activity is
 * destroyed.
 */
abstract class BaseActivity : AppCompatActivity() {

    private var mPresenter: BasePresenter? = null

    // Created with the activity instead of in onCreate, so it is never null
    // and addSubscription() needs no `!!`.
    private val mRxCompositeSubscription = CompositeSubscription()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // Set the layout before the init hooks run, so initView() works on an
        // already-inflated content view.
        setContentView(getContentViewId())
        // Register the log adapter before the presenter can start logging.
        Logger.addLogAdapter(AndroidLogAdapter())
        initPresenter()
        initView()
    }

    /** Returns the layout resource id passed to setContentView. */
    protected abstract fun getContentViewId(): Int

    /** Hook for subclasses to create their presenter and pass it to [setPresenter]. */
    protected abstract fun initPresenter()

    /** Hook for subclasses to set up their views. */
    protected abstract fun initView()

    /** Stores the presenter whose subscriptions are released in [onDestroy]. */
    fun setPresenter(presenter: BasePresenter) {
        mPresenter = presenter
    }

    /** Registers [sub] so that it is unsubscribed in [onDestroy]. */
    fun addSubscription(sub: Subscription) {
        mRxCompositeSubscription.add(sub)
    }

    override fun onDestroy() {
        super.onDestroy()
        mPresenter?.unSubscribe()
        mRxCompositeSubscription.unsubscribe()
        Logger.t("fage").d("Remove request")
    }
}