Hongmeng Development in Practice Series, Part One: Rounded Corners

Preface

In this article, we share how to implement a publish/subscribe event bus on Hongmeng, similar to EventBus, RxBus, and the other frameworks commonly used in Android development.

Before we get started, let's review what event publish/subscribe means on Android. EventBus and RxBus are the publish/subscribe event frameworks we often choose in Android application development. They replace the traditional Intent, Handler, and Broadcast mechanisms for passing data and invoking methods between Activities, Fragments, Services, and threads.

These frameworks have several advantages: they simplify communication between application components, decouple event senders from receivers, avoid complex and error-prone dependency and lifecycle issues, and are fast and optimized for high performance.

How it works: an event source sends the messages it produces to a specific channel on the event bus. Listeners subscribe in advance to the channels they care about, which is how responses to different messages are kept apart. When a message is sent to a particular channel, the listeners on that channel receive it, and each one runs the response function it was set up with.
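
To make the channel idea concrete, here is a minimal sketch in plain Java with no libraries. It is not the RxBus built later in this article: TinyBus and TinyBusDemo are hypothetical names, and the sketch assumes one channel per exact event class and synchronous delivery on the publishing thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical minimal event bus: one channel per event class. */
final class TinyBus {
    // channel (event class) -> listeners subscribed to that channel
    private final Map<Class<?>, List<Consumer<Object>>> channels = new ConcurrentHashMap<>();

    /** Subscribes a listener to the channel for eventType. */
    <T> void subscribe(Class<T> eventType, Consumer<? super T> listener) {
        channels.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .add(event -> listener.accept(eventType.cast(event)));
    }

    /** Publishes an event to every listener on the channel for its exact class. */
    void publish(Object event) {
        List<Consumer<Object>> listeners = channels.get(event.getClass());
        if (listeners != null) {
            for (Consumer<Object> listener : listeners) {
                listener.accept(event);
            }
        }
    }
}

final class TinyBusDemo {
    /** Publishes a String and an Integer; returns what the String listener received. */
    static List<String> run() {
        TinyBus bus = new TinyBus();
        List<String> received = new ArrayList<>();
        bus.subscribe(String.class, received::add);
        bus.publish("hello"); // delivered: a listener is subscribed to the String channel
        bus.publish(42);      // dropped: nobody subscribed to the Integer channel
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sender never references the listener directly; both only know the bus and the event type, which is the decoupling described above.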

Hongmeng has no EventBus or RxBus that we can use directly. So how do we pass messages and invoke methods between Slices, Abilities, Services, and threads on Hongmeng? Brother, if we don't write this utility library first, how are we going to finish a Hongmeng app in just a few weeks? The product team's 30-meter broadsword is no joke.

Since Hongmeng can only use Java code, let's look at our friend RxJava: why not wrap an RxBus on top of RxJava, just as on Android? Isn't it nice to keep pace with big brother Android?

OK, let's do it and build a Hongmeng version of RxBus.

Hongmeng RxBus implementation

1. Add the RxJava library first

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'

2. Create HarmonySchedulers

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import ohos.app.dispatcher.TaskDispatcher;

import java.util.concurrent.Executor;

public class HarmonySchedulers implements Executor {
    private static HarmonySchedulers instance;
    private final Scheduler mMainScheduler;
    private TaskDispatcher uiTaskDispatcher;

    private HarmonySchedulers() {
        // Wrap this Executor as an RxJava Scheduler.
        mMainScheduler = Schedulers.from(this);
    }

    public static synchronized Scheduler mainThread() {
        if (instance == null) {
            instance = new HarmonySchedulers();
        }
        return instance.mMainScheduler;
    }

    @Override
    public void execute(@NonNull Runnable command) {
        if (uiTaskDispatcher == null) {
            // Note: the UI-thread task dispatcher is obtained from an Ability here.
            // getMainAbility() is a placeholder; get hold of your own Ability in whatever way fits your app.
            uiTaskDispatcher = getMainAbility().getUITaskDispatcher();
        }
        // Hand the task to the UI thread.
        uiTaskDispatcher.asyncDispatch(command);
    }
}
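
The key idea in HarmonySchedulers is that an Executor does not run a task itself but hands it to one designated thread. The following sketch shows that handoff in plain Java, under stated assumptions: SingleThreadHandoff is a hypothetical class, and a single-thread executor with the made-up thread name "fake-ui-thread" stands in for the platform's UI task dispatcher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a UI-thread executor: every task runs on one named thread. */
final class SingleThreadHandoff implements Executor {
    // Stand-in for the platform's UI task dispatcher (an assumption of this sketch).
    private final ExecutorService uiLike = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-ui-thread");
        t.setDaemon(true); // lets the JVM exit without an explicit shutdown
        return t;
    });

    @Override
    public void execute(Runnable command) {
        // Hand the task over instead of running it on the caller's thread.
        uiLike.execute(command);
    }

    /** Runs one task through the executor and returns the name of the thread it ran on. */
    static String threadNameOfTask() {
        SingleThreadHandoff executor = new SingleThreadHandoff();
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return name.get();
    }
}
```

Because RxJava's Schedulers.from accepts any Executor, an executor of this shape is all that is needed to make observeOn deliver events on a chosen thread.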

3. Create the RxBus class, which implements subscription, registration, unregistration, and related functions

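import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@SuppressWarnings("unused")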
public class RxBus {
    public static final String LOG_BUS = "RXBUS_LOG";
    private static volatile RxBus defaultInstance;

    private Map<Class, List<Disposable>> subscriptionsByEventType = new HashMap<>();

    private Map<Object, List<Class>> eventTypesBySubscriber = new HashMap<>();

    private Map<Class, List<SubscriberMethod>> subscriberMethodByEventType = new HashMap<>();

    private final Subject<Object> bus;

    private RxBus() {
        this.bus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        RxBus rxBus = defaultInstance;
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                rxBus = defaultInstance;
                if (defaultInstance == null) {
                    rxBus = new RxBus();
                    defaultInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    /**
     * Returns a Flowable that emits only events of the given eventType.
     *
     * @param eventType the event type
     * @return a Flowable of events of that type
     */
    private <T> Flowable<T> toObservable(Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
    }

    /**
     * Returns a Flowable that emits only events sent with the given code
     * whose payload is of the given eventType.
     *
     * @param code      the event code
     * @param eventType the event type
     */
    private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
                .filter(new Predicate<Message>() {
                    @Override
                    public boolean test(Message o) throws Exception {
                        return o.getCode() == code && eventType.isInstance(o.getObject());
                    }
                }).map(new Function<Message, Object>() {
                    @Override
                    public Object apply(Message o) throws Exception {
                        return o.getObject();
                    }
                }).cast(eventType);
    }

    /**
     * Registers a subscriber.
     *
     * @param subscriber the subscriber
     */
    public void register(Object subscriber) {
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                // Get the parameter types
                Class[] parameterType = method.getParameterTypes();
                // The parameter array is non-null and has exactly one parameter
                if (parameterType != null && parameterType.length == 1) {

                    Class eventType = parameterType[0];

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);
                } else if (parameterType == null || parameterType.length == 0) {

                    Class eventType = BusData.class;

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);

                }
            }
        }
    }
    
    /**
     * Stores the event type in a map keyed by the subscriber.
     *
     * @param subscriber the subscriber
     * @param eventType  the event type
     */
    private void addEventTypeToMap(Object subscriber, Class eventType) {
        List<Class> eventTypes = eventTypesBySubscriber.get(subscriber);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            eventTypesBySubscriber.put(subscriber, eventTypes);
        }

        if (!eventTypes.contains(eventType)) {
            eventTypes.add(eventType);
        }
    }

    /**
     * Stores the annotated method's information in a map keyed by event type.
     *
     * @param eventType        the event type
     * @param subscriberMethod the annotated method's information
     */
    private void addSubscriberToMap(Class eventType, SubscriberMethod subscriberMethod) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods == null) {
            subscriberMethods = new ArrayList<>();
            subscriberMethodByEventType.put(eventType, subscriberMethods);
        }

        if (!subscriberMethods.contains(subscriberMethod)) {
            subscriberMethods.add(subscriberMethod);
        }
    }

    /**
     * Stores the subscription in a map keyed by event type, for use when unsubscribing.
     *
     * @param eventType  the event type
     * @param disposable the subscription
     */
    private void addSubscriptionToMap(Class eventType, Disposable disposable) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables == null) {
            disposables = new ArrayList<>();
            subscriptionsByEventType.put(eventType, disposables);
        }

        if (!disposables.contains(disposable)) {
            disposables.add(disposable);
        }
    }

    /**
     * Adds a subscriber using RxJava.
     *
     * @param subscriberMethod the subscriber method to subscribe
     */
    @SuppressWarnings("unchecked")
    private void addSubscriber(final SubscriberMethod subscriberMethod) {
        Flowable flowable;
        if (subscriberMethod.code == -1) {
            flowable = toObservable(subscriberMethod.eventType);
        } else {
            flowable = toObservable(subscriberMethod.code, subscriberMethod.eventType);
        }
        Disposable subscription = postToObservable(flowable, subscriberMethod)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        callEvent(subscriberMethod, o);
                    }
                });
        addSubscriptionToMap(subscriberMethod.subscriber.getClass(), subscription);
    }

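    /**
     * Selects the thread on which the subscribed event is handled.
     *
     * @param observable       the source Flowable
     * @param subscriberMethod the subscriber method, which carries the thread mode
     * @return the Flowable, observed on the selected scheduler
     */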
    private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
        Scheduler scheduler;
        switch (subscriberMethod.threadMode) {
            case MAIN:
                scheduler = HarmonySchedulers.mainThread();
                break;

            case NEW_THREAD:
                scheduler = Schedulers.newThread();
                break;

            case CURRENT_THREAD:
                scheduler = Schedulers.trampoline();
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        }
        return observable.observeOn(scheduler);
    }

    /**
     * Calls back into the subscriber's method.
     *
     * @param method the subscriber method being dispatched
     * @param object the event object
     */
    private void callEvent(SubscriberMethod method, Object object) {
        Class eventClass = object.getClass();
        List<SubscriberMethod> methods = subscriberMethodByEventType.get(eventClass);
        if (methods != null && methods.size() > 0) {
            for (SubscriberMethod subscriberMethod : methods) {
                Subscribe sub = subscriberMethod.method.getAnnotation(Subscribe.class);
                int c = sub.code();
                if (c == method.code && method.subscriber.equals(subscriberMethod.subscriber) && method.method.equals(subscriberMethod.method)) {
                    subscriberMethod.invoke(object);
                }

            }
        }
    }

    /**
     * Unregisters a subscriber.
     *
     * @param subscriber the subscriber to unregister
     */
    public void unRegister(Object subscriber) {
        List<Class> subscribedTypes = eventTypesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
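                // Disposables are stored under the subscriber's class (see addSubscriber),
                // so this disposes the subscriptions of every registered instance of that class.
                unSubscribeByEventType(subscriber.getClass());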
                unSubscribeMethodByEventType(subscriber, eventType);
            }
            eventTypesBySubscriber.remove(subscriber);
        }
    }

    /**
     * Disposes all subscriptions stored under the given key.
     *
     * @param eventType the key the subscriptions were stored under
     */
    private void unSubscribeByEventType(Class eventType) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables != null) {
            Iterator<Disposable> iterator = disposables.iterator();
            while (iterator.hasNext()) {
                Disposable disposable = iterator.next();
                if (disposable != null && !disposable.isDisposed()) {
                    disposable.dispose();
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Removes the subscriber methods that belong to the given subscriber.
     *
     * @param subscriber the subscriber
     * @param eventType  the event type
     */
    private void unSubscribeMethodByEventType(Object subscriber, Class eventType) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods != null) {
            Iterator<SubscriberMethod> iterator = subscriberMethods.iterator();
            while (iterator.hasNext()) {
                SubscriberMethod subscriberMethod = iterator.next();
                if (subscriberMethod.subscriber.equals(subscriber)) {
                    iterator.remove();
                }
            }
        }
    }

    public void send(int code, Object o) {
        bus.onNext(new Message(code, o));
    }

    public void send(Object o) {
        bus.onNext(o);
    }

    public void send(int code) {
        bus.onNext(new Message(code, new BusData()));
    }

    private class Message {
        private int code;
        private Object object;

        public Message() {
        }

        private Message(int code, Object o) {
            this.code = code;
            this.object = o;
        }

        private int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        private Object getObject() {
            return object;
        }

        public void setObject(Object object) {
            this.object = object;
        }
    }
}
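
The register method above finds handlers by reflection: it scans the subscriber's declared methods, keeps the annotated ones, and reads the event type from the single parameter. Here is a stripped-down sketch of that scan in plain Java. OnEvent, DemoListener, and ReflectiveDispatch are hypothetical names used only for illustration, and unlike RxBus the sketch invokes the handlers immediately instead of subscribing them to a stream.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker annotation for this sketch (not the article's Subscribe). */
@Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, or reflection cannot see it
@Target(ElementType.METHOD)
@interface OnEvent {}

final class DemoListener {
    final List<String> log = new ArrayList<>();

    @OnEvent
    public void onText(String text) { log.add("text:" + text); }

    @OnEvent
    public void onNumber(Integer n) { log.add("number:" + n); }

    // Not annotated, so the scan skips it.
    public void notAHandler(String text) { log.add("ignored:" + text); }
}

final class ReflectiveDispatch {
    /** Invokes every @OnEvent method of target whose single parameter accepts the event. */
    static int dispatch(Object target, Object event) {
        int delivered = 0;
        for (Method m : target.getClass().getDeclaredMethods()) {
            if (!m.isAnnotationPresent(OnEvent.class)) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != 1 || !params[0].isInstance(event)) {
                continue;
            }
            try {
                m.setAccessible(true); // allow handlers declared in non-public classes
                m.invoke(target, event);
                delivered++;
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("handler failed: " + m.getName(), e);
            }
        }
        return delivered;
    }
}
```

Calling ReflectiveDispatch.dispatch(listener, "hi") reaches only onText, because the event type is matched against each handler's parameter type.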

4. Add additional classes

BusData.java: event data wrapper

public class BusData {
    String id;
    String status;

    public BusData() {
    }

    public BusData(String id, String status) {
        this.id = id;
        this.status = status;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

Subscribe.java: the subscription annotation

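import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented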
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    int code() default -1;

    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
}

SubscriberMethod.java: wraps a registered subscriber method

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class SubscriberMethod {
    public Method method;
    public ThreadMode threadMode;
    public Class<?> eventType;
    public Object subscriber;
    public int code;

    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code, ThreadMode threadMode) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.subscriber = subscriber;
        this.code = code;
    }

    /**
     * Invokes the subscriber's method, passing the event only if the method takes one parameter.
     */
    public void invoke(Object o) {
        try {
            Class[] parameterType = method.getParameterTypes();
            if (parameterType != null && parameterType.length == 1) {
                method.invoke(subscriber, o);
            } else if (parameterType == null || parameterType.length == 0) {
                method.invoke(subscriber);
            }
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}

ThreadMode.java: the thread mode, identifying the thread an event handler runs on

public enum ThreadMode {
    /**
     * current thread
     */
    CURRENT_THREAD,
    /**
     * main (UI) thread
     */
    MAIN,
    /**
     * new thread
     */
    NEW_THREAD
}

Using RxBus

1. Define event parameter classes

public class RxbusEvent {}

2. Define the event receiver

public class RxBusDemoAbilitySlice extends AbilitySlice {
    @Override
    protected void onStart(Intent intent) {
        super.onStart(intent);
        // Register with RxBus
        RxBus.get().register(this);
    }

    @Override
    protected void onStop() {
        super.onStop();
        // Unregister from RxBus
        RxBus.get().unRegister(this);
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void rxBusRxbusEvent(RxbusEvent rxbusEvent) {
        if (rxbusEvent == null) {
            return;
        }
        // Perform the corresponding operation
    }
}

3. Send events

RxBus.get().send(new RxbusEvent()); // Send an event
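
Besides send(Object), RxBus also offers send(int code, Object o), which wraps the payload in a Message so that only subscribers whose code and event type both match receive it. The following plain-Java sketch shows that matching rule on its own. CodeRouter, Envelope, and Route are hypothetical names, and delivery is synchronous rather than going through an RxJava stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical router that delivers a payload only when code and payload type both match. */
final class CodeRouter {
    /** Pairs a code with a payload, similar in spirit to RxBus.Message. */
    private static final class Envelope {
        final int code;
        final Object payload;

        Envelope(int code, Object payload) {
            this.code = code;
            this.payload = payload;
        }
    }

    /** One registered handler together with the code and type it listens for. */
    private static final class Route {
        final int code;
        final Class<?> type;
        final Consumer<Object> handler;

        Route(int code, Class<?> type, Consumer<Object> handler) {
            this.code = code;
            this.type = type;
            this.handler = handler;
        }

        boolean matches(Envelope e) {
            return e.code == code && type.isInstance(e.payload);
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Registers a handler for payloads of the given type sent with the given code. */
    <T> void on(int code, Class<T> type, Consumer<? super T> handler) {
        routes.add(new Route(code, type, payload -> handler.accept(type.cast(payload))));
    }

    /** Sends a payload with a code; returns how many handlers received it. */
    int send(int code, Object payload) {
        Envelope envelope = new Envelope(code, payload);
        int delivered = 0;
        for (Route route : routes) {
            if (route.matches(envelope)) {
                route.handler.accept(envelope.payload);
                delivered++;
            }
        }
        return delivered;
    }
}
```

With a handler registered via on(1, String.class, ...), send(1, "a") is delivered, while send(3, "a") and send(1, 99) are not, because the code or the payload type does not match.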

Conclusion

That wraps it up. The final usage is basically the same as on Android. This installment has a lot of code; you are welcome to try it out and report any bugs. All right, let's move on to other features.

This is the second part of the series, and there is more to come.

If this article gave you some inspiration, I hope you will give it a like, and follow or bookmark it so you don't lose track of it.