Preface
This article picks up where the previous one left off. That article analyzed the design of Retrofit's network request flow at each stage of code execution. This one focuses on CallAdapter: how Retrofit usage changes when it is combined with RxJava or Java 8, and how the adapters for each are implemented in the source code.
PS: this article covers RxJava2 only.
RxJava2
Gradle dependencies covered in this section:
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.9.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
RxJava2 comes first. After adding the adapter dependency above, register a CallAdapter.Factory for RxJava2 when initializing Retrofit:
retrofit = new Retrofit.Builder()
    .baseUrl("https://api.github.com")
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .addConverterFactory(GsonConverterFactory.create())
    .build();
// RxJava2CallAdapterFactory.java
public static RxJava2CallAdapterFactory create() {
    return new RxJava2CallAdapterFactory(null, false);
}
private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
}
The RxJava2CallAdapterFactory constructor takes two parameters:
- scheduler: the Scheduler to use for subscribeOn, which controls thread scheduling.
- isAsync: whether to use OkHttp's synchronous or asynchronous execution. PS: this article walks through the source only for the synchronous case.
Thus, the network request interface can be defined as follows:
@GET("users/{user}/repos")
Observable<List<Repo>> listReposRx(@Path("user") String user);
RxJava2CallAdapterFactory
Because the interface method declares Observable as its return type, loadServiceMethod matches RxJava2CallAdapterFactory and calls its get method, which returns an RxJava2CallAdapter object.
// RxJava2CallAdapterFactory.java
public @Nullable CallAdapter<?, ?> get(
        Type returnType, Annotation[] annotations, Retrofit retrofit) {
    Class<?> rawType = getRawType(returnType);
    if (rawType == Completable.class) {
        // Completable is not parameterized (which is what the rest of this method deals with) so it
        // can only be created with a single configuration.
        return new RxJava2CallAdapter(
                Void.class, scheduler, isAsync, false, true, false, false, false, true);
    }
    boolean isFlowable = rawType == Flowable.class;
    boolean isSingle = rawType == Single.class;
    boolean isMaybe = rawType == Maybe.class;
    if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
        return null;
    }
    boolean isResult = false;
    boolean isBody = false;
    Type responseType;
    if (!(returnType instanceof ParameterizedType)) {
        String name =
                isFlowable ? "Flowable" : isSingle ? "Single" : isMaybe ? "Maybe" : "Observable";
        throw new IllegalStateException(
                name
                        + " return type must be parameterized"
                        + " as "
                        + name
                        + "<Foo> or "
                        + name
                        + "<? extends Foo>");
    }
    Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    Class<?> rawObservableType = getRawType(observableType);
    if (rawObservableType == Response.class) {
        if (!(observableType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    } else if (rawObservableType == Result.class) {
        if (!(observableType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Result must be parameterized" + " as Result<Foo> or Result<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
        isResult = true;
    } else {
        responseType = observableType;
        isBody = true;
    }
    return new RxJava2CallAdapter(
            responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
}
As you can see from the code, the Adapter also supports RxJava’s Completable, Flowable, Single, and Maybe.
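The type checks in get rest on plain Java reflection. The following is a minimal JDK-only sketch of that idea, not Retrofit's own code: DemoService, rawType, parameterUpperBound and innerTypeOf are hypothetical names invented for illustration. It shows how the type argument inside a generic return type can be read, with a wildcard resolved to its upper bound.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ReturnTypeInspection {
    /** Hypothetical service interface, used only to obtain generic return types. */
    interface DemoService {
        CompletableFuture<List<String>> names();

        CompletableFuture<? extends Number> count();
    }

    /** Raw class of a type, e.g. List<String> -> List. */
    static Class<?> rawType(Type type) {
        if (type instanceof Class<?>) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    /** Type argument at the given index; "? extends Number" resolves to Number. */
    static Type parameterUpperBound(int index, ParameterizedType type) {
        Type argument = type.getActualTypeArguments()[index];
        if (argument instanceof WildcardType) {
            return ((WildcardType) argument).getUpperBounds()[0];
        }
        return argument;
    }

    /** Type found inside the generic wrapper returned by the named DemoService method. */
    static Type innerTypeOf(String methodName) {
        Method method;
        try {
            method = DemoService.class.getMethod(methodName);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
        Type returnType = method.getGenericReturnType();
        if (!(returnType instanceof ParameterizedType)) {
            throw new IllegalStateException("Return type must be parameterized");
        }
        return parameterUpperBound(0, (ParameterizedType) returnType);
    }

    public static void main(String[] args) {
        System.out.println(innerTypeOf("names"));
        System.out.println(innerTypeOf("count"));
    }
}
```

A factory can then branch on the raw class of the inner type, in the same way the get method above compares rawObservableType against Response.class and Result.class.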
RxJava2CallAdapter
Next, look at the RxJava2CallAdapter#adapt method:
// RxJava2CallAdapter.java
public Object adapt(Call<R> call) {
    Observable<Response<R>> responseObservable =
            isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
    Observable<?> observable;
    if (isResult) {
        observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
    } else {
        observable = responseObservable;
    }
    if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
    }
    if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
    }
    if (isSingle) {
        return observable.singleOrError();
    }
    if (isMaybe) {
        return observable.singleElement();
    }
    if (isCompletable) {
        return observable.ignoreElements();
    }
    return RxJavaPlugins.onAssembly(observable);
}
- First, an Observable is created according to isAsync. CallExecuteObservable wraps OkHttp's synchronous execution logic. The OkHttpCall object, which encapsulates the OkHttp request logic, is passed in as its argument.
- The Observable is then converted to match the return type declared on the interface method.
@GET("users/{user}/repos")
Observable<List<Repo>> listReposRx(@Path("user") String user);
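So for the interface method defined at the beginning, the Observable that is returned is backed by a CallExecuteObservable (or a CallEnqueueObservable when isAsync is true). Because the declared type is the response body, it is wrapped in a BodyObservable.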
CallExecuteObservable
First, the calling code that issues the network request:
Observable<List<Repo>> observable = service.listReposRx("octocat");
observable.subscribe(new Consumer<List<Repo>>() {
    @Override
    public void accept(List<Repo> objects) throws Exception {}
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {}
}, new Action() {
    @Override
    public void run() throws Exception {}
});
Now look at the logic of CallExecuteObservable:
// CallExecuteObservable.java
final class CallExecuteObservable<T> extends Observable<Response<T>> {
    private final Call<T> originalCall;
    CallExecuteObservable(Call<T> originalCall) {
        this.originalCall = originalCall;
    }
    @Override
    protected void subscribeActual(Observer<? super Response<T>> observer) {
        // Since Call is a one-shot type, clone it for each new observer.
        Call<T> call = originalCall.clone();
        CallDisposable disposable = new CallDisposable(call);
        observer.onSubscribe(disposable);
        if (disposable.isDisposed()) {
            return;
        }
        boolean terminated = false;
        try {
            Response<T> response = call.execute();
            if (!disposable.isDisposed()) {
                observer.onNext(response);
            }
            if (!disposable.isDisposed()) {
                terminated = true;
                observer.onComplete();
            }
        } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            if (terminated) {
                RxJavaPlugins.onError(t);
            } else if (!disposable.isDisposed()) {
                try {
                    observer.onError(t);
                } catch (Throwable inner) {
                    Exceptions.throwIfFatal(inner);
                    RxJavaPlugins.onError(new CompositeException(t, inner));
                }
            }
        }
    }
    ...
}
An external call to observable.subscribe eventually reaches CallExecuteObservable#subscribeActual, and that is the point at which the network request is actually issued. If this is unfamiliar, see my RxJava article: What does Rx Observable.create do?
- call.execute() issues the network request synchronously.
- Once the response arrives, it is delivered to the observer through onNext.
At this point the full cycle, from issuing the request to delivering the response, is complete.
Thread switching
As mentioned in the previous article, Retrofit's default implementation switches threads when delivering the response. Whether a thread switch is needed also depends on whether OkHttp runs synchronously or asynchronously: a synchronous call executes on the current thread, while an asynchronous call is handed to OkHttp's thread pool after maxRequests and maxRequestsPerHost have been checked.
For example, suppose the call site looks like this:
observable
    .subscribeOn(Schedulers.io())
    .subscribe(new Consumer<List<Repo>>() {
        @Override
        public void accept(List<Repo> objects) throws Exception {}
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {}
    }, new Action() {
        @Override
        public void run() throws Exception {}
    });
CallExecuteObservable#subscribeActual then runs on a Schedulers.io() thread, call.execute() runs on that same thread, and so does the onNext callback.
To switch to asynchronous execution, use createAsync() when initializing Retrofit:
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())
In that case CallEnqueueObservable#subscribeActual runs on a Schedulers.io() thread and calls call.enqueue(). Because the request is managed by OkHttp's thread pool, the callback runs on a thread from that pool, and so does onNext.
PS: with asynchronous execution, OkHttp limits the maximum number of concurrent requests and the maximum number of requests to the same host. With synchronous execution, concurrency is governed by the thread pool behind the scheduler passed to RxJava's subscribeOn. Choose whichever fits your situation.
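The difference in callback threads can be reproduced with the JDK alone. The sketch below is only an analogy, assuming a single-threaded executor named fake-okhttp-pool as a stand-in for OkHttp's pool; executeSync, enqueueAsync and demo are hypothetical helpers, not Retrofit or OkHttp APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class SyncVsAsyncThreads {
    /** Stand-in for OkHttp's thread pool (hypothetical, one daemon worker thread). */
    private static final ExecutorService POOL =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable, "fake-okhttp-pool");
                thread.setDaemon(true);
                return thread;
            });

    /** Like call.execute(): the work runs on whichever thread calls this method. */
    static String executeSync() {
        return Thread.currentThread().getName();
    }

    /** Like call.enqueue(callback): the callback is invoked on a pool thread. */
    static void enqueueAsync(Consumer<String> callback) {
        POOL.execute(() -> callback.accept(Thread.currentThread().getName()));
    }

    /** Runs both styles from a thread named "subscriber" and reports where each one ran. */
    static String[] demo() {
        CompletableFuture<String> syncThread = new CompletableFuture<>();
        CompletableFuture<String> asyncThread = new CompletableFuture<>();
        Thread subscriber = new Thread(() -> {
            syncThread.complete(executeSync());
            enqueueAsync(asyncThread::complete);
        }, "subscriber");
        subscriber.start();
        return new String[] {syncThread.join(), asyncThread.join()};
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("sync ran on:  " + threads[0]);
        System.out.println("async ran on: " + threads[1]);
    }
}
```

Here the subscriber thread plays the role of the Schedulers.io() thread: the synchronous path stays on it, while the asynchronous callback arrives on the pool thread.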
To switch threads for the response callback, declare observeOn at the call site. AndroidSchedulers comes from the RxAndroid dependency:
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
This makes the onNext callback run on the main thread.
PS: for more on thread switching in RxJava, see another article of mine: subscribeOn and observeOn source code parsing for RxJava
A few development tips
In RESTful API design, we usually agree with the back end on a fixed response format, such as:
{
    "errCode": "",
    "errMessage": "",
    "data": {}
}
We can then define a fixed response entity, which I'm sure many people already do:
public class XResponse<T> {
    private String errCode;
    private String errMessage;
    private T data;
    public String getErrCode() {
        return errCode;
    }
    public void setErrCode(String errCode) {
        this.errCode = errCode;
    }
    public String getErrMessage() {
        return errMessage;
    }
    public void setErrMessage(String errMessage) {
        this.errMessage = errMessage;
    }
    public T getData() {
        return data;
    }
    public void setData(T data) {
        this.data = data;
    }
}
We usually also need one common place to check the status returned by the back end. Combining RxJava with this entity, that common layer can take two forms.
Custom Observer
abstract class HttpResultObserver<T> extends ResourceObserver<XResponse<T>> {
    @Override
    public void onNext(@NonNull XResponse<T> tResponse) {
        if (tResponse.getErrCode().equals("OK")) {
            onSuccess(tResponse.getData());
        } else {
            onFailed(new Exception(tResponse.getErrMessage()));
        }
    }
    @Override
    public void onError(@NonNull Throwable e) {
        onFailed(new Exception(e));
    }
    @Override
    public void onComplete() {}
    abstract void onSuccess(T data);
    abstract void onFailed(Exception e);
}
Observable<XResponse<List<Repo>>> observable = service.listReposRx2("abc");
Disposable disposable = observable.subscribeWith(new HttpResultObserver<List<Repo>>() {
    @Override
    void onSuccess(List<Repo> data) {}
    @Override
    void onFailed(Exception e) {}
});
Here a ResourceObserver subclass inspects errCode in onNext to decide whether the request succeeded or failed.
Custom Observable
The second approach is a custom Observable built on RxJava:
public class HttpResultObservable<T> extends Observable<T> {
    final Observable<XResponse<T>> source;
    public HttpResultObservable(Observable<XResponse<T>> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        HttpResultObserver<T> parent = new HttpResultObserver<>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
    static class HttpResultObserver<T> extends ResourceObserver<XResponse<T>> {
        final Observer<? super T> observer;
        HttpResultObserver(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(@NonNull XResponse<T> txResponse) {
            if (txResponse.getErrCode().equals("OK")) {
                observer.onNext(txResponse.getData());
            } else {
                observer.onError(new Exception(txResponse.getErrMessage()));
            }
        }
        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }
        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}
Observable<XResponse<List<Repo>>> observable = service.listReposRx2("abc");
Disposable disposable = new HttpResultObservable<List<Repo>>(observable)
    .subscribe(new Consumer<List<Repo>>() {
        @Override
        public void accept(List<Repo> objects) throws Exception {}
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {}
    });
Either way you get back a Disposable, so dispose() can be called at the appropriate point in the Activity lifecycle to avoid memory leaks.
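Both variants rely on the same errCode check, and getErrCode().equals("OK") throws a NullPointerException if the back end omits errCode. Below is a minimal sketch of that check isolated in a null-safe helper. Envelope, ApiException and unwrap are hypothetical names introduced here for illustration: Envelope is a cut-down stand-in for XResponse, and treating "OK" as the success code follows the examples above.

```java
import java.util.Objects;

final class EnvelopeCheck {
    /** Minimal stand-in for the article's XResponse<T> (hypothetical). */
    static final class Envelope<T> {
        final String errCode;
        final String errMessage;
        final T data;

        Envelope(String errCode, String errMessage, T data) {
            this.errCode = errCode;
            this.errMessage = errMessage;
            this.data = data;
        }
    }

    /** Hypothetical exception type that keeps the business error code. */
    static final class ApiException extends RuntimeException {
        final String errCode;

        ApiException(String errCode, String message) {
            super(message);
            this.errCode = errCode;
        }
    }

    /** Returns the payload when errCode is "OK"; otherwise throws ApiException. */
    static <T> T unwrap(Envelope<T> envelope) {
        Objects.requireNonNull(envelope, "envelope");
        // "OK".equals(...) is null-safe: a missing errCode counts as a failure.
        if ("OK".equals(envelope.errCode)) {
            return envelope.data;
        }
        throw new ApiException(envelope.errCode, envelope.errMessage);
    }

    public static void main(String[] args) {
        System.out.println(unwrap(new Envelope<>("OK", "", "payload")));
        try {
            unwrap(new Envelope<String>(null, "missing errCode", null));
        } catch (ApiException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

An equivalent helper written against XResponse could be passed to Observable#map, since an exception thrown by the mapper is delivered to onError. Note that RxJava 2 does not allow null items, so a successful response whose data is null would still need separate handling.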
Java8
I now use Kotlin for most of my day-to-day Android development, and Retrofit + RxJava is still the mainstream combination. But Java 8 is by now a fairly old release, and its features are worth knowing. In addition, Android Gradle Plugin 7.0 requires JDK 11. So this section looks at Retrofit's CallAdapter support for Java 8.
The complete code for issuing a network request with the Java 8 adapter:
CompletableFuture<List<Repo>> completableFuture = service.listReposJava8("octocat");
try {
    List<Repo> list = completableFuture.get();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}
Recent versions of Retrofit support Java 8 types by default; on Android this requires SDK level 24 or above. Retrofit.Builder#build adds a CompletableFutureCallAdapterFactory automatically.
PS: CompletableFuture was introduced in Java 8.
// Platform.java
static final class Android extends Platform {
    Android() {
        super(Build.VERSION.SDK_INT >= 24); // hasJava8Types is true for 24 and above
    }
    ...
}
List<? extends CallAdapter.Factory> defaultCallAdapterFactories(
        @Nullable Executor callbackExecutor) {
    DefaultCallAdapterFactory executorFactory = new DefaultCallAdapterFactory(callbackExecutor);
    return hasJava8Types
            ? asList(CompletableFutureCallAdapterFactory.INSTANCE, executorFactory)
            : singletonList(executorFactory);
}
CompletableFutureCallAdapterFactory
// CompletableFutureCallAdapterFactory.java
public @Nullable CallAdapter<?, ?> get(
        Type returnType, Annotation[] annotations, Retrofit retrofit) {
    if (getRawType(returnType) != CompletableFuture.class) {
        return null;
    }
    if (!(returnType instanceof ParameterizedType)) {
        throw new IllegalStateException(
                "CompletableFuture return type must be parameterized"
                        + " as CompletableFuture<Foo> or CompletableFuture<? extends Foo>");
    }
    Type innerType = getParameterUpperBound(0, (ParameterizedType) returnType);
    if (getRawType(innerType) != Response.class) {
        // Generic type is not Response<T>. Use it for body-only adapter.
        return new BodyCallAdapter<>(innerType);
    }
    // Generic type is Response<T>. Extract T and create the Response version of the adapter.
    if (!(innerType instanceof ParameterizedType)) {
        throw new IllegalStateException(
                "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>");
    }
    Type responseType = getParameterUpperBound(0, (ParameterizedType) innerType);
    return new ResponseCallAdapter<>(responseType);
}
CompletableFutureCallAdapterFactory#get returns a different CallAdapter depending on the generic type argument of the CompletableFuture return type. Here we take BodyCallAdapter as an example.
BodyCallAdapter
// CompletableFutureCallAdapterFactory.java
private static final class BodyCallAdapter<R> implements CallAdapter<R, CompletableFuture<R>> {
    private final Type responseType;
    BodyCallAdapter(Type responseType) {
        this.responseType = responseType;
    }
    @Override
    public Type responseType() {
        return responseType;
    }
    @Override
    public CompletableFuture<R> adapt(final Call<R> call) {
        CompletableFuture<R> future = new CallCancelCompletableFuture<>(call);
        call.enqueue(new BodyCallback(future));
        return future;
    }
    @IgnoreJRERequirement
    private class BodyCallback implements Callback<R> {
        private final CompletableFuture<R> future;
        public BodyCallback(CompletableFuture<R> future) {
            this.future = future;
        }
        @Override
        public void onResponse(Call<R> call, Response<R> response) {
            if (response.isSuccessful()) {
                future.complete(response.body());
            } else {
                future.completeExceptionally(new HttpException(response));
            }
        }
        @Override
        public void onFailure(Call<R> call, Throwable t) {
            future.completeExceptionally(t);
        }
    }
}
As adapt shows, the method returns a CallCancelCompletableFuture, and that is the object the calling code actually receives. This class only adds cancellation of the network request, so it is not covered in detail here.
The call here is an OkHttpCall. In this scenario, invoking adapt immediately issues the network request through call.enqueue.
Upon the response, the corresponding methods of CompletableFuture are called to indicate completion or failure.
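The pattern described above, bridging a callback-style call to a CompletableFuture and forwarding cancellation, can be sketched with the JDK alone. This is an illustration under stated assumptions rather than Retrofit's implementation: FakeCall, FakeCallback, CancellingFuture, immediate and NeverRespondingCall are hypothetical stand-ins, and cancelling the call only on cancel(true) is a choice made for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class CallbackToFuture {
    /** Hypothetical callback-style call, standing in for retrofit2.Call. */
    interface FakeCall<T> {
        void enqueue(FakeCallback<T> callback);

        void cancel();
    }

    /** Hypothetical callback, standing in for retrofit2.Callback. */
    interface FakeCallback<T> {
        void onResponse(T body);

        void onFailure(Throwable error);
    }

    /** A future that also cancels the underlying call when cancel(true) is invoked. */
    static final class CancellingFuture<T> extends CompletableFuture<T> {
        private final FakeCall<?> call;

        CancellingFuture(FakeCall<?> call) {
            this.call = call;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (mayInterruptIfRunning) {
                call.cancel();
            }
            return super.cancel(mayInterruptIfRunning);
        }
    }

    /** Starts the call right away and completes the returned future from its callback. */
    static <T> CompletableFuture<T> adapt(FakeCall<T> call) {
        CompletableFuture<T> future = new CancellingFuture<>(call);
        call.enqueue(new FakeCallback<T>() {
            @Override
            public void onResponse(T body) {
                future.complete(body);
            }

            @Override
            public void onFailure(Throwable error) {
                future.completeExceptionally(error);
            }
        });
        return future;
    }

    /** A call that answers immediately with a fixed body. */
    static FakeCall<String> immediate(String body) {
        return new FakeCall<String>() {
            @Override
            public void enqueue(FakeCallback<String> callback) {
                callback.onResponse(body);
            }

            @Override
            public void cancel() {}
        };
    }

    /** A call that never answers and only records whether it was cancelled. */
    static final class NeverRespondingCall implements FakeCall<String> {
        final AtomicBoolean cancelled = new AtomicBoolean();

        @Override
        public void enqueue(FakeCallback<String> callback) {}

        @Override
        public void cancel() {
            cancelled.set(true);
        }
    }

    public static void main(String[] args) {
        System.out.println(adapt(immediate("ok")).join());
        NeverRespondingCall pending = new NeverRespondingCall();
        CompletableFuture<String> future = adapt(pending);
        future.cancel(true);
        System.out.println("call cancelled: " + pending.cancelled.get());
    }
}
```

Because the request starts inside adapt, the caller receives a future that is already in flight, and cancelling that future is the only handle it has on the underlying call.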
Get request results
try {
    List<Repo> list = completableFuture.get();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}
- completableFuture.get() blocks until the future is completed through complete or completeExceptionally.
- A failed request surfaces as an ExecutionException whose cause is the original error; InterruptedException is thrown if the waiting thread is interrupted.
completableFuture.get() itself is called on the current thread, while the network request inside is issued asynchronously and therefore runs on a thread from OkHttp's thread pool. Internally, get has a spin-like wait loop that keeps the calling thread waiting until the OkHttp thread receives the response and completes the future.
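The blocking behaviour of get can be observed without any network code. In the minimal sketch below, a daemon thread named fake-network plays the role of the OkHttp thread; respondLater and await are hypothetical helpers introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureGetDemo {
    /** Completes a future from another thread after a short delay, normally or with an error. */
    static CompletableFuture<String> respondLater(String body, Throwable error) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(body);
            }
        }, "fake-network");
        worker.setDaemon(true);
        worker.start();
        return future;
    }

    /** Blocks the calling thread until the future is completed, then describes the outcome. */
    static String await(CompletableFuture<String> future) {
        try {
            return "body=" + future.get();
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            return "failed=" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(respondLater("hello", null)));
        System.out.println(await(respondLater(null, new java.io.IOException("timeout"))));
    }
}
```

Since get blocks its caller, it is a poor fit for the Android main thread. CompletableFuture#whenComplete registers a callback instead of blocking and may be the better choice there.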
Summary
This article covered Retrofit's CallAdapter mechanism, focusing on the RxJava2 and Java 8 adapters. Retrofit contains many other interesting design decisions, and if you have the time I highly recommend reading through the source yourself; it will sharpen your own design thinking.