RxJava and CallbackHell

Before we begin, let’s consider a question:

Do you think RxJava really works, and where does it work?

CallbackHell is not uncommon in code that doesn’t rely on RxJava + Retrofit for network requests (such as AsyncTask), I’ve had the good fortune to see and maintain various projects with 3 and 4 levels of AsyncTask callback nesting — I’ve always refused to read the AsyncTask source code, which I think is a big reason.

I would like to thank @Prototypez for his RxJava Meditation series, which I personally think is the best RxJava series in the country so far. The author lists some of the most common claims about the benefits of RxJava in most articles in the country:

  • Observer mode is used
  • Chain programming (one line of code to implement XXX)
  • Clear and concise code
  • Callback Hell is avoided

Admittedly, these are some of the best things about RxJava, but I don’t think that’s the point. As this article says, it’s more about:

RxJava brings new ideas to event-driven programming. The RxJava Observable expands our dimension into both time and space.

Event-driven programming is the right word, and now I’m reorganizing my language, “Don’t break chain calls! Don’t break the event-driven programming philosophy of RxJava.

What are you trying to say?

Back to the title of the article, error handling of network requests has always been an unavoidable requirement in Android development, and with the popularity of RxJava + Retrofit, it is inevitable to encounter this problem:

Android development in RxJava+Retrofit global network exception capture, unified processing of status code

This is a blog I summarized at the beginning of 2017. At that time, I had a limited understanding of RxJava. I read many blogs of my predecessors on the Internet and summarized the scheme in the paper, which is to put the global error processing in onError() and package Subscriber as MySubscriber:

public abstract class MySubscriber<T> extends Subscriber<T> {
&emsp;// ...
   @Override
    public void onError(Throwable e) {
         onError(ExceptionHandle.handleException(e));  // ExceptionHandle is the global processing logic
    }

    public abstract void onError(ExceptionHandle.ResponeThrowable responeThrowable);
}

api.requestHttp()  // Network request
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new MySubscriber<Model>(context) {        // MySubscriber that wraps the global error processing logic
          @Override
          public void onNext(Model model) { // ... }

          @Override
           public void onError(ExceptionHandle.ResponeThrowable throwable) {
                 / /...}});Copy the code

This solution seemed fine to me at the time, and I thought it was the perfect solution.

Another problem I soon realized was that this scheme successfully drove me to write the RxJava version of Callback Hell.

Callback Hell for the RxJava version

I don’t want you to laugh at my code, so I decided not to throw it out and look at a common requirement:

Request an API, and if an exception occurs, a Dialog pops up asking the user whether to retry, and if retry, request the API again.

Let’s take a look at some of the code that most developers would probably write first (I’m using Kotlin here to make the code less verbose) :

api.requestHttp()
    .subscribe(
          onNext = {
                // ...
          },
          onError = {
                  AlertDialog.Builder(context)    // A dialog pops up asking the user whether to retry
                    .xxxx
                    .setPositiveButton("Try again") {_, _ ->// Click the retry button to request again
                              api.requestHttp()                  
                                 .subscribe(
                                       onNext = {   ...   },
                                       onError = {    ...   }
                                  )
                    }
                    .setNegativeButton("Cancel") {_, _ ->// do nothing}
                    .show()
          }
    )
Copy the code

Look! What have we written!

Now you can probably see where I’m at. OnError () and onComplete() mean that the subscription event is aborted. If the global exception handling is in onError(), then if there are other requirements (such as network requests), that means you need to add another layer of callbacks to the callback method.

While Shouting that RxJava chain calls were simple and avoided CallbackHell, we threw reactive programming aside and continued to write code that was the same as everyday thinking.

If you feel this operation is completely acceptable, we can upgrade the requirements:

If an exception occurs, a dialog pops up up to three times to prompt the user to try again.

Ok, if, at most, one retry adds an additional layer of nesting of callbacks to the code (actually two, and the Dialog click event itself is a layer of callbacks), then the maximum three retries is….. Layer 4 callback:

api.requestHttp()
    .subscribe(
          onNext = {
                // ...
          },
          onError = {
                      api.requestHttp()                  
                         .subscribe(
                               onNext = {
                                     // ...
                                },
                               onError = {                         
                                      api.requestHttp()                  
                                        .subscribe(
                                              onNext = {   ...   },
                                              onError = {    ...   }     // One more layer)})})Copy the code

You can say, I wrapped this request into a function and just called it each time, but you can’t deny that CallbackHell isn’t elegant.

Now, if there was an elegant solution, what would be the best advantages of that solution?

What I would like it to do, if possible:

1. The lightweight

Lightweight means lower dependency costs. If a tool library relies on several third-party libraries, the rapid expansion of APK volume is unacceptable in the first place.

2. The flexible

Flexibility means lower migration costs, and I don’t want to drastically change or even refactor my entire project by adding or removing this tool.

If possible, do not make changes to existing business logic code.

3. Low learning costs

The low learning cost allows developers to get started with the tool faster.

4. Highly extensible

If possible, let this library do whatever it wants.

So there are some limitations to the inheritance of the global error solution above, and aside from the jaw-dropping hell of callbacks, not being able to use lambda expressions is intolerable.

RxWeaver: a lightweight and flexible global Error handling middleware

I spent some time open-source this tool:

RxWeaver: A lightweight and flexible error handler tools for RxJava2.

Weaver is called Weaver, and my initial goal is to make this tool to be able to organize the logical code correctly, to achieve RxJava global Error handling requirements.

How does it work? How far can you go?

For brevity, I chose to use Kotlin as the demo code, which I’m sure you can read and understand – if Java is the development language for your project, don’t worry, RxWeaver also provides Java version dependencies and sample code, which you can find here.

The configuration of RxWeaver is very simple. All you need to do is configure the corresponding GlobalErrorTransformer class and use the compose() operator in your network request code to handle errors. To hand the GlobalErrorTransformer to RxJava, note that only one line of code is required:

private fun requestHttp(a) {
        serviceManager.requestHttp()     // Network request
                .compose(RxUtils.handleGlobalError<UserInfo>(this))    // Add this line
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe( / /...).
}
Copy the code

RxUtils. HandleGlobalError < the UserInfo > (this) a similar Java method of static tools, it will return a corresponding GlobalErrorTransformer an instance – is stored inside the corresponding error handling logic, This class is not part of RxWeaver, but is implemented by itself, depending on the business of different projects:

object RxUtils {

    fun  handleGlobalError(activity: FragmentActivity): GlobalErrorTransformer {
          / /...}}Copy the code

Now what we need to know is how far we can go with this line of code.

Let’s look at the toughness of the tool from three different gradients of requirements:

1. When an Error is received, the Toast information is displayed to the user

This is the most common requirement. When a special exception occurs (in this case, JSONException is taken as an example), we will prompt the following message to the user through Toast:

Global exception capture -Json parse exception!

fun test(a) {
        Observable.error(JSONException("JSONException"))
                .compose(RxUtils.handleGlobalError<UserInfo>(this))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // ...}}Copy the code

There is no doubt that when there is no add compose (RxUtils handleGlobalError < the UserInfo > (this)) this line of code, the result of the subscription is necessarily pop up a “onError: XXXX” toast.

Now that we have added the compose line, let’s wait and see:

It looks like success, even though we did a separate handle for Exception inside onError(), the JSONException is still caught globally, and an extra toast pops up: “Global Exception capture -Json parse Exception!” .

This seems like a simple requirement, so let’s make it a little harder:

2. Pop-up Dialog when some Error is received

The requirements this time are:

If we receive a ConnectException, we make a dialog pop up, which will pop up only once, and if the user chooses to retry, request the API again

Getting back to the need to invoke Callback Hell, how do you ensure that the Dialog and retry logic executes correctly without breaking the continuity of the Observable stream?

fun test2(a) {
  Observable.error(ConnectException())        // This time we replace the exception with 'ConnectException'
                .compose(RxUtils.handleGlobalError<UserInfo>(this))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // ...}}Copy the code

Again familiar code, this time we have replaced the exception with a ConnectException, and we can look directly at the result:

Because our data source is a fixed ConnectException, no matter how hard we try it, we are bound to only receive a ConnectException. It doesn’t matter if you notice that there is no, even if it is a complex requirement (popup dialog, after the user has selected, decide whether to request the stream again). RxWeaver can still do the job.

Last case, let’s do a more complicated one.

3. When receiving the Error message indicating that the Token is invalid, the login page is displayed.

The detailed requirements are:

If an Error message indicating that the Token is invalid is received, the login page is displayed. After a successful login, the user returns to the initial page and requests the API again. If the user login fails or cancels, an error message is displayed.

Obviously this logic is a little complicated, for the realization of this demand, it seems not realistic, whether this time will do nothing?

fun test3(a) {
    Observable.error(TokenExpiredException())
                .compose(RxUtils.handleGlobalError<UserInfo>(this))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                subscribe {
                    // ...}}Copy the code

This time we replaced the exception with TokenExpiredException(since instantiating an HttpException directly is too complicated, we’ll use a custom exception simulation instead) and we’ll look directly at the result:

Of course, the data source will always only fire TokenExpiredException no matter how many retries are made, but we managed to implement this seemingly complex requirement.

4. What am I trying to explain?

I think RxWeaver meets my design requirements:

  • lightweight

You don’t need to worry about the size of RxWeaver, it’s lightweight enough that all classes add up to less than 200 lines of code, and it doesn’t have any dependencies other than RxJava and RxAndroid, with a size of only 3KB.

  • flexible

The configuration of RxWeaver does not require modifying or removing a single line of existing business code — it is fully pluggable.

  • Low learning cost

It can also be used with onErrorResumeNext, retryWhen, and doOnError.

  • High scalability

Arbitrarily complex requirements can be implemented through interfaces.

The principle of

This seems to be putting the cart before the horse, as proficiency with the API is often a higher priority for a tool than reading the source code and understanding how it works. But in my opinion, if you understand the principle first, you will be much more comfortable with the tool.

RxWeaverIs the principle complex?

In fact, the source code of RxWeaver is very simple, simple to the internal components without any Error processing logic, all logic to the user for configuration, it is just a middleware.

It can also be used with onErrorResumeNext, retryWhen, and doOnError.

1. com pose operators

To handle the global exception, I just add a line of code to the chain call of the existing code, Configure a GlobalErrorTransformer < T > to compose () operator – this operator is RxJava can provide us the responsive oriented data types (observables/Flowable/Single, etc.) for AOP Can be manipulated, modified, or even replaced by reactive data types.

This means that, using the compose() operator, I can easily plug a special piece of logic into the Observable on top of the existing code.

Avoid breaking the compose chain: use the. Compose () operator @by

Compose () operator need me into a corresponding reactive type (observables/Flowable/Single, etc.) of Transformer interface, but the problem is that different response types Corresponding to different Transformer interface, So we implement a common GlobalErrorTransformer

interface to accommodate different reactive types of event flows:

class GlobalErrorTransformer<T> constructor(
        private val globalOnNextRetryInterceptor: (T) -> Observable<T> = { Observable.just(it) },
        private val globalOnErrorResume: (Throwable) -> Observable<T> = { Observable.error(it) },
        private val retryConfigProvider: (Throwable) -> RetryConfig = { RetryConfig() },
        private val globalDoOnErrorConsumer: (Throwable) -> Unit= {},private val upStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() },
        private val downStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() }
) : ObservableTransformer<T, T>, FlowableTransformer<T, T>, SingleTransformer<T, T>,  MaybeTransformer<T, T>, CompletableTransformer {
      // ...
}
Copy the code

Now let’s think about it. If we want to put the error handling logic inside the GlobalErrorTransformer, pass the GlobalErrorTransformer to the compose() operator, Inserts all the error handling logic into the existing Observable event stream:

fun test(a) {
    observable
          .compose(RxUtils.handleGlobalError<UserInfo>(this))   // Insert exception handling logic
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          subscribe {
              // ...}}Copy the code

Similarly, if an API does not need to append global exception handling logic, you can simply delete this line of code without affecting the rest of the business code.

That’s a good idea, but the next thing we need to think about is, how do we add different exception handling logic to GlobalErrorTransformer?

2. Simple global exception handling: doOnError operator

The purpose of this operator is pretty obvious, which is the logic we want to do when we receive a Throwable:

This is really suitable for most simple error handling requirements, like requirement 1 above, when we receive a specified exception, the corresponding message will prompt the user, the logical code is as follows:

when (error) {
    is JSONException -> {
        Toast.makeText(activity, "Global exception capture -Json parse exception!", Toast.LENGTH_SHORT).show()
    }
    else- > {}}Copy the code

That is, JSONException still ends up in a callback to subscribe onError() — you still need to implement the onError() callback, even if you don’t do anything. Do special processing if necessary, otherwise a crash will occur.

This is simple, but with complex requirements it can’t be helped, so we need to use the onErrorResumeNext operator.

3. Complex asynchronous Error handling: onErrorResumeNext operator

For example, in requirement 2 above, if we receive a specified exception, we need to display a Dialog that prompts the user to retry — in this case, the doOnError operator is obviously powerless because it does not have the ability to transform an Observable.

This is where onErrorResumeNext comes in. When an error occurs in the stream’s event delivery, we can hand in a new stream with onErrorResumeNext to ensure that the stream can continue.

This is a very underrated operator, which means that you can pass events down as long as you give an Observable

. Does this have anything to do with the requirement to present a Dialog for the user to select?

We just need to convert the Dialog’s event to the corresponding Observable:

object RxDialog {

    /** * a simple example where a dialog pops up prompting the user to convert the user's actions into a stream and returns */
    fun showErrorDialog(context: Context,
                        message: String): Single<Boolean> {

        return Single.create<Boolean> { emitter ->
            AlertDialog.Builder(context)
                    .setTitle("Error")
                    .setMessage("You received an exception:$messageWould you like to retry this request?)
                    .setCancelable(false)
                    .setPositiveButton("Try again") { _, _ -> emitter.onSuccess(true) }
                    .setNegativeButton("Cancel") { _, _ -> emitter.onSuccess(false) }
                    .show()
        }
    }
}
Copy the code

RxDialog’s showErrorDialog() function will display a Dialog that returns a Single

stream. When the user clicks OK, the subscriber will receive a true event, or if the user clicks cancel, A false event is received.

Can RxJava still be used this way?

Of course, RxJava represents a responsive programming paradigm, and we’ve all heard it said in our early days that one of RxJava’s greatest strengths is asynchracy.

Now, the flow of data in a network request represents asynchrony. Isn’t a dialog popping up and waiting for the user to select the result also asynchrony?

In other words, an event in the flow of a network request means the result of the network request, so the Single

above means that the event in the flow is the click event of a ** Dialog.

In fact, over the years of RxJava’s development, RxJava extension libraries on Github have emerged one after another, such as RxPermission and RxBinding. The former delivers the result of a permission request as an event to an Observable for transmission. The latter hands the View’s events to the Observable (click, hold, etc.).

Back to RxDialog, now that we have created a responsive Dialog via RxDialog and obtained the user’s selection Single

, all we need to do is rerequest network data based on the value of the Single

event.

4. Retry handling: retryWhen operator

RxJava provides the retryWhen() operator, which gives us the task of deciding whether to re-subscribe to the stream (in this case, to re-subscribe to the network request) :

I won’t go into too much detail about the retryWhen() operator because of space constraints. For the retryWhen() operator, see:

RepeatWhen () and retryWhen() operators in RxJava

Continuing with the previous thread, we go to the Single

stream corresponding to the Dialog. When the user selects, it instantiates a RetryConfig object and passes the result of the selection to the condition property:

RetryConfig(condition = RxDialog.showErrorDialog(params))

data class RetryConfig(
        val maxRetries: Int = DEFAULT_RETRY_TIMES,  // Maximum number of retries. Default: 1
        val delay: Int = DEFAULT_DELAY_DURATION,    // retry delay, 1000ms by default
        val condition: () -> Single<Boolean> = { Single.just(false)}// Whether to retry
)
Copy the code

Now let’s rearrange our thinking:

1. When the user receives a specified exception, a Dialog pops up with the result Single

; 2.RetryConfig stores a Single

property internally, which is a function that determines whether or not to retry; 3. When the user selects the confirm button, pass Single(True) and instantiate a RetryConfig, which means retries, or cancel, which means no retries.

5. Seems to be… Finished?

So, with just a few operators, Error can handle complex requirements and we can do that, right?

Indeed, in fact, the processing within GlobalErrorTransformer is exactly what calls these operators:

class GlobalErrorTransformer<T> constructor(
        private val globalOnNextRetryInterceptor: (T) -> Observable<T> = { Observable.just(it) },
        private val globalOnErrorResume: (Throwable) -> Observable<T> = { Observable.error(it) },
        private val retryConfigProvider: (Throwable) -> RetryConfig = { RetryConfig() },
        private val globalDoOnErrorConsumer: (Throwable) -> Unit= {},private val upStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() },
        private val downStreamSchedulerProvider: () -> Scheduler = { AndroidSchedulers.mainThread() }
) : ObservableTransformer<T, T>,
        FlowableTransformer<T, T>,
        SingleTransformer<T, T>,
        MaybeTransformer<T, T>,
        CompletableTransformer {

    override fun apply(upstream: Observable<T>): Observable<T> =
            upstream
                    .flatMap {
                        globalOnNextRetryInterceptor(it)
                    }
                    .onErrorResumeNext { throwable: Throwable ->
                        globalOnErrorResume(throwable)
                    }
                    .observeOn(upStreamSchedulerProvider())
                    .retryWhen(ObservableRetryDelay(retryConfigProvider))
                    .doOnError(globalDoOnErrorConsumer)
                    .observeOn(downStreamSchedulerProvider())

    // Same for other responsive types...
}                    
Copy the code

This is why RxWeaver is so lightweight. Even the core class GlobalErrorTransformer has no more complex logic, just a combination of several operators.

Several other classes are nothing more than encapsulation of the retry logic interface.

6. How to switch to the interface?

See here, some friends may have such a question:

In requirement 2, I can understand the logic of Dialog. Then, in requirement 3, how to realize the invalid Token, jump login and return retry?

In fact, whether it is a network request, a pop-up Dialog, or a jump Login, it is just a stream of events after all. The former can return an Observble

or Single

through the interface, and the jump Login can also:

class NavigatorFragment : Fragment() {

    fun startLoginForResult(activity: FragmentActivity): Single<Boolean> {
        / /...}}Copy the code

Limited space, this article does not carry out the implementation of the code display, the source code please refer to here.

This works exactly like RxPermissions, RxLifecycle, and my RxImagePicker, which relies on an invisible Fragment to pass the data.

Summary: RxJava, complex or simple

At the beginning of this article, I briefly described several advantages of RxWeaver, one of which is the extremely low cost of learning.

Before this article was published, I introduced my tool to some developers new to RxJava, and their feedback was surprisingly consistent:

You are too difficult!

I was surprised by this result, because it was only a tool library with less than 200 lines in total. After thinking it over, I finally came to a conclusion:

The content of this article is easy to understand, but you need to have some understanding of RxJava first, which is difficult.

RxJava has a steep learning curve! As @Prototypez put it in his post:

RxJava is a “bootloaded” framework, and its most important contribution is to raise the dimension of how we think about event-driven programming, but it forces us to embrace functional programming at the same time.

As this article said at the beginning, we have become accustomed to procedural programming thinking, so in this paper, some abstract operators will make us into a certain amount of confusion, but this is what RxJava magic – it makes me want to new requirements From a higher level of abstraction, try to write more concise code (at least in my opinion).

I like RxWeaver very much. Some of my friends say that it has a little bit less code, but I think its biggest advantage is its lightness. Its essential purpose is to help developers organize their business logic so that they can write more Reactive and Functional code.

— — — — — — — — — — — — — — — — — — — — — — — — — – advertising line — — — — — — — — — — — — — — — — — — — — — — — — — — — — — –

About me

If you think this article is valuable to you, welcome to ❤️, and also welcome to follow my blog or Github.

If you feel that the article is not enough, please also urge me to write a better article by paying attention to it — in case I make progress one day?

  • My Android learning system
  • About the article error correction
  • About paying for knowledge