Introduce a,

RxLifecycle purpose: To resolve memory leaks in RxJava usage.

For example, when you subscribe to and execute time-consuming tasks using RxJava, if the Activity is not unsubscribed when the time-consuming task is finished, the Activity cannot be reclaimed, resulting in a memory leak.

To solve this problem, RxLifecycle was created to make RxJava lifecycle aware, allowing it to cancel subscriptions in time to avoid memory leaks.

Second, the use of

First, let’s look at the use of RxLifecycle.

1. Add dependencies

  implementation 'com. Trello. Rxlifecycle2: rxlifecycle: 2.2.1'
  implementation 'com. Trello. Rxlifecycle2: rxlifecycle - android: 2.2.1'
  implementation 'com. Trello. Rxlifecycle2: rxlifecycle - components: 2.2.1'
Copy the code

2. Inherit container classes

The Activity/fragments need inherit RxAppCompatActivity RxFragment, mainly several container class supports the following:

Rx

3. Bind the container lifecycle

Take the Activity as an example, there are two main methods:

bindUntilEvent(@NonNull ActivityEvent event)
Copy the code
bindToLifecycle()
Copy the code

There are the same two methods for fragments, but the names are different.

The following details the differences between the two methods:

bindUntilEvent

This method specifies which lifecycle method to unsubscribe when called.

ActivityEvent is an enumerated class that corresponds to the Activity’s life cycle.

public enum ActivityEvent {

    CREATE,
    START,
    RESUME,
    PAUSE,
    STOP,
    DESTROY

}
Copy the code

Specific usage examples:

override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnDispose {
                    Log.i(TAG, "Unsubscribing subscription from onDestory()")
                }
                .compose(bindUntilEvent(ActivityEvent.DESTROY))
                .subscribe {
                    Log.i(TAG, "Started in onCreate(), running until in onDestroy(): $it")}}Copy the code

Specifies that the subscription is unsubscribed during the onDestory() lifecycle.

bindToLifecycle

Bind at a certain life cycle and unsubscribe at the corresponding life cycle.

Specific usage examples:

override fun onResume() {
        super.onResume()
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnDispose {
                    Log.i(TAG, "Unsubscribing subscription from onPause()")
                }
                .compose(bindToLifecycle())
                .subscribe {
                    Log.i(TAG, "Started in onResume(), running until in onPause(): $it")}}Copy the code

Unsubscribe from onResume() and unsubscribe from onPause(). The lifecycle is pin-pout.

Three, principle analysis

1.compose

Let’s start with the compose operator.

compose(bindToLifecycle())
compose(bindUntilEvent(ActivityEvent.DESTROY))
Copy the code

As shown above, both ways of binding the life cycle are implemented through the compose operator.

Compose can generally be used in conjunction with Transformer to convert one type of Observable into another, ensuring the chain structure of the call.

So let’s look at the use of this operator in RxLifecycle, starting with bindToLifecycle and bindUntilEvent.

2.BehaviorSubject

public abstract class RxAppCompatActivity extends AppCompatActivity implements LifecycleProvider<ActivityEvent> {

    private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();

    @Override
    @NonNull
    @CheckResult
    public final Observable<ActivityEvent> lifecycle() {
        return lifecycleSubject.hide();
    }

    @Override
    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
    }

    @Override
    @NonNull
    @CheckResult
    public final <T> LifecycleTransformer<T> bindToLifecycle() {
        return RxLifecycleAndroid.bindActivity(lifecycleSubject);
    }

    @Override
    @CallSuper
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifecycleSubject.onNext(ActivityEvent.CREATE);
    }

    @Override
    @CallSuper
    protected void onStart() {
        super.onStart();
        lifecycleSubject.onNext(ActivityEvent.START);
    }

    @Override
    @CallSuper
    protected void onResume() {
        super.onResume();
        lifecycleSubject.onNext(ActivityEvent.RESUME);
    }

    @Override
    @CallSuper
    protected void onPause() {
        lifecycleSubject.onNext(ActivityEvent.PAUSE);
        super.onPause();
    }

    @Override
    @CallSuper
    protected void onStop() {
        lifecycleSubject.onNext(ActivityEvent.STOP);
        super.onStop();
    }

    @Override
    @CallSuper
    protected void onDestroy() { lifecycleSubject.onNext(ActivityEvent.DESTROY); super.onDestroy(); }}Copy the code

There is a key object BehaviorSubject in the RxAppActivity

The BehaviorSubject sends the last value closest to the subscription, or the default value if there is no previous value. The diagram below:

So lifecycleSubject keeps sending subsequent lifecycle events, ActivityEvent, based on the period in which the subscription is bound.

3.LifecycleTransformer

BindToLifecycle and bindUntilEvent both return a LifecycleTransformer object, so LifecycleTransformer exactly what is used?

@ParametersAreNonnullByDefault public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>, FlowableTransformer<T, T>, SingleTransformer<T, T>, MaybeTransformer<T, T>, CompletableTransformer { final Observable<? > observable; LifecycleTransformer(Observable<? > observable) { checkNotNull(observable,"observable == null");
        this.observable = observable;
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(observable);
    }

    @Override
    public Publisher<T> apply(Flowable<T> upstream) {
        return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(observable.firstOrError());
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(observable.firstElement());
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if(o == null || getClass() ! = o.getClass()) {return false; } LifecycleTransformer<? > that = (LifecycleTransformer<? >) o;return observable.equals(that.observable);
    }

    @Override
    public int hashCode() {
        return observable.hashCode();
    }

    @Override
    public String toString() {
        return "LifecycleTransformer{" +
            "observable=" + observable +
            '} '; }}Copy the code

LifecycleTransformer implements various Transformer interfaces, Can a observables/Flowable/use/Completable/Maybe object into another observables/Flowable/use/Completable/Maybe object. This works nicely with the compose operator above and is used in the chain call.

4.takeUntil

Now comes the key: What does LifecycleTransformer convert the original Observable into?

That’s where the takeUntil operator comes in!

Observable
Observable
Observable
takeUntil
Observable

Now that you understand what this operator does, RxLifecycle unsubscribes by listening to the data emitted by the second Observable.

So who is this second Observable?

It is passed into the constructor when the LifecycleTransformer is created, so find out when the object was created.

To start from the beginning:

public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
    }
Copy the code

This method returns the LifecycleTransformer object and continues tracing back.

public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                            @Nonnull final R event) {
    checkNotNull(lifecycle, "lifecycle == null");
    checkNotNull(event, "event == null");
    return bind(takeUntilEvent(lifecycle, event));
}

private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
    return lifecycle.filter(new Predicate<R>() {
        @Override
        public boolean test(R lifecycleEvent) throws Exception {
            returnlifecycleEvent.equals(event); }}); }Copy the code

Keep tracking. We’re getting closer.

public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
    return new LifecycleTransformer<>(lifecycle);
}
Copy the code

The method creates this object and passes in an Observable object, which is a BehaviorSubject object.

So when does this object send its first data?

This depends on the takeUntilEvent method above.

Lifecycleevent.equals (event). The BehaviorSubject sends first data only if the ActivityEvent value of the BehaviorSubject is equal to the unbound lifecycle. Then when the first data is sent, the subscription is unbound according to the analysis above.

So for bindToLifecycle method, what is the operation, so that in the corresponding life cycle to unsubscribe?

Continue to look at the source code.

public final <T> LifecycleTransformer<T> bindToLifecycle() {
        return RxLifecycleAndroid.bindActivity(lifecycleSubject);
    }
Copy the code
public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
    return bind(lifecycle, ACTIVITY_LIFECYCLE);
}
Copy the code

ACTIVITY_LIFECYCLE is:

private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
    new Function<ActivityEvent, ActivityEvent>() {
        @Override
        public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
            switch (lastEvent) {
                case CREATE:
                    return ActivityEvent.DESTROY;
                case START:
                    return ActivityEvent.STOP;
                case RESUME:
                    return ActivityEvent.PAUSE;
                case PAUSE:
                    return ActivityEvent.STOP;
                case STOP:
                    return ActivityEvent.DESTROY;
                case DESTROY:
                    throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
                default:
                    throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented"); }}};Copy the code

This function returns a lifecycle based on the incoming lifecycle event, for example, CREATE→DESTROY. It seems that this function can achieve the corresponding life cycle unbinding.

But also need a series of operators to assist, continue to look at the source.

public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                      @Nonnull final Function<R, R> correspondingEvents) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(correspondingEvents, "correspondingEvents == null");
        return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
    }

    private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                       final Function<R, R> correspondingEvents) {
        return Observable.combineLatest(
            lifecycle.take(1).map(correspondingEvents),
            lifecycle.skip(1),
            new BiFunction<R, R, Boolean>() {
                @Override
                public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(bindUntilEvent);
                }
            })
            .onErrorReturn(Functions.RESUME_FUNCTION)
            .filter(Functions.SHOULD_COMPLETE);
    }
Copy the code

See takeUntilCorrespondingEvent method in detail.

5.take

First look at the take operator, which is simple.

Take (int) takes an integer n as an argument and emits only the first n items, as shown below:

By the way, lifecycle. Take (1).map(correspondingEvents), that is, we got the first lifecycle event sent and converted it to the response lifecycle by the corresponding function above. If you bind in onCreate, the CREATE is sent first and the corresponding DESTORY is returned.

6.skip

Skip (int) ignores the first n items emitted by an Observable

Lifecycle. Skip (1), if bound in onCreate then the rest is START, RESUME, PAUSE, STOP, DESTROY

7. combineLatest

Finally, a key operator, combineLatest, is required to perform the corresponding lifecycle unsubscribe.

The combineLatest operator can assemble and emit data emitted by 2 to 9 Observables. But there are two things:

  • All Observables emit data.
  • When any of the above conditions are metObservableSend one data, it will allObservableThe latest transmitted data is assembled and transmitted according to the functions provided.

A specific example is shown in the figure below:

By the third parameter function, lifecycle. Take (1).map(correspondingEvents) and lifecycle. Skip (1) and proceed to combine

new BiFunction<R, R, Boolean>() {
                @Override
                public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(bindUntilEvent); }}Copy the code

So the result is

false.false.false.false.true
Copy the code

OnErrorReturn and filter follow to handle exceptions and determine if the subscription should end:

Static final Function<Throwable, Boolean> static final Function<Throwable, Boolean> Boolean>() { @Override public Boolean apply(Throwable throwable) throws Exception {if (throwable instanceof OutsideLifecycleException) {
                return true;
            }

            //noinspection ThrowableResultOfMethodCallIgnored
            Exceptions.propagate(throwable);
            return false; }}; // Should I unsubscribe, Static final Predicate<Boolean> SHOULD_COMPLETE = new Predicate<Boolean>() {@override public Booleantest(Boolean shouldComplete) throws Exception {
            returnshouldComplete; }};Copy the code

So, following the example above, if the binding is done in onCreate(), then the corresponding unsubscribe is done in onDestory().

Four,

From the above analysis, you can understand the use of RxLifecycle and how it works.

As we studied RxLifecycle, we learned more about the use of the Observer pattern and the power of the RxJava operators, which help us to transform columns.