Preface

This article focuses on Dubbo’s ExecutionDispatcher

ExecutionDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        returnnew ExecutionChannelHandler(handler, url); }}Copy the code
  • ExecutionDispatcher implements the Dispatcher interface; its dispatch method returns a new ExecutionChannelHandler wrapping the given handler and URL

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); }}else{ handler.received(channel, message); }}}Copy the code
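  • ExecutionChannelHandler extends WrappedChannelHandler. Its received method checks whether the message is a Request: if so, it wraps the message in a ChannelEventRunnable and submits it to the thread pool; otherwise it calls handler.received directly. If the thread pool rejects the task and the request is two-way, it replies with a SERVER_THREADPOOL_EXHAUSTED_ERROR response instead of throwing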

PerformanceServerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java

public class PerformanceServerTest  {

    private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class);
    private static ExchangeServer server = null;

    private static void restartServer(int times, int alive, int sleep) throws Exception {
        if(server ! = null && ! server.isClosed()) { server.close(); Thread.sleep(100); }for (int i = 0; i < times; i++) {
            logger.info("restart times:" + i);
            server = statServer();
            if (alive > 0) Thread.sleep(alive);
            server.close();
            if (sleep > 0) Thread.sleep(sleep);
        }

        server = statServer();
    }

    private static ExchangeServer statServer() throws Exception {
        final int port = PerformanceUtils.getIntProperty("port", 9911);
        final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER);
        final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
        final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY, DEFAULT_THREADPOOL);
        final int threads = PerformanceUtils.getIntProperty(THREADS_KEY, DEFAULT_THREADS);
        final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);
        final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
        final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME);


        // Start server
        ExchangeServer server = Exchangers.bind("Exchange: / / 0.0.0.0." + port + "? transporter="
                + transporter + "&serialization="
                + serialization + "&threadpool=" + threadpool
                + "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler, new ExchangeHandlerAdapter() {
            public String telnet(Channel channel, String message) throws RemotingException {
                return "echo: " + message + "\r\ntelnet> ";
            }

            public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
                if ("environment".equals(request)) {
                    return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment());
                }
                if ("scene".equals(request)) {
                    List<String> scene = new ArrayList<String>();
                    scene.add("Transporter: " + transporter);
                    scene.add("Service Threads: " + threads);
                    return CompletableFuture.completedFuture(scene);
                }
                returnCompletableFuture.completedFuture(request); }});return server;
    }

    private static ExchangeServer statTelnetServer(int port) throws Exception {
        // Start server
        ExchangeServer telnetserver = Exchangers.bind("Exchange: / / 0.0.0.0." + port, new ExchangeHandlerAdapter() {
            public String telnet(Channel channel, String message) throws RemotingException {
                if (message.equals("help")) {
                    return "support cmd: \r\n\tstart \r\n\tstop \r\n\tshutdown \r\n\trestart times [alive] [sleep] \r\ntelnet>";
                } else if (message.equals("stop")) {
                    logger.info("server closed:" + server);
                    server.close();
                    return "stop server\r\ntelnet>";
                } else if (message.startsWith("start")) {
                    try {
                        restartServer(0, 0, 0);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return "start server\r\ntelnet>";
                } else if (message.startsWith("shutdown")) {
                    System.exit(0);
                    return "start server\r\ntelnet>";
                } else if (message.startsWith("channels")) {
                    return "server.getExchangeChannels():" + server.getExchangeChannels().size() + "\r\ntelnet>";
                } else if (message.startsWith("restart ")) { //r times [sleep] r 10 or r 10 100
                    String[] args = message.split("");
                    int times = Integer.parseInt(args[1]);
                    int alive = args.length > 2 ? Integer.parseInt(args[2]) : 0;
                    int sleep = args.length > 3 ? Integer.parseInt(args[3]) : 100;
                    try {
                        restartServer(times, alive, sleep);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    return "restart server,times:" + times + " stop alive time: " + alive + ",sleep time: " + sleep + " usage:r times [alive] [sleep] \r\ntelnet>";
                } else {
                    return "echo: " + message + "\r\ntelnet> "; }}});return telnetserver;
    }

    @Test
    public void testServer() throws Exception {
        // Read port from property
        if (PerformanceUtils.getProperty("port", null) == null) {
            logger.warn("Please set -Dport=9911");
            return;
        }
        final int port = PerformanceUtils.getIntProperty("port", 9911);
        final boolean telnet = PerformanceUtils.getBooleanProperty("telnet".true);
        if (telnet) statTelnetServer(port + 1);
        server = statServer();

        synchronized (PerformanceServerTest.class) {
            while (true) {
                try {
                    PerformanceServerTest.class.wait();
                } catch (InterruptedException e) {
                }
            }
        }
    }

}
Copy the code
  • PerformanceServerTest's statServer method reads the dispatcher name with PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME), so it falls back to ExecutionDispatcher.NAME when the property is not set

summary

ExecutionChannelHandler extends WrappedChannelHandler. Its received method checks whether the message is a Request: if so, it wraps the message in a ChannelEventRunnable and submits it to the thread pool; otherwise it calls handler.received directly.

doc

  • ExecutionDispatcher