This article is also published on CSDN. I started using RxJava last year and was deeply drawn to its design. After a rough read through the RxJava 1 source code I was even more impressed, and our project switched to RxJava 2 at the beginning of this year. I find this style of functional programming very interesting, but source code analysis is not the focus of this article. RxJava is essentially an implementation of the Observer pattern: an observer subscribes to an Observable through the Observable's subscribe method. Let's first look at the create method in Observable:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

We'll start with the ObservableOnSubscribe parameter, which is a simple interface with a single method:

    public interface ObservableOnSubscribe<T> {

        /**
         * Called for each Observer that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
    }

The subscribe method receives an ObservableEmitter, which is also an interface. ObservableEmitter in turn extends the Emitter interface:

    public interface Emitter<T> {
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(@NonNull T value);

        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(@NonNull Throwable error);

        /**
         * Signal a completion.
         */
        void onComplete();
    }

Next, let's look at ObservableCreate, the class that the create method instantiates:

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;

        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);

            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }

        // The nested CreateEmitter class is omitted from this excerpt.
    }
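To make the order of calls in subscribeActual easier to follow, here is a minimal sketch that imitates it in plain Java. It does not use RxJava: the types Observer, Emitter and OnSubscribe, and the class SubscribeFlowSketch, are hypothetical stand-ins written only for this illustration, and the Disposable argument is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the create/subscribe flow; none of these types are RxJava's.
final class SubscribeFlowSketch {

    interface Observer<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    interface Emitter<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    interface OnSubscribe<T> {
        void subscribe(Emitter<T> emitter) throws Exception;
    }

    // Plays the role of subscribeActual: wrap the observer, notify it, then run the source.
    static <T> void subscribe(OnSubscribe<T> source, Observer<T> observer) {
        Emitter<T> parent = new Emitter<T>() {
            @Override public void onNext(T value) { observer.onNext(value); }
            @Override public void onError(Throwable error) { observer.onError(error); }
            @Override public void onComplete() { observer.onComplete(); }
        };
        observer.onSubscribe();       // 1. the observer is told it is subscribed
        try {
            source.subscribe(parent); // 2. the user-supplied source starts emitting
        } catch (Throwable ex) {
            parent.onError(ex);       // 3. anything thrown by the source becomes onError
        }
    }

    // Records the events an observer sees when the source emits once and then throws.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Observer<Integer> observer = new Observer<Integer>() {
            @Override public void onSubscribe() { events.add("onSubscribe"); }
            @Override public void onNext(Integer value) { events.add("onNext " + value); }
            @Override public void onError(Throwable error) { events.add("onError " + error.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        subscribe(emitter -> {
            emitter.onNext(1);
            throw new IllegalStateException("boom");
        }, observer);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the observer sees onSubscribe first, then the value, then the exception routed to onError, which is the same ordering the real subscribeActual sets up.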

We know that when an Observable is subscribed to, it internally calls the abstract method subscribeActual. Here we can clearly see two calls: observer.onSubscribe(parent) and source.subscribe(parent). The latter invokes our implementation of the ObservableOnSubscribe interface, so calling subscribe triggers both of them. Now let's go back to CreateEmitter, which implements two interfaces, ObservableEmitter and Disposable:

    static final class CreateEmitter<T>
            extends AtomicReference<Disposable>
            implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
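The isDisposed() checks above act as a gate: once a terminal event has been delivered or dispose() has been called, later events are dropped. The following is a rough, self-contained sketch of that idea only. GatedEmitter and DisposeGateSketch are hypothetical names, and an AtomicBoolean stands in for the AtomicReference<Disposable> state that the real class keeps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the "drop events after dispose" gate; not RxJava code.
final class DisposeGateSketch {

    // Forwards events into a list until it has been disposed.
    static final class GatedEmitter extends AtomicBoolean {
        private final List<String> sink;

        GatedEmitter(List<String> sink) {
            this.sink = sink;
        }

        void onNext(String value) {
            if (!isDisposed()) {
                sink.add("onNext " + value);
            }
        }

        void onComplete() {
            if (!isDisposed()) {
                try {
                    sink.add("onComplete");
                } finally {
                    dispose(); // a terminal event always closes the gate
                }
            }
        }

        void dispose() {
            set(true);
        }

        boolean isDisposed() {
            return get();
        }
    }

    // Emits before and after completion and returns what actually got through.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        GatedEmitter emitter = new GatedEmitter(events);
        emitter.onNext("a");
        emitter.onComplete();
        emitter.onNext("b");   // dropped: the emitter is already disposed
        emitter.onComplete();  // dropped as well
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the first value and the first completion reach the list; everything after the gate closes is ignored.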

When we call the emitter methods on parent inside source.subscribe(parent), they end up calling the corresponding observer methods. With that in mind, it is not hard to see that wrapping a third-party API in RxJava style is straightforward.

Today we will start by wrapping the AMap location API. The demo for this article is on GitHub, and the starting point is the official location demo. Analysing that demo shows that what we ultimately need is the AMapLocation object, which is delivered through the AMapLocationListener callback interface, so we define an Observable as follows:

    public class LocationObservable extends Observable<AMapLocation> {
        private final AMapLocationClient aMapLocationClient;

        public LocationObservable(AMapLocationClient aMapLocationClient) {
            this.aMapLocationClient = aMapLocationClient;
        }

        @Override
        protected void subscribeActual(Observer<? super AMapLocation> observer) {
            AMapCallBack callBack = new AMapCallBack(aMapLocationClient, observer);
            aMapLocationClient.setLocationListener(callBack);
            observer.onSubscribe(callBack);
            aMapLocationClient.startLocation();
        }
    }

This is essentially an adaptation of the ObservableCreate class. We register a listener through AMapLocationClient.setLocationListener, and the listener's callback forwards the result to the observer's onNext, onComplete and onError methods:

    private static final class AMapCallBack implements AMapLocationListener, Disposable {
        // volatile because dispose() may run on a different thread than the SDK callback
        private volatile boolean isDisposed = false;
        private final AMapLocationClient aMapLocationClient;
        private final Observer<? super AMapLocation> observer;

        public AMapCallBack(AMapLocationClient aMapLocationClient, Observer<? super AMapLocation> observer) {
            this.aMapLocationClient = aMapLocationClient;
            this.observer = observer;
        }

        @Override
        public void onLocationChanged(AMapLocation aMapLocation) {
            try {
                if (aMapLocation != null) {
                    if (aMapLocation.getErrorCode() == 0) {
                        // Error code 0 means the location request succeeded
                        observer.onNext(aMapLocation);
                        if (isDisposed) {
                            observer.onComplete();
                        }
                    } else {
                        try {
                            observer.onError(new AMapLocationException(aMapLocation.getErrorCode(), aMapLocation.getErrorInfo()));
                        } catch (Throwable inner) {
                            Exceptions.throwIfFatal(inner);
                            RxJavaPlugins.onError(new CompositeException(new AMapLocationException(aMapLocation.getErrorCode(), aMapLocation.getErrorInfo()), inner));
                        }
                    }
                }
            } catch (Throwable t) {
                if (isDisposed) {
                    // Nobody is listening any more, so hand the error to the global handler
                    RxJavaPlugins.onError(t);
                } else {
                    try {
                        observer.onError(t);
                    } catch (Throwable inner) {
                        Exceptions.throwIfFatal(inner);
                        RxJavaPlugins.onError(new CompositeException(t, inner));
                    }
                }
            }
        }

        @Override
        public void dispose() {
            // Releases the client; it cannot be reused after this call
            aMapLocationClient.onDestroy();
            isDisposed = true;
        }

        @Override
        public boolean isDisposed() {
            return isDisposed;
        }
    }
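The same register, start, forward, release pattern can be tried without the AMap SDK. The sketch below is only an illustration under stated assumptions: FakeClient is a hypothetical callback-style client standing in for AMapLocationClient, Subscription is a hypothetical stand-in for Disposable, and none of the names belong to a real library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of wrapping a listener-based client; not the AMap SDK or RxJava.
final class ListenerWrapSketch {

    // Stand-in for a callback-style SDK client.
    static final class FakeClient {
        private Consumer<String> listener;
        private boolean running;

        void setListener(Consumer<String> listener) { this.listener = listener; }
        void start() { running = true; }
        void stop() { running = false; listener = null; }

        // Simulates the SDK delivering a result on its callback.
        void deliver(String fix) {
            if (running && listener != null) {
                listener.accept(fix);
            }
        }
    }

    // Handle returned to the caller, playing the role of a Disposable.
    interface Subscription {
        void dispose();
    }

    // Mirrors the shape of subscribeActual: register the listener, start, return a handle.
    static Subscription subscribe(FakeClient client, Consumer<String> onNext) {
        client.setListener(onNext); // register before starting so no result is missed
        client.start();
        return client::stop;        // disposing releases the underlying client
    }

    // Delivers one result before and one after disposal and returns what was received.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        FakeClient client = new FakeClient();
        Subscription subscription = subscribe(client, received::add);
        client.deliver("fix-1");
        subscription.dispose();
        client.deliver("fix-2"); // ignored: the client was stopped on dispose
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Tying the release of the client to the returned handle is the design choice that matters here: the caller never has to know how the wrapped SDK is shut down.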

So how do we call it? First we need a facade class, which we will call AMapRxHelper. Looking at the requirements, the SDK needs the application Context in many places, and passing it in on every call would be tedious. So we store the Application once, globally, and call init from the Application class:

    private static Application app;

    public static void init(Application application) {
        app = application;
    }

AMapLocationClient needs quite a lot of configuration before it can locate, and that configuration should be left to the caller. How do we do that? We can borrow the approach of RxJava's map operator and let the caller pass in a callback interface:

    public static Observable<AMapLocation> createAMapLocation(final LocationSettingsListener listener) {
        AMapLocationClient client = new AMapLocationClient(app);
        return Observable.just(client)
                .flatMap(new Function<AMapLocationClient, ObservableSource<AMapLocation>>() {
                    @Override
                    public ObservableSource<AMapLocation> apply(@NonNull AMapLocationClient client) throws Exception {
                        if (listener != null) {
                            listener.locationOptions(client);
                        }
                        return new LocationObservable(client);
                    }
                });
    }

    public interface LocationSettingsListener {
        void locationOptions(AMapLocationClient client);
    }

Our location wrapper is now complete, so let's try it out. The Activity code looks like this:

    AMapRxHelper.createAMapLocation(new AMapRxHelper.LocationSettingsListener() {
        @Override
        public void locationOptions(AMapLocationClient client) {
            AMapLocationClientOption mLocationOption = new AMapLocationClientOption();
            mLocationOption.setLocationMode(AMapLocationClientOption.AMapLocationMode.Hight_Accuracy);
            mLocationOption.setOnceLocation(true);
            mLocationOption.setOnceLocationLatest(true);
            client.setLocationOption(mLocationOption);
        }
    }).subscribe(new Consumer<AMapLocation>() {
        @Override
        public void accept(AMapLocation aMapLocation) throws Exception {
            Toast.makeText(MainActivity.this, aMapLocation.toStr(), Toast.LENGTH_SHORT).show();
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Toast.makeText(MainActivity.this, throwable.getMessage(), Toast.LENGTH_SHORT).show();
        }
    });

Isn't this more comfortable to write, and more functional in style, than AMap's native API? Of course, this is just one example. The demo source also implements POI search, keyword input search, and keyword search bound to an EditText. If you are interested, take a look, and a star on the repository would be appreciated.