RxJava2CallAdapter creates observables asynchronously or synchronously by default
retrofit = Retrofit.Builder() .baseUrl(AppBuildConfig.getHostUrl()) .addConverterFactory(ResponseConverterFactory) .addConverterFactory(GsonConverterFactory.create(gson)) .addCallAdapterFactory(CoroutineCallAdapterFactory.invoke()) AddCallAdapterFactory (RxJava2CallAdapterFactory. The create ()) / / here. The client (providerHttpClient ()). The build ()Copy the code
This is how we usually introduce the RxJava2CallAdapter, so let’s go inside
/**
* Returns an instance which creates synchronous observables that do not operate on any scheduler
* by default.
*/
public static RxJava2CallAdapterFactory create() {
return new RxJava2CallAdapterFactory(null, false);
}
/**
* Returns an instance which creates asynchronous observables. Applying
* {@link Observable#subscribeOn} has no effect on stream types created by this factory.
*/
public static RxJava2CallAdapterFactory createAsync() {
return new RxJava2CallAdapterFactory(null, true);
}
Copy the code
You can see from the comments and code that the Create method creates observables synchronously, requiring a thread switch.
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
Copy the code
If we pass addCallAdapterFactory (RxJava2CallAdapterFactory createAsync ())
There is no need to switch to the child thread for network access, as follows
observable.observeOn(AndroidSchedulers.mainThread())
Copy the code
Just switch to the UI thread. Let’s look inside the method and see why
private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
Copy the code
That’s the constructor and now let’s look at the get method
@Override public @Nullable CallAdapter<? ,? > get( Type returnType, Annotation[] annotations, Retrofit retrofit) { ... return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false); } } final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> { private final Type responseType; @Nullable private final Scheduler scheduler; private final boolean isAsync; private final boolean isResult; private final boolean isBody; private final boolean isFlowable; private final boolean isSingle; private final boolean isMaybe; private final boolean isCompletable; RxJava2CallAdapter(Type responseType, @Nullable Scheduler scheduler, boolean isAsync, boolean isResult, boolean isBody, boolean isFlowable, boolean isSingle, boolean isMaybe, boolean isCompletable) { this.responseType = responseType; this.scheduler = scheduler; this.isAsync = isAsync; this.isResult = isResult; this.isBody = isBody; this.isFlowable = isFlowable; this.isSingle = isSingle; this.isMaybe = isMaybe; this.isCompletable = isCompletable; } public Type responseType() { return this.responseType; } public Object adapt(Call<R> Call) {// The CallExecuteObservable, CallEnqueueObservable Observable<Response<R>> responseObservable = this.isAsync? new CallEnqueueObservable(call) : new CallExecuteObservable(call); Object observable; if (this.isResult) { observable = new ResultObservable((Observable)responseObservable); } else if (this.isBody) { observable = new BodyObservable((Observable)responseObservable); } else { observable = responseObservable; } if (this.scheduler ! = null) { observable = ((Observable)observable).subscribeOn(this.scheduler); } if (this.isFlowable) { return ((Observable)observable).toFlowable(BackpressureStrategy.LATEST); } else if (this.isSingle) { return ((Observable)observable).singleOrError(); } else if (this.isMaybe) { return ((Observable)observable).singleElement(); } else { return this.isCompletable ? ((Observable)observable).ignoreElements() : RxJavaPlugins.onAssembly((Observable)observable); }}}Copy the code
Take a look at the CallEnqueueObservable
final class CallEnqueueObservable<T> extends Observable<Response<T>> { private final Call<T> originalCall; CallEnqueueObservable(Call<T> originalCall) { this.originalCall = originalCall; } @Override protected void subscribeActual(Observer<? super Response<T>> observer) { // Since Call is a one-shot type, clone it for each new observer. Call<T> call = originalCall.clone(); CallCallback<T> callback = new CallCallback<>(call, observer); observer.onSubscribe(callback); if (! Callback.isdisposed ()) {here call.enqueue(callback); }}}}Copy the code
You can see above that the call.enQueue method was actively called. Okhttp asynchronous processing is performed. So there’s no need to switch to the IO thread.
Next, CallExecuteObservable
final class CallExecuteObservable<T> extends Observable<Response<T>> { private final Call<T> originalCall; CallExecuteObservable(Call<T> originalCall) { this.originalCall = originalCall; } @Override protected void subscribeActual(Observer<? super Response<T>> observer) { // Since Call is a one-shot type, clone it for each new observer. Call<T> call = originalCall.clone(); CallDisposable disposable = new CallDisposable(call); observer.onSubscribe(disposable); if (disposable.isDisposed()) { return; } boolean terminated = false; Execute Response<T> Response = call.execute(); if (! disposable.isDisposed()) { observer.onNext(response); } if (! disposable.isDisposed()) { terminated = true; observer.onComplete(); } } catch (Throwable t) { Exceptions.throwIfFatal(t); if (terminated) { RxJavaPlugins.onError(t); } else if (! disposable.isDisposed()) { try { observer.onError(t); } catch (Throwable inner) { Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(new CompositeException(t, inner)); }}}}Copy the code
As you can see above, execute is called directly without thread switching, so thread switching is required in Retrofit.