Mooring floating purpose in this paper, starting from Jane books column: www.jianshu.com/nb/21050520

Wen is participating in the “Java Theme Month – Java Development Practice”, for more details (link)[juejin.cn/post/696826…]

preface

Both event – and message-driven are powerful tools for decoupling. ZStack, as a large software project, also uses these schemes to decouple the entire architecture.

EventFacade

The EventFacade is an interesting component because it is almost bootstrap. That means interested friends can copy and paste, and then make a few modifications to work on their own projects.

How to use it

In the REPo of ZStack, the corresponding case is also provided:

package org.zstack.test.core.cloudbus;
/** * Created with IntelliJ IDEA. * User: frank * Time: 12:38 AM * To change this template use File | Settings | File Templates. */
public class TestCanonicalEvent {
    CLogger logger = Utils.getLogger(TestCanonicalEvent.class);
    ComponentLoader loader;
    EventFacade evtf;
    boolean success;

    @Before
    public void setUp(a) throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        evtf = loader.getComponent(EventFacade.class);
        ((EventFacadeImpl) evtf).start();
    }

    @Test
    public void test(a) throws InterruptedException {
        String path = "/test/event";
        evtf.on(path, new EventRunnable() {
            @Override
            public void run(a) {
                success = true; }}); evtf.fire(path,null);
        TimeUnit.SECONDS.sleep(1); Assert.assertTrue(success); }}Copy the code

The method is very simple: register a path to receive events, then send an event along that path, and the function registered by the event will be called.

The interface definition

package org.zstack.core.cloudbus;

import java.util.Map;

/** * Created with IntelliJ IDEA. * User: frank * Time: 11:29 PM * To change this template use File | Settings | File Templates. */
public interface EventFacade {
    void on(String path, AutoOffEventCallback cb);

    void on(String path, EventCallback cb);

    void on(String path, EventRunnable runnable);

    void off(AbstractEventFacadeCallback cb);

    void onLocal(String path, AutoOffEventCallback cb);

    void onLocal(String path, EventCallback cb);

    void onLocal(String path, EventRunnable runnable);

    void fire(String path, Object data);

    boolean isFromThisManagementNode(Map tokens);

    String META_DATA_MANAGEMENT_NODE_ID = "metadata::managementNodeId";
    String META_DATA_PATH = "metadata::path";
    String WEBHOOK_TYPE = "CanonicalEvent";
}

Copy the code

The source code interpretation

on

   @Override
    public void on(String path, AutoOffEventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, final EventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, EventRunnable cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }
Copy the code

The ON method simply takes a UUID belonging to EventRunnable as the key and puts Callback as value into the Global map. Why would ** do that? ** Since the Map key is not repeatable, storing path is definitely not appropriate.

EventFacadeImpl method signature and member variables:

public class EventFacadeImpl implements EventFacade.CloudBusEventListener.Component.GlobalApiMessageInterceptor {
    @Autowired
    private CloudBus bus;

    private final Map<String, CallbackWrapper> global = Collections.synchronizedMap(new HashMap<>());
    private final Map<String, CallbackWrapper> local =  new ConcurrentHashMap<>();

    private EventSubscriberReceipt unsubscriber;
Copy the code

fire

The opposite fire method:

    @Override
    public void fire(String path, Object data) {
        assertpath ! =null;
        CanonicalEvent evt = new CanonicalEvent();
        evt.setPath(path);
        evt.setManagementNodeId(Platform.getManagementServerId());
        if(data ! =null) {
            /* if (! TypeUtils.isPrimitiveOrWrapper(data.getClass()) && ! data.getClass().isAnnotationPresent(NeedJsonSchema.class)) { throw new CloudRuntimeException(String.format("data[%s] passed to canonical event is not annotated by @NeedJsonSchema", data.getClass().getName())); } * /

            evt.setContent(data);
        }
        // Find the corresponding event from the local map and call it
        fireLocal(evt);
        // Send the event to the corresponding Webhook
        callWebhooks(evt);
        // Send events through cloudBus. The cloudBus source will be covered later
        bus.publish(evt);
    }
Copy the code

Difference between onLocal and on

We don’t see how global events are triggered in the above analysis. To fully understand the process, we have to start with CloudBus, which we will cover in a moment. But you can already guess why you want to distinguish on from onLocal. One is fired over the message bus and one is fired within the current JVM process — meaning that one supports ManagerNode cluster events and one supports only a single MN event. This also comes from the business scenario of ZStack — there are things that MN needs to do together, and there are things that one MN does that other MNS don’t need to do. For the sake of space, interested readers can browse through the code for themselves, so I won’t go into detail here.

WebHook

WebHook is one of the active means of ZStack forward communication. Once the corresponding EventPath is registered, the path is called and the content is sent to the corresponding URL. From the case – CanonicalEventWebhookCase and WebhookCase can see it the proper use of position.

CanonicalEventWebhookCase

class CanonicalEventWebhookCase extends SubCase { EnvSpec envSpec @Override void clean() { envSpec.delete() } @Override void setup() { INCLUDE_CORE_SERVICES = false spring { include("webhook.xml") } } String WEBHOOK_PATH = "/canonical-event-webhook" void testErrorToCreateWebhookifOpaqueFieldMissing() { expect(AssertionError.class) { CreateWebhook {name = "webhook1" URL = "http://127.0.0.1:8989$WEBHOOK_PATH" type = eventfacade. WEBHOOK_TYPE}}} void testCanonicalEventWithVariableInPath() { String path = "/test/{uuid}/event" int count = 0 WebhookInventory hook1 = CreateWebhook {name = "webhook1" URL = "http://127.0.0.1:8989$WEBHOOK_PATH" type = eventfacade. WEBHOOK_TYPE Opaque = path } // this webhook will not be called because path unmatching WebhookInventory hook2 = createWebhook { name = "Webhook1" URL = "http://127.0.0.1:8989$WEBHOOK_PATH" type = eventfacade. WEBHOOK_TYPE Opaque = "/this-path-does-not-match" } CanonicalEvent evt envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> evt = json(e.getBody(), CanonicalEvent.class) count ++ return [:] } String content = "hello world" String eventPath = "/test/${Platform.uuid}/event" bean(EventFacade.class).fire(eventPath, content) retryInSecs { assert count == 1 assert evt ! = null assert evt.path == eventPath assert evt.content == content assert evt.managementNodeId == Platform.getManagementServerId() } } void testCanonicalEventUseWebhook() { String path = "/test/event" WebhookInventory Hook1 = createWebhook {name = "webhook1" URL = "http://127.0.0.1:8989$WEBHOOK_PATH" type = eventfacade.webhook_type Opaque = path} WebhookInventory hook2 = createWebhook {name = "webhook2" URL = "http://127.0.0.1:8989$WEBHOOK_PATH" type = EventFacade.WEBHOOK_TYPE opaque = path } def testFireTwoEvents = { List<CanonicalEvent> evts = [] envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class) evts.add(evt) return [:] } String content = "hello world" bean(EventFacade.class).fire(path, content) retryInSecs { assert evts.size() == 2 CanonicalEvent evt1 = evts[0] CanonicalEvent evt2 = evts[1] assert evt1.path == path assert evt1.content == content assert evt1.managementNodeId == Platform.getManagementServerId() assert  evt2.path == path assert evt2.content == content assert evt2.managementNodeId == Platform.getManagementServerId() } } def testOneEventsGetAfterDeleteOneHook = { deleteWebhook { uuid = hook1.uuid } List<CanonicalEvent> evts = [] envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class) evts.add(evt) return [:] } String content = "hello world" bean(EventFacade.class).fire(path, content) retryInSecs { assert evts.size() == 1 } } def testNoEventGetAfterDeleteAllHooks = { deleteWebhook { uuid = hook2.uuid } List<CanonicalEvent> evts = [] envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> CanonicalEvent evt  = json(e.getBody(), CanonicalEvent.class) evts.add(evt) return [:] } String content = "hello world" bean(EventFacade.class).fire(path, content) retryInSecs { assert evts.size() == 0 } } testFireTwoEvents() testOneEventsGetAfterDeleteOneHook() testNoEventGetAfterDeleteAllHooks() } @Override void environment() { envSpec = env { // nothing } } @Override void test() { envSpec.create { testCanonicalEventUseWebhook() testCanonicalEventWithVariableInPath() testErrorToCreateWebhookifOpaqueFieldMissing() } } }Copy the code

WebhookCase

class WebhookCase extends SubCase { EnvSpec envSpec @Override void clean() { envSpec.delete() } @Override void setup() {  INCLUDE_CORE_SERVICES = false spring { include("webhook.xml") } } @Override void environment() { envSpec = env { // nothing } } void testWebhooksCRUD() { WebhookInventory hook = null def testCreateWebhook = { def params = null hook = CreateWebhook {name = "webhook" type = "custom - type" url = "http://127.0.0.1:8080/webhook" description = "desc" opaque = "test data" params = delegate } assert dbIsExists(hook.uuid, WebhookVO.class) assert hook.name == params.name assert hook.type == params.type assert hook.url == params.url assert hook.description == params.description assert hook.opaque == params.opaque } def testQueryWebhook = { List<WebhookInventory> invs = queryWebhook { conditions = ["name=${hook.name}"] } assert invs.size() == 1 assert invs[0].uuid == hook.uuid } def testDeleteWebhook = { deleteWebhook { uuid = hook.uuid } assert ! dbIsExists(hook.uuid, WebhookVO.class) } testCreateWebhook() testQueryWebhook() testDeleteWebhook() } void testInvalidUrl() { expect(AssertionError.class) { createWebhook { name = "webhook" type = "custom-type" url = "this is not a url" description = "desc" opaque = "test data" } } } @Override void test() { envSpec.create { testWebhooksCRUD() testInvalidUrl() } } }Copy the code

CloudBus

CloudBus can be said to be the most important component of ZStack, ZStack each module communication is completed by Message, and CloudBus is their communication medium, next we look at its source code.

This section is suitable for those who have some knowledge of AMQP. If not, you can read my blog MQ to learn tips

How to use it

Let’s start with a related Case:

package org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.TimeUnit;

public class TestCloudBusCall {
    CLogger logger = Utils.getLogger(TestCloudBusCall.class);
    ComponentLoader loader;
    CloudBusIN bus;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet(a) {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet; }}public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet(a) {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet; }}class FakeService extends AbstractService {
        @Override
        public boolean start(a) {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop(a) {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = newHelloWorldReply(); r.setGreet(hmsg.getGreet()); bus.reply(msg, r); }}@Override
        public String getId(a) {
            return this.getClass().getCanonicalName(); }}@Before
    public void setUp(a) throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test(a) throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        HelloWorldReply r = (HelloWorldReply) bus.call(msg);
        serv.stop();
        Assert.assertEquals("Hello", r.getGreet()); }} We register a Service, override the HandleMessage method, and in Case we successfully receive the message and pass the assertion. Here's another one: ~~~ Javapackage org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TestCloudBusSendCallback {
    CLogger logger = Utils.getLogger(TestCloudBusSendCallback.class);
    ComponentLoader loader;
    CloudBusIN bus;
    CountDownLatch latch = new CountDownLatch(1);
    boolean isSuccess = false;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet(a) {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet; }}public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet(a) {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet; }}class FakeService extends AbstractService {
        @Override
        public boolean start(a) {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop(a) {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = newHelloWorldReply(); r.setGreet(hmsg.getGreet()); bus.reply(msg, r); }}@Override
        public String getId(a) {
            return this.getClass().getCanonicalName(); }}@Before
    public void setUp(a) throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test(a) throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        bus.send(msg, new CloudBusCallBack(null) {
            @Override
            public void run(MessageReply reply) {
                if (reply instanceof HelloWorldReply) {
                    HelloWorldReply hr = (HelloWorldReply) reply;
                    if ("Hello".equals(hr.getGreet())) {
                        isSuccess = true; } } latch.countDown(); }}); latch.await(15, TimeUnit.SECONDS);
        serv.stop();
        Assert.assertEquals(true, isSuccess); }}Copy the code

It also registers a Service and then uses a CallBack. If you run it, the assertion will pass — meaning the CallBack will be called.

To sum up, it is very easy to use CloudBus — just register your Service, send it to the Service, and if you need to register your CallBack, you can do it easily.

## Interface definition is so useful, internal implementation is probably not too simple. We start with [interface] (HTTP: / / https://github.com/zstackio/zstack/blob/d6f511e6c15a2fab3e57a93637ada63cc4b3ee6c/core/src/main/java/org/zstack/ The core/cloudbus cloudbus. Java) began to watch: ~ ~ ~ the Java package org. Zstack. Core. Cloudbus; import org.zstack.header.Component; import org.zstack.header.Service; import org.zstack.header.errorcode.ErrorCode; import org.zstack.header.exception.CloudConfigureFailException; import org.zstack.header.message.*; import java.util.List; public interface CloudBus extends Component { void send(Message msg); <T extends Message> void send(List<T> msgs); void send(NeedReplyMessage msg, CloudBusCallBack callback); @Deprecated void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack); @Deprecated void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusListCallBack callBack); @Deprecated void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusSteppingCallback callback); void route(List<Message> msgs); void route(Message msg); void reply(Message request, MessageReply reply); void publish(List<Event> events); void publish(Event event); MessageReply call(NeedReplyMessage msg); <T extends NeedReplyMessage> List<MessageReply> call(List<T> msg); void registerService(Service serv) throws CloudConfigureFailException; void unregisterService(Service serv); EventSubscriberReceipt subscribeEvent(CloudBusEventListener listener, Event... events); void dealWithUnknownMessage(Message msg); void replyErrorByMessageType(Message msg, Exception e); void replyErrorByMessageType(Message msg, String err); void replyErrorByMessageType(Message msg, ErrorCode err); void logExceptionWithMessageDump(Message msg, Throwable e); String makeLocalServiceId(String serviceId); void makeLocalServiceId(Message msg, String serviceId); String makeServiceIdByManagementNodeId(String serviceId, String managementNodeId); void makeServiceIdByManagementNodeId(Message msg, String serviceId, String managementNodeId); String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid); void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid); void installBeforeDeliveryMessageInterceptor(BeforeDeliveryMessageInterceptor interceptor, Class<? extends Message>... classes); void installBeforeSendMessageInterceptor(BeforeSendMessageInterceptor interceptor, Class<? extends Message>... classes); void installBeforePublishEventInterceptor(BeforePublishEventInterceptor interceptor, Class<? extends Event>... classes); }Copy the code

The naming semantics of interfaces are relatively clear and will not be explained here. Start our source code reading tour.

The source code interpretation

What did CloudBus do with ZStack Starting?

init

Init is a hook that Spring provides when the bean is in the loader. In XML we can see the declaration:


      
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
	xsi:schemaLocation="Http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://zstack.org/schema/zstack http://zstack.org/schema/zstack/plugin.xsd"
	default-init-method="init" default-destroy-method="destroy">

    <bean id="TimeoutManager" class="org.zstack.core.timeout.ApiTimeoutManagerImpl" />

    <bean id="CloudBus" class = "org.zstack.core.cloudbus.CloudBusImpl2" depends-on="ThreadFacade,ThreadAspectj">
    	<zstack:plugin>
			<zstack:extension interface="org.zstack.header.managementnode.ManagementNodeChangeListener" order="9999"/>
		</zstack:plugin>
    </bean>

    <bean id="EventFacade" class = "org.zstack.core.cloudbus.EventFacadeImpl">
    	<zstack:plugin>
		<zstack:extension interface="org.zstack.header.Component" />
		<zstack:extension interface="org.zstack.header.apimediator.GlobalApiMessageInterceptor" />
	</zstack:plugin>
    </bean>
    
    <bean id="ResourceDestinationMaker" class="org.zstack.core.cloudbus.ResourceDestinationMakerImpl" />
    
    <bean id="MessageIntegrityChecker" class="org.zstack.core.cloudbus.MessageIntegrityChecker">
    	<zstack:plugin>
			<zstack:extension interface="org.zstack.header.Component" />
		</zstack:plugin>
    </bean>
</beans>
Copy the code

The init method:

    void init(a) {
        trackerClose = CloudBusGlobalProperty.CLOSE_TRACKER;
        serverIps = CloudBusGlobalProperty.SERVER_IPS;
        tracker = new MessageTracker();

        ConnectionFactory connFactory = new ConnectionFactory();
        List<Address> addresses = CollectionUtils.transformToList(serverIps, new Function<Address, String>() {
            @Override
            public Address call(String arg) {
                returnAddress.parseAddress(arg); }}); connFactory.setAutomaticRecoveryEnabled(true);
        connFactory.setRequestedHeartbeat(CloudBusGlobalProperty.RABBITMQ_HEART_BEAT_TIMEOUT);
        connFactory.setNetworkRecoveryInterval((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_NETWORK_RECOVER_INTERVAL));
        connFactory.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_CONNECTION_TIMEOUT));

        logger.info(String.format("use RabbitMQ server IPs: %s", serverIps));

        try {
            if(CloudBusGlobalProperty.RABBITMQ_USERNAME ! =null) {
                connFactory.setUsername(CloudBusGlobalProperty.RABBITMQ_USERNAME);
                logger.info(String.format("use RabbitMQ username: %s", CloudBusGlobalProperty.RABBITMQ_USERNAME));
            }
            if(CloudBusGlobalProperty.RABBITMQ_PASSWORD ! =null) {
                connFactory.setPassword(CloudBusGlobalProperty.RABBITMQ_PASSWORD);
                logger.info("use RabbitMQ password: ******");
            }
            if(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST ! =null) {
                connFactory.setVirtualHost(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST);
                logger.info(String.format("use RabbitMQ virtual host: %s", CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST));
            }

            conn = connFactory.newConnection(addresses.toArray(new Address[]{}));
            logger.debug(String.format("rabbitmq connection is established on %s", conn.getAddress()));

            ((Recoverable)conn).addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    logger.info(String.format("rabbitmq connection is recovering on %s", conn.getAddress().toString())); }}); channelPool =new ChannelPool(CloudBusGlobalProperty.CHANNEL_POOL_SIZE, conn);
            createExchanges();
            outboundQueue = new BusQueue(makeMessageQueueName(SERVICE_ID), BusExchange.P2P);
            Channel chan = channelPool.acquire();
            chan.queueDeclare(outboundQueue.getName(), false.false.true, queueArguments());
            chan.basicConsume(outboundQueue.getName(), true, consumer);
            chan.queueBind(outboundQueue.getName(), outboundQueue.getBusExchange().toString(), outboundQueue.getBindingKey());
            channelPool.returnChannel(chan);
            maid.construct();
            noRouteEndPoint.construct();
            tracker.construct();
            tracker.trackService(SERVICE_ID);
        } catch (Exception e) {
            throw newCloudRuntimeException(e); }}Copy the code

In simple terms, this function tries to get the configuration file associated with RabbitMQ, initializes the Connection, and creates a channel poll from that. It then binds a channel to a messageQueue. Both EventMaid and noRouteEndPoint and Tracker are Message consumers, as their names indicate, one for the subscribe/publish model (queues bound to this exchange all receive messages) and one for Track.

start

Start is a hook defined by ZStack that is called when the ManagerNode is up.

   @Override
    public boolean start(a) {
        populateExtension();
        prepareStatistics();

        for (Service serv : services) {
            assertserv.getId() ! =null : String.format("service id can not be null[%s]", serv.getClass().getName());
            registerService(serv);
        }

        jmxf.registerBean("CloudBus".this);

        return true;
    }
Copy the code

One by one:

    private void populateExtension(a) {
        services = pluginRgty.getExtensionList(Service.class);
        for (ReplyMessagePreSendingExtensionPoint extp : pluginRgty.getExtensionList(ReplyMessagePreSendingExtensionPoint.class)) {
            List<Class> clazzs = extp.getReplyMessageClassForPreSendingExtensionPoint();
            if (clazzs == null || clazzs.isEmpty()) {
                continue;
            }

            for (Class clz : clazzs) {
                if(! (APIEvent.class.isAssignableFrom(clz)) && ! (MessageReply.class.isAssignableFrom(clz))) {throw new CloudRuntimeException(String.format("ReplyMessagePreSendingExtensionPoint can only marshal APIEvent or MessageReply. %s claimed by %s is neither APIEvent nor MessageReply",
                            clz.getName(), extp.getClass().getName()));
                }

                List<ReplyMessagePreSendingExtensionPoint> exts = replyMessageMarshaller.get(clz);
                if (exts == null) {
                    exts = newArrayList<ReplyMessagePreSendingExtensionPoint>(); replyMessageMarshaller.put(clz, exts); } exts.add(extp); }}}Copy the code

All classes that inherit from the Service are first collected, and then the extensionPoint that will change MSG Reply is loaded.

 private void prepareStatistics(a) {
        List<Class> needReplyMsgs = BeanUtils.scanClassByType("org.zstack", NeedReplyMessage.class);
        needReplyMsgs = CollectionUtils.transformToList(needReplyMsgs, new Function<Class, Class>() {
            @Override
            public Class call(Class arg) {
                return! APIMessage.class.isAssignableFrom(arg) || APISyncCallMessage.class.isAssignableFrom(arg) ? arg :null; }});for (Class clz : needReplyMsgs) {
            MessageStatistic stat = newMessageStatistic(); stat.setMessageClassName(clz.getName()); statistics.put(stat.getMessageClassName(), stat); }}Copy the code

Set statistics for the MSG to reply to.

After that, all the services are collected to facilitate the distribution of Msg.

Commonly used method

CloudBus.makeLocalServiceId

    @Override
    public String makeLocalServiceId(String serviceId) {
        return serviceId + "." + Platform.getManagementServerId();
    }

    @Override
    public void makeLocalServiceId(Message msg, String serviceId) {
        msg.setServiceId(makeLocalServiceId(serviceId));
    }
Copy the code

As described in the ZStack scalability secret: Stateless Services, each management node registers a stack of service queues. So we assemble it in its format so that the message can be received by the service.

CloudBus.makeTargetServiceIdByResourceUuid

@Override public String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid) { DebugUtils.Assert(serviceId! =null, "serviceId cannot be null"); DebugUtils.Assert(resourceUuid! =null, "resourceUuid cannot be null"); / / get resources in MN UUID String mgmtUuid = destMaker. MakeDestination (resourceUuid); return serviceId + "." + mgmtUuid; } @Override public void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid) { String targetService = makeTargetServiceIdByResourceUuid(serviceId, resourceUuid); msg.setServiceId(targetService); }Copy the code

In ZStack, managerNodes are most likely clustered, with each MN managing different resources. Then you need a consistent hash ring to determine which MN the resource is in.

CloudBus.send

@Override public void send(final NeedReplyMessage msg, EvaluateMessageTimeout (MSG); E = new Envelope() {// If the MSG is already sent AtomicBoolean called = new AtomicBoolean(false); final Envelope self = this; // Calculate timeout, TimeoutTaskReceipt = thdf.submitTimeoutTask(new Runnable() {@override public void run() {  self.timeout(); } }, TimeUnit.MILLISECONDS, msg.getTimeout()); Public void ack(MessageReply reply) {// Count (MSG); envelopes. Remove (msg.getid ()); If the update fails, the message has already been sent. Return the if (! called.compareAndSet(false, true)) { return; } / / cancel a computational task timeoutTaskReceipt overtime. The cancel (); // Call the registered callback callback.run(reply); } @override public void timeout() {envelopes. Remove (msg.getid ()); // return if (! called.compareAndSet(false, true)) { return; } // Internally construct a timeout reply to callback callback.run(createTimeoutReply(MSG)); } / / for getWaitingReplyMessageStatistic @ Override a List < Message > getRequests () {List < Message > requests = new ArrayList<Message>(); requests.add(msg); return requests; }}; Put (msg.getid (), e); // Add the unique UUID of MSG and the envelope envelopes. // Send (MSG, false); }Copy the code

Private method: send

    private void send(Message msg, Boolean noNeedReply) {
        MSG serviceID cannot be empty otherwise
        if (msg.getServiceId() == null) {
            throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
        }
        // Build the base properties for MSG
        basicProperty(msg);
        // Set the MSG header property
        msg.putHeaderEntry(CORRELATION_ID, msg.getId());
        // Set the reply queue for the message
        msg.putHeaderEntry(REPLY_TO, outboundQueue.getBindingKey());
        if (msg instanceof APIMessage) {
            // API always need reply
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, Boolean.FALSE.toString());
        } else if (msg instanceof NeedReplyMessage) {
            // for NeedReplyMessage sent without requiring receiver to reply,
            // mark it, then it will not be tracked and replied
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
        }

        buildRequestMessageMetaData(msg);
        wire.send(msg);
    }
Copy the code

This function is a common piece of logic. All messages come in here and are sent out by rabbitMQ. So we need to say a few more words here.

    protected void basicProperty(Message msg) {
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        msg.setAMQPProperties(builder.deliveryMode(1).expiration(String.valueOf(TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.MESSAGE_TTL))).build());
    }
Copy the code

This function sets the MSG base properties — persistence policy (no) and timeout.

Then look at buildRequestMessageMetaData method

private void buildRequestMessageMetaData(Message msg) { if (msg instanceof APIMessage || (msg instanceof NeedReplyMessage && ! Boolean.valueOf((String)msg.getHeaderEntry(NO_NEED_REPLY_MSG)))) { RequestMessageMetaData metaData; if (msg instanceof LockResourceMessage) { LockResourceMessage lmsg = (LockResourceMessage) msg; LockMessageMetaData lmetaData = new LockMessageMetaData(); lmetaData.unlockKey = lmsg.getUnlockKey(); lmetaData.reason = lmsg.getReason(); lmetaData.senderManagementUuid = Platform.getManagementServerId(); metaData = lmetaData; } else { metaData = new RequestMessageMetaData(); } metaData.needApiEvent = msg instanceof APIMessage && ! (msg instanceof APISyncCallMessage); metaData.msgId = msg.getId(); metaData.replyTo = msg.getHeaderEntry(REPLY_TO); metaData.timeout = msg instanceof NeedReplyMessage ? ((NeedReplyMessage) msg).getTimeout() : null; metaData.serviceId = msg.getServiceId(); metaData.messageName = msg.getClass().getName(); metaData.className = metaData.getClass().getName(); msg.getAMQPHeaders().put(MESSAGE_META_DATA, JSONObjectUtil.toJsonString(metaData)); }}Copy the code

Methods buildRequestMessageMetaData will be required for the message metaData took out from the inside of the MSG and into the real Header of MSG.

And then the wire. Send:

        public void send(Message msg) {
            // for unit test finding invocation chain
            MessageCommandRecorder.record(msg.getClass());

            List<BeforeSendMessageInterceptor> interceptors = beforeSendMessageInterceptors.get(msg.getClass());
            if(interceptors ! =null) {
                for (BeforeSendMessageInterceptor interceptor : interceptors) {
                    interceptor.intercept(msg);

                    /* if (logger.isTraceEnabled()) { logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass())); } * /}}for (BeforeSendMessageInterceptor interceptor : beforeSendMessageInterceptorsForAll) {
                interceptor.intercept(msg);

                /* if (logger.isTraceEnabled()) { logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass())); } * /
            }

            send(msg, true);
        }
Copy the code

The logic is clear:

  1. Record its call chain
  2. Call its own pre-send interceptor for interception
  3. Call the pre-send interceptor of all MSGS for interception

send(msg, true); :

       public void send(final Message msg, boolean makeQueueName) {
            /* StopWatch watch = new StopWatch(); watch.start(); * /
            String serviceId = msg.getServiceId();
            if (makeQueueName) { 
                // Get the real queue name
                serviceId = makeMessageQueueName(serviceId);
            }
            // build json schema
            buildSchema(msg);
            // Get the necessary information from the current thread Context. This is how the UUID carried by each API call is passed down
            evalThreadContextToMessage(msg);

            if (logger.isTraceEnabled() && logMessage(msg)) {
                logger.trace(String.format("[msg send]: %s", wire.dumpMessage(msg)));
            }

            // Fetch a channel from the channel poll
            Channel chan = channelPool.acquire();
            try {
                // Explain it separately
                new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();
                /* watch.stop(); logger.debug(String.mediaType("sending %s cost %sms", msg.getClass().getName(), watch.getTime())); * /
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                // Return to channel pollchannelPool.returnChannel(chan); }}Copy the code

Separate analysis of new RecoverableSend (chan, MSG, serviceId, outboundQueue getBusExchange ()). The send (); :

        private class RecoverableSend {
            Channel chan;
            byte[] data;
            String serviceId;
            Message msg;
            BusExchange exchange;

            RecoverableSend(Channel chan, Message msg, String serviceId, BusExchange exchange) throws IOException {
                data = compressMessageIfNeeded(msg);
                this.chan = chan;
                this.serviceId = serviceId;
                this.msg = msg;
                this.exchange = exchange;
            }

            void send(a) throws IOException {
                try {
                    chan.basicPublish(exchange.toString(), serviceId,
                            true, msg.getAMQPProperties(), data);
                } catch (ShutdownSignalException e) {
                    if(! (conninstanceof AutorecoveringConnection) || serverIps.size() <= 1| |! Platform.IS_RUNNING) {// the connection is not recoverable
                        throw e;
                    }

                    logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
                            "we are doing recoverable send right now", e.getMessage()));

                    if(! recoverSend()) {throwe; }}}private byte[] compressMessageIfNeeded(Message msg) throws IOException {
                if(! CloudBusGlobalProperty.COMPRESS_NON_API_MESSAGE || msginstanceof APIEvent || msg instanceof APIMessage) {
                    return gson.toJson(msg, Message.class).getBytes();
                }

                msg.getAMQPHeaders().put(AMQP_PROPERTY_HEADER__COMPRESSED, "true");
                return Compresser.deflate(gson.toJson(msg, Message.class).getBytes());
            }

            private boolean recoverSend(a) throws IOException {
                int interval = conn.getHeartbeat() / 2;
                interval = interval > 0 ? interval : 1;
                int count = 0;

                // as the connection is lost, there is no need to wait heart beat missing 8 times
                // so we use reflection to fast the process
                RecoveryAwareAMQConnection delegate = FieldUtils.getFieldValue("delegate", conn); DebugUtils.Assert(delegate ! =null."cannot get RecoveryAwareAMQConnection");
                Field _missedHeartbeats = FieldUtils.getField("_missedHeartbeats", RecoveryAwareAMQConnection.class); DebugUtils.Assert(_missedHeartbeats! =null."cannot find _missedHeartbeats");
                _missedHeartbeats.setAccessible(true);
                try {
                    _missedHeartbeats.set(delegate, 100);
                } catch (IllegalAccessException e) {
                    throw new CloudRuntimeException(e);
                }

                while (count < CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES) {
                    try {
                        TimeUnit.SECONDS.sleep(interval);
                    } catch (InterruptedException e1) {
                        logger.warn(e1.getMessage());
                    }

                    try {
                        chan.basicPublish(exchange.toString(), serviceId,
                                true, msg.getAMQPProperties(), data);
                        return true;
                    } catch (ShutdownSignalException e) {
                        logger.warn(String.format("recoverable send fails %s times, will continue to retry %s times; %s", count, CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES-count, e.getMessage())); count ++; }}return false; }}Copy the code

The core code is:

                    chan.basicPublish(exchange.toString(), serviceId,
                            true, msg.getAMQPProperties(), data);
Copy the code

Send messages to RabbitMQ based on the basic properties of the exchange, binder key and MSG and the serialized MSG.

We can look at the method signature:

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;
Copy the code

When the mandatory flag bit is set to true, the basic.return method is called to return the message to the producer if exchange cannot find a queue based on its type and message routeKey. When mandatory is set to false, the broker simply throws the message away.

There is also an immediate method:

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param immediate true if the 'immediate' flag is to be
     * set. Note that the RabbitMQ server does not support this flag.
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
Copy the code

When the immediate flag is set to true, if exchange routes a message to a queue(s) and finds no consumers on the queue, the message will not be queued. When all queues (one or more) associated with the message routeKey have no consumers, the message is returned to the producer using the basic.return method.

CloudBus.reply

    @Override
    public void reply(Message request, MessageReply reply) {
        if (Boolean.valueOf((String) request.getHeaderEntry(NO_NEED_REPLY_MSG))) {
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("%s in message%s is set, drop reply%s", NO_NEED_REPLY_MSG,
                        wire.dumpMessage(request), wire.dumpMessage(reply)));
            }

            return;
        }

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        reply.setAMQPProperties(builder.deliveryMode(1).build());
        reply.getHeaders().put(IS_MESSAGE_REPLY, Boolean.TRUE.toString());
        reply.putHeaderEntry(CORRELATION_ID, request.getId());
        reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));

        buildResponseMessageMetaData(reply);
        if (request instanceof NeedReplyMessage) {
            callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
        }
        wire.send(reply, false);
    }
Copy the code

The other attributes have been mentioned before. reply.setServiceId((String) request.getHeaderEntry(REPLY_TO)); The reply passes through the outboundQueue and is returned to the sender according to the correlationId.

CallReplyPreSendingExtensions will reply the result according to the demand change. Then there’s wire.send, which we’ve analyzed before.

CloudBus.publish

    @Override
    public void publish(Event event) {
        if (event instanceofAPIEvent) { APIEvent aevt = (APIEvent) event; DebugUtils.Assert(aevt.getApiId() ! =null, String.format("apiId of %s cannot be null", aevt.getClass().getName()));
        }
        // Same as msgProperty above
        eventProperty(event);
        / / build the metaData
        buildResponseMessageMetaData(event);
        // This was analyzed earlier
        callReplyPreSendingExtensions(event, null);
        / / call beforeEventPublishInterceptors. To facilitate track when throwing exceptions, we declare such a variable.
        BeforePublishEventInterceptor c = null;
        try {
            List<BeforePublishEventInterceptor> is = beforeEventPublishInterceptors.get(event.getClass());
            if(is ! =null) {
                for(BeforePublishEventInterceptor i : is) { c = i; i.beforePublishEvent(event); }}for(BeforePublishEventInterceptor i : beforeEventPublishInterceptorsForAll) { c = i; i.beforePublishEvent(event); }}catch (StopRoutingException e) {
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("BeforePublishEventInterceptor[%s] stop publishing event: %s",
                        c == null ? "null" : c.getClass().getName(), JSONObjectUtil.toJsonString(event)));
            }

            return;
        }

        wire.publish(event);
    }
Copy the code

Next, look at the wire.publish method

        public void publish(Event evt) {
            /* StopWatch watch = new StopWatch(); watch.start(); * /

            buildSchema(evt);

            evalThreadContextToMessage(evt);

            if (logger.isTraceEnabled() && logMessage(evt)) {
                logger.trace(String.format("[event publish]: %s", wire.dumpMessage(evt)));
            }

            Channel chan = channelPool.acquire();
            try {
                new RecoverableSend(chan, evt, evt.getType().toString(), BusExchange.BROADCAST).send();
                /* watch.stop(); logger.debug(String.mediaType("sending %s cost %sms", evt.getClass().getName(), watch.getTime())); * /
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally{ channelPool.returnChannel(chan); }}Copy the code

Most methods are the same as Send. But there are two types defined in the Event class:

package org.zstack.header.message; import org.zstack.header.rest.APINoSee; public abstract class Event extends Message { /** * @ignore */ @APINoSee private String avoidKey; public String getAvoidKey() { return avoidKey; } public void setAvoidKey(String avoidKey) { this.avoidKey = avoidKey; } public abstract Type getType(); public abstract String getSubCategory(); public static final String BINDING_KEY_PERFIX = "key.event."; public static enum Category { LOCAL, API, } public static class Type { private final String _name; public Type(Category ctg, String subCtg) { _name = BINDING_KEY_PERFIX + ctg.toString() + "." + subCtg; } @Override public String toString() { return _name; } @Override public int hashCode() { return _name.hashCode(); } @Override public boolean equals(Object t) { if (! (t instanceof Type)) { return false; } Type type = (Type) t; return _name.equals(type.toString()); }}}Copy the code

Local and API. It looks pretty good from the name, one for replying to APIMsg and one for Posting local messages. But for details on what’s going on here, check out EventMaid.

EventMaid
private class EventMaid extends AbstractConsumer { Map<String, List<EventListenerWrapper>> listeners = new ConcurrentHashMap<String, List<EventListenerWrapper>>(); Channel eventChan; String queueName = makeEventQueueName(String.format("eventMaid.%s", Platform.getUuid())); public void construct() { try { eventChan = conn.createChannel(); eventChan.queueDeclare(queueName, false, false, true, queueArguments()); eventChan.basicConsume(queueName, true, this); } catch (IOException e) { throw new CloudRuntimeException(e); } } public void destruct() { try { eventChan.close(); } catch (IOException e) { throw new CloudRuntimeException(e); } } public void listen(Event evt, EventListenerWrapper l) { String type = evt.getType().toString(); try { synchronized (listeners) { List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { lst = new CopyOnWriteArrayList<EventListenerWrapper>(); listeners.put(type, lst); eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type); logger.debug(String.format("[listening event]: %s", type)); } if (! lst.contains(l)) { lst.add(l); } } } catch (IOException e) { throw new CloudRuntimeException(e); } } public void unlisten(Event evt, EventListenerWrapper l) { String type = evt.getType().toString(); try { synchronized (listeners) { List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { return; } lst.remove(l); if (lst.isEmpty()) { listeners.remove(type); eventChan.queueUnbind(queueName, BusExchange.BROADCAST.toString(), type); logger.debug(String.format("[unlistening event]: %s", type)); } } } catch (IOException e) { throw new CloudRuntimeException(e); } } @SyncThread(level = 10) @MessageSafe private void dispatch(Event evt, EventListenerWrapper l) { setThreadLoggingContext(evt); l.callEventListener(evt); } private void handle(Event evt) { String type = evt.getType().toString(); List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { return; } if (logger.isTraceEnabled()) { logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt))); } for (EventListenerWrapper l : lst) { dispatch(evt, l); } } @Override public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { Event evt = null; try { evt = (Event) wire.toMessage(bytes, basicProperties); handle(evt); } catch (final Throwable t) { final Event fevt = evt; throwableSafe(new Runnable() { @Override public void run() { if (fevt ! = null) { logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t); } else { logger.warn(String.format("unhandled throwable"), t); }}}); }}}Copy the code

Start with handleDelivery:

        @Override
        public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
            Event evt = null;
            try {
                evt = (Event) wire.toMessage(bytes, basicProperties);
                handle(evt);
            } catch (final Throwable t) {
                final Event fevt = evt;
                throwableSafe(new Runnable() {
                    @Override
                    public void run(a) {
                        if(fevt ! =null) {
                            logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                        } else {
                            logger.warn(String.format("unhandled throwable"), t); }}}); }}Copy the code

HandleDelivery (Consumer) ¶ handleDelivery (Consumer) ¶

   /**
     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
Copy the code

This ensures that the EventMaid object can receive the Msg. In the try block, the Event is converted from byte and then goes to handle logic.

        private void handle(Event evt) {
            As mentioned earlier, there are two types, API and Local
            String type = evt.getType().toString();
            // Only two types of List will be retrieved
            List<EventListenerWrapper> lst = listeners.get(type);
            if (lst == null) {
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
            }

            for (EventListenerWrapper l : lst) {
                // Skip to the next logicdispatch(evt, l); }}Copy the code
        @SyncThread(level = 10)
        @MessageSafe
        private void dispatch(Event evt, EventListenerWrapper l) {
            setThreadLoggingContext(evt);
            // Skip to the next logical section
            l.callEventListener(evt);
        }
Copy the code
    @Override
    public EventSubscriberReceipt subscribeEvent(final CloudBusEventListener listener, final Event... events) {
        final EventListenerWrapper wrapper = new EventListenerWrapper() {
            @Override
            public void callEventListener(Event e) {
                // Go to the respective handle logic and unlisten if true is returned
                if (listener.handleEvent(e)) {
                    maid.unlisten(e, this); }}};// An event corresponds to a ListenWrapper
        for (Event e : events) {
            maid.listen(e, wrapper);
        }

        return new EventSubscriberReceipt() {
            @Override
            public void unsubscribe(Event e) {
                maid.unlisten(e, wrapper);
            }

            @Override
            public void unsubscribeAll(a) {
                for(Event e : events) { maid.unlisten(e, wrapper); }}}; }Copy the code

See listen:

        public void listen(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List<EventListenerWrapper> lst = listeners.get(type);
                    if (lst == null) {
                        lst = new CopyOnWriteArrayList<EventListenerWrapper>();
                        listeners.put(type, lst);
                        eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[listening event]: %s", type));
                    }

                    if(! lst.contains(l)) { lst.add(l); }}}catch (IOException e) {
                throw newCloudRuntimeException(e); }}Copy the code

Notice The listeners put and the list of listeners are retrieved according to type. Also convert this list to CopyOnWriteArrayList so that references to this list are not leaked. Then bind a channel as a channel. Also, if the Submitted EventListenerWrapper does not exist in the EventListenerWrapper List, add it.

I believe that so much, some readers may have been confused. Here is a summary of the logical calls to EventMaid:

  • Events are subscribed to CloudBus when each Component of the ZStack starts.
  • When CloudBus receives a publish event, it sends it to all implementationsCloudBusEventListenerThe objects of the interface send events, which they choose to process or not.

CloudBus and EventFascade work together in this way.

summary

In this article, we’ve looked at the source code for the components in ZStack that provide message-driven features — obviously, the apis for both components are nice and clean. But there are several points in the specific logic that can be improved:

  • HandleEvent returns a Boolean that is true and then disallows Listen
  • Listeners in the Listen method can be replaced by ConcurrentHashMap, a concurrent container, to increase throughput.
  • 2. The v of listeners can be replaced by sets. CopyOnWriteArrayList can also be replaced by CopyOnWriteArraySet. We can see in the Listen method that if the LST does not contain L, add. This shows that LST should not be repeated.