Preface

The publish-subscribe pattern has a wide range of uses, such as communication between components in a single-page application (SPA) and running asynchronous tasks whose timing the subscriber cannot control.

Of course, the open source community already has mature publish-subscribe implementations, such as the pubsub-js library. It has one disadvantage, though: it does not fit the concurrent publishing scenario, because every publish is handed off to setTimeout, so in the end the callbacks simply run one after another from the macrotask queue.
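To see what "handed off to setTimeout" means for ordering, here is a minimal sketch. It is not pubsub-js code: `deferredPublish` is a hypothetical stand-in that only mimics the idea of deferring delivery with `setTimeout(..., 0)`. It shows that such deliveries run after the current synchronous code and after pending microtasks, in the order they were queued.

```javascript
// Records the order in which each piece of code actually runs.
const order = [];

// Hypothetical stand-in for a setTimeout-based publish:
// delivery is deferred to a macrotask instead of running immediately.
function deferredPublish(label) {
    return new Promise(resolve => {
        setTimeout(() => {
            order.push(label);
            resolve();
        }, 0);
    });
}

// Queue two deferred publishes and wait for both of them.
const done = Promise.all([
    deferredPublish('publish A'),
    deferredPublish('publish B'),
]).then(() => order);

// A microtask and some plain synchronous code, queued after the publishes.
Promise.resolve().then(() => order.push('microtask'));
order.push('sync code');

done.then(result => console.log(result));
// → [ 'sync code', 'microtask', 'publish A', 'publish B' ]
```

Both publishes were requested first, yet they are delivered last and strictly one after the other, which is the behaviour described above.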

Here we use Promise.all to simulate a concurrent publishing scenario.
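As a rough illustration of that idea, the sketch below starts two publishers together with `Promise.all`. The `publisher` function and its message names are made up for this example. Each publisher emits one message, yields to the event loop, then emits a second one, so the log shows the two publishers interleaving rather than running back to back.

```javascript
// Shared log of everything that was "published".
const log = [];

// Hypothetical publisher: emits two messages and yields to the
// event loop (via a zero-delay timer) between them.
async function publisher(name) {
    log.push(`${name}:1`);
    await new Promise(resolve => setTimeout(resolve, 0));
    log.push(`${name}:2`);
}

// Promise.all starts both publishers before either one has finished,
// then resolves once both are done.
const finished = Promise.all([publisher('A'), publisher('B')]).then(() => log);

finished.then(result => console.log(result));
// → [ 'A:1', 'B:1', 'A:2', 'B:2' ]
```

Note that JavaScript is still single-threaded here: "concurrent" only means the publishers are in flight at the same time, not that they run in parallel.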

You can see this in the pubsub-js source code.

Here I have written a simple publish-subscribe implementation. It probably has many shortcomings; if any experts pass by, please correct my mistakes. I am just an intern, and I hope you can enlighten me.

My-PubSub

// ES6 singleton pattern

class EventBus {

    // Singleton accessor: every caller that goes through getInstance() shares the same instance
    static getInstance() {
        if (!EventBus.instance) {
            EventBus.instance = new EventBus();
        }
        return EventBus.instance;
    }

    constructor() {
        // Central event registry: topic -> { token -> [callbacks] }
        this.EVENT_QUEUE = {};
        // Initialize the event token
        this.token = null;
        // Initialize eventId
        this.eventId = -1;
        // Queue of Promises created when async is enabled, used to simulate concurrent publishing by multiple publishers
        this.promiseItemArr = [];
    }

    /**
     * @time 2019/12/16 23:35
     * @author Eric Wang
     * @desc Subscribe to a topic
     * @param {String} topic - Topic to subscribe to
     * @param {Function} fn - Callback invoked after subscribing, used to receive the msg sent by publish
     */
    subscribe(topic, fn) {
        // Increment eventId
        ++this.eventId;

        // Build a unique string token (for example "Symbol(0)") from the incrementing eventId;
        // it identifies this subscriber in the event queue
        this.token = String(Symbol.for(this.eventId));

        if (!(topic in this.EVENT_QUEUE)) {
            this.EVENT_QUEUE[topic] = {};
        }

        this.EVENT_QUEUE[topic][this.token] = [];
        typeof fn === 'function' && this.EVENT_QUEUE[topic][this.token].push(fn);
        return this.token;
    }

    /**
     * @time 2019/12/16 23:38
     * @author Eric Wang
     * @desc Publish a message
     * @param {String} topic - The corresponding subscribed topic
     * @param {any} msg - The published message, sent to subscribers
     * @param {Object} options - Publish configuration: whether to enable asynchronous publishing
     */
    publish(topic, msg, options = {}) {

        // Merge configuration; `async` controls whether publishing is asynchronous
        options = Object.assign({
            async: false
        }, options);

        const topics = this.EVENT_QUEUE[topic];

        /**
         * @time 2019/12/16 23:27
         * @author Eric Wang
         * @desc Execute all subscriber functions under a topic
         * @param {Array} topic - The callback queue of one subscriber to the topic in the event registry
         * @param {Object} msg - The message sent by publish
         */
        const executeTopic = (topic, msg) => {

            // Asynchronous publishing enabled
            if (options.async) {
                topic.forEach(item => {
                    this.promiseItemArr.push(new Promise((resolve, reject) => {
                        // First come, first executed; first come, first published
                        setTimeout(() => {
                            try {
                                item.call(this, msg);
                                // Resolve only after the callback has actually run
                                resolve(true);
                            } catch (err) {
                                reject(err);
                            }
                        }, 0);
                    }));
                });
            } else {
                topic.forEach(item => item.call(this, msg));
            }
        };

        // Do not throw an exception, so the program keeps running
        if (!(topic in this.EVENT_QUEUE)) {
            return false;
        }

        // Start each publish with an empty Promise queue so earlier publishes are not awaited again
        this.promiseItemArr = [];

        // Walk through all subscribers of the published topic
        for (let key in topics) {
            executeTopic(topics[key], msg);
        }

        // Promise.all simulates concurrency; log subscriber errors instead of re-throwing them
        if (options.async) {
            Promise.all(this.promiseItemArr)
                .catch(err => console.error(err));
        }
    }

    /**
     * @time 2019/12/16 23:21
     * @author Eric Wang
     * @desc Unsubscribe
     * @param {String} token - Token of the subscriber to remove
     */
    unsubscribe(token) {
        Object.keys(this.EVENT_QUEUE).forEach(item => {
            if (token in this.EVENT_QUEUE[item]) {
                // Remove only this subscriber, not the whole topic
                delete this.EVENT_QUEUE[item][token];
            }
        });
    }

    /**
     * @time 2019/12/16 23:23
     * @author Eric Wang
     * @desc Subscribe once
     * @param {String} topic - Topic to subscribe to
     * @param {Function} fn - Callback invoked after subscribing, used to receive the message sent by publish
     */
    once(topic, fn) {
        const event = this.subscribe(topic, (...rest) => {
            // Drop the subscription before running the callback so it fires only once
            this.unsubscribe(event);
            fn.apply(this, rest);
        });
    }
}
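To try out the subscribe / publish / unsubscribe / once flow on its own, here is a condensed, synchronous-only sketch of the same token-based registry. `MiniBus`, its `sub_N` tokens, and the `news` topic are names invented for this example, and each token maps to a single callback rather than an array, so treat it as a simplified model of the class above rather than a replacement for it.

```javascript
// Condensed, synchronous-only sketch of a token-based publish-subscribe registry.
class MiniBus {
    constructor() {
        this.queue = {};   // topic -> { token -> callback }
        this.nextId = 0;   // counter used to build unique tokens
    }

    // Register a callback under a topic and return its token.
    subscribe(topic, fn) {
        const token = `sub_${this.nextId++}`;
        if (!(topic in this.queue)) {
            this.queue[topic] = {};
        }
        this.queue[topic][token] = fn;
        return token;
    }

    // Deliver msg to every subscriber of the topic; false if the topic is unknown.
    publish(topic, msg) {
        const subscribers = this.queue[topic];
        if (!subscribers) return false;
        // Iterate over a snapshot of the tokens so callbacks may unsubscribe while we loop.
        Object.keys(subscribers).forEach(token => {
            const fn = subscribers[token];
            if (fn) fn(msg);
        });
        return true;
    }

    // Remove a single subscriber by token, leaving the rest of the topic intact.
    unsubscribe(token) {
        Object.values(this.queue).forEach(subscribers => {
            delete subscribers[token];
        });
    }

    // Subscribe a callback that removes itself the first time it fires.
    once(topic, fn) {
        const token = this.subscribe(topic, msg => {
            this.unsubscribe(token);
            fn(msg);
        });
        return token;
    }
}

const bus = new MiniBus();
const received = [];

const tokenA = bus.subscribe('news', msg => received.push(`A got ${msg}`));
bus.once('news', msg => received.push(`once got ${msg}`));

bus.publish('news', 'first');   // both subscribers run
bus.unsubscribe(tokenA);        // removes A only
bus.publish('news', 'second');  // nobody is left on the topic

console.log(received);
// → [ 'A got first', 'once got first' ]
console.log(bus.publish('missing', 'x'));
// → false
```

The second publish reaches nobody: the `once` subscriber removed itself after the first message, and `unsubscribe(tokenA)` removed only subscriber A rather than the whole topic.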

End

Emmm, I have to go to work tomorrow