The `Main` class below demonstrates how the code is used.

package com.designpattern.pubsubpattern;

import com.designpattern.pubsubpattern.sub.Subscriber;

public class Main {
    public static void main(String[] args) {
        new CallBackFacade<String>(()-> {
            return "helloworld";
        }).then(new Subscriber<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("Success." + result);
            }
            @Override
            public void onFailure(a) {
                System.out.println("Failure");
            }
        }).then(new Subscriber() {
            @Override
            public void onSuccess(Object result) {
                System.out.println("Success again :" + result);
            }

            @Override
            public void onFailure(a) {
                System.out.println("Failed again."); } }).ok(); }}Copy the code

Unified interface for external use

package com.designpattern.pubsubpattern;

import com.designpattern.pubsubpattern.manager.Manager;
import com.designpattern.pubsubpattern.message.Message;
import com.designpattern.pubsubpattern.pub.Publisher;
import com.designpattern.pubsubpattern.sub.Subscriber;

import java.util.concurrent.*;

public class CallBackFacade<T> {

    private int nThreads = 0;
    private Manager manager;
    private Publisher publisher;
    private FutureTask<T> futureTask;
    private ExecutorService executorService;
    private CountDownLatch countDownLatch;


    private CallBackFacade(a){}public CallBackFacade(Callable<T> callable) {
        manager = new Manager();
        publisher = new Publisher(manager);
        futureTask = new FutureTask<T>(callable);
        executorService = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
    }

    public CallBackFacade<T> then(Subscriber subscriber) {
        subscriber.setManager(manager);
        subscriber.subscribeTopic("hello");
        nThreads ++;
        executorService.execute(()->{
            try {
                T t = futureTask.get();
                publisher.publish(new Message().setSuccess(true).setResult(t).setTopic("hello"));
            } catch (Exception e) {
                publisher.publish(new Message().setSuccess(false).setTopic("hello"));
            }
            countDownLatch.countDown();
        });
        return this;
    }

    public void ok(a) {
        countDownLatch = new CountDownLatch(nThreads);
        executorService.execute(futureTask);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{ executorService.shutdown(); }}}Copy the code

Published message

package com.designpattern.pubsubpattern.message;

import java.util.Objects;

/**
 * A message published to a topic. It carries a success flag and, on success, a result.
 *
 * @param <R> type of the result payload
 */
public class Message<R> {

    /** Result returned on success; {@code null} otherwise. */
    private R result = null;

    /** Whether the operation that produced this message succeeded. */
    private boolean success = false;

    /** Topic this message is published to. */
    private String topic = "";

    /** Creates a failure message with an empty topic and no result. */
    public Message() {
    }

    /**
     * Creates a success message.
     *
     * @param topic  the topic to publish to
     * @param result the result to deliver
     */
    public Message(String topic, R result) {
        // Fields are assigned directly so no overridable method runs during construction.
        this.success = true;
        this.result = result;
        this.topic = topic;
    }

    /**
     * Creates a failure message, which carries a topic but no result.
     *
     * @param topic the topic to publish to
     */
    public Message(String topic) {
        this.success = false;
        this.topic = topic;
    }

    /**
     * @return {@code true} if this message reports success
     */
    public boolean isSuccess() {
        return success;
    }

    /**
     * Sets the result.
     *
     * @param result the result to deliver
     * @return this message, to allow chaining
     */
    public Message<R> setResult(R result) {
        this.result = result;
        return this;
    }

    /**
     * Sets the success flag.
     *
     * @param success whether the operation succeeded
     * @return this message, to allow chaining
     */
    public Message<R> setSuccess(boolean success) {
        this.success = success;
        return this;
    }

    /**
     * Sets the topic.
     *
     * @param topic the topic to publish to
     * @return this message, to allow chaining
     */
    public Message<R> setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /**
     * @return the result, or {@code null} if none was set
     */
    public R getResult() {
        return result;
    }

    /**
     * @return the topic this message is published to
     */
    public String getTopic() {
        return topic;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Message<?> message = (Message<?>) o;
        return success == message.success
                && Objects.equals(result, message.result)
                && Objects.equals(topic, message.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(result, success, topic);
    }
}

Manager (Subscriber Registration and Discovery Center)

package com.designpattern.pubsubpattern.manager;

import com.designpattern.pubsubpattern.message.Message;
import com.designpattern.pubsubpattern.sub.Subscriber;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

/** * Publish-subscribe manager. * A subscriber registration and discovery center for a topic. * The Spring-like IOC container. * or a service registry and discovery center for microservices frameworks. * /
public class Manager {

    /** * topic-key, subscriber-value dictionary * is used only if ConcurrentHashmap or ConcurrentSkipListMap */ is used in a multi-threaded environment
    private HashMap<String, Set<Subscriber>> messageSetHashMap = new HashMap<>();


    /** * fixed-length thread pool * notifications for asynchronously calling subscribers * barely used, adjusted for specific circumstances */
// private ExecutorService executorService = Executors.newFixedThreadPool(10);

    /** * add subscriber *@paramThe topic theme *@paramSubscriber subscriber */
    public void addSubscriber(String topic, Subscriber subscriber) {
        /** * the argument cannot be empty */
        if (topic == null || subscriber == null) {
            throw new NullPointerException("Null pointer is abnormal!");
        }
        /** * if there is no such key in the dictionary, add it and new a HashSet as a container for subscribers */
        if(! messageSetHashMap.containsKey(topic)) { messageSetHashMap.put(topic,new HashSet<>());
        }
        /** * adds the subscriber */ to the container of the subscriber collection values corresponding to the topic key
        messageSetHashMap.get(topic).add(subscriber);
    }

    /** ** Remove subscriber *@paramThe topic theme *@paramSubscriber subscriber */
    public void removeSubscriber(String topic, Subscriber subscriber) {
        /** * It is necessary to remove the subscriber */ only if the topic is in the key
        if(messageSetHashMap.containsKey(topic)) { messageSetHashMap.get(topic).remove(subscriber); }}/** * Publish the message and send the message to all subscribers (call all subscribers' methods) *@param message
     */
    public void publish(Message message) {
        /** * Subscriber collection */
        Set<Subscriber> subscriberSet = messageSetHashMap.get(message.getTopic());

        /** * call */ synchronously
        for (Subscriber subscriber : subscriberSet) {
            /** * succeeded */
            if (message.isSuccess()) {
                subscriber.onSuccess(message.getResult());
            } else {
                /** * failed */subscriber.onFailure(); }}/** * call */ asynchronously
        // Todo thread pool commit}}Copy the code

The publisher

package com.designpattern.pubsubpattern.pub;

import com.designpattern.pubsubpattern.manager.Manager;
import com.designpattern.pubsubpattern.message.Message;

/** * publisher */
public class Publisher {

    /** * Publish-subscribe manager */
    private Manager manager;

    /** ** */ ** */
    public Publisher(a) {}/** * has an argument constructor, which is used to set in *@param manager
     */
    public Publisher(Manager manager) {
        this.manager = manager;
    }

    public void publish(Message message) { manager.publish(message); }}Copy the code

Subscriber abstract class

package com.designpattern.pubsubpattern.sub;

import com.designpattern.pubsubpattern.manager.Manager;

/** * subscriber */
public abstract class Subscriber<T> {

    protected Manager manager;

    public Subscriber(a) {}public Subscriber(Manager manager) {
        this.manager = manager;
    }

    public void setManager(Manager manager) {
        this.manager = manager;
    }

    /** * Subscribe to a topic *@paramThe topic theme * /
    public void subscribeTopic(String topic) {
        manager.addSubscriber(topic, this);
    }

    /** * Unsubscribe from a topic *@paramThe topic theme * /
    public void unsubscribeTopic(String topic) {
        manager.removeSubscriber(topic, this);
    }

    public abstract void onSuccess(T result);

    public abstract void onFailure(a);

}

Copy the code