Preface
This article looks at Dubbo's ConnectionOrderedDispatcher.
ConnectionOrderedDispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedDispatcher.java
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}
- ConnectionOrderedDispatcher implements the Dispatcher interface; its dispatch method returns a ConnectionOrderedChannelHandler that wraps the given handler
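The wrapping described above can be pictured with a minimal sketch. The Handler and Dispatcher interfaces and the OrderedHandler class below are simplified stand-ins invented for illustration, not Dubbo's actual types, and the wrapper runs the event inline instead of handing it to a thread pool.

```java
import java.util.ArrayList;
import java.util.List;

public class DispatcherSketch {

    /** Stand-in for Dubbo's ChannelHandler, reduced to a single event (assumption). */
    interface Handler {
        void connected(String channel);
    }

    /** Stand-in for Dubbo's Dispatcher: it decorates a handler with a threading strategy. */
    interface Dispatcher {
        Handler dispatch(Handler handler);
    }

    /** Hypothetical wrapper; the real handler would submit the event to an executor. */
    static final class OrderedHandler implements Handler {
        private final Handler delegate;

        OrderedHandler(Handler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void connected(String channel) {
            // Run inline to keep the sketch short; only the wrapping is illustrated here.
            delegate.connected(channel);
        }
    }

    /** The dispatcher simply returns a new wrapper around the handler it is given. */
    static final Dispatcher CONNECTION = OrderedHandler::new;

    public static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Handler business = seen::add;            // the business handler records the channel
        Handler wrapped = CONNECTION.dispatch(business);
        wrapped.connected("channel-1");          // the event reaches the business handler
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The dispatcher itself holds no state; all threading behavior lives in the handler it returns.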
ConnectionOrderedChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
- ConnectionOrderedChannelHandler extends WrappedChannelHandler. Its constructor creates connectionExecutor, a ThreadPoolExecutor whose corePoolSize and maximumPoolSize are both 1 and whose work queue is a LinkedBlockingQueue with a capacity taken from CONNECT_QUEUE_CAPACITY (Integer.MAX_VALUE by default)
- The connected and disconnected methods both submit a newly created ChannelEventRunnable to connectionExecutor. Before submitting, each calls checkQueueLength, which logs a WARN message if the queue size exceeds queuewarninglimit
- The received and caught methods both obtain a thread pool from the parent class's getExecutorService and submit a ChannelEventRunnable to it. If received catches a RejectedExecutionException and the message is a two-way Request, it sends back a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR and returns; any other failure is rethrown as an ExecutionException
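To see why a pool with one thread and a FIFO queue keeps connect and disconnect events in order, here is a minimal sketch that uses only java.util.concurrent. The class name, the QUEUE_WARNING_LIMIT value, and the string events are assumptions made for illustration; Dubbo reads its limits from the URL and logs through its own logger.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OrderedEventsSketch {

    // Hypothetical warning threshold, chosen only for this sketch.
    static final int QUEUE_WARNING_LIMIT = 1000;

    /** Submits connect/disconnect events for each channel and returns the order in which they ran. */
    public static List<String> runEvents(int channels) {
        // One core thread, one max thread, FIFO queue: tasks run strictly in submission order.
        ThreadPoolExecutor connectionExecutor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        for (int i = 0; i < channels; i++) {
            final int id = i;
            warnIfBacklogged(connectionExecutor);
            connectionExecutor.execute(() -> log.add("connected-" + id));
            warnIfBacklogged(connectionExecutor);
            connectionExecutor.execute(() -> log.add("disconnected-" + id));
        }
        connectionExecutor.shutdown(); // already queued tasks still run after shutdown()
        try {
            connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    /** Mirrors the idea of checkQueueLength: warn, but never reject, when the backlog grows. */
    static void warnIfBacklogged(ThreadPoolExecutor executor) {
        int size = executor.getQueue().size();
        if (size > QUEUE_WARNING_LIMIT) {
            System.err.println("queue size " + size + " exceeds warning limit " + QUEUE_WARNING_LIMIT);
        }
    }

    public static void main(String[] args) {
        System.out.println(runEvents(3));
    }
}
```

Because only one worker ever drains the queue, no event can overtake an earlier one; the cost is that a slow handler delays every event queued behind it, which is what the queue-length warning is meant to surface.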
ConnectChannelHandlerTest
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java
public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest {

    @BeforeEach
    public void setUp() throws Exception {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);
    }

    @Test
    public void test_Connect_Blocked() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);
        Assertions.assertEquals(1, executor.getMaximumPoolSize());

        int runs = 20;
        int taskCount = runs * 2;
        for (int i = 0; i < runs; i++) {
            handler.connected(new MockedChannel());
            handler.disconnected(new MockedChannel());
            Assertions.assertTrue(executor.getActiveCount() <= 1, executor.getActiveCount() + " must <=1");
        }
        //queue.size
        Assertions.assertEquals(taskCount - 1, executor.getQueue().size());

        for (int i = 0; i < taskCount; i++) {
            if (executor.getCompletedTaskCount() < taskCount) {
                sleep(100);
            }
        }
        Assertions.assertEquals(taskCount, executor.getCompletedTaskCount());
    }

    @Test //biz error should not throw and affect biz thread.
    public void test_Connect_Biz_Error() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);
        handler.connected(new MockedChannel());
    }

    @Test //biz error should not throw and affect biz thread.
    public void test_Disconnect_Biz_Error() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);
        handler.disconnected(new MockedChannel());
    }

    @Test
    public void test_Connect_Execute_Error() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class, () -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);
            executor.shutdown();
            handler.connected(new MockedChannel());
        });
    }

    @Test
    public void test_Disconnect_Execute_Error() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class, () -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);
            executor.shutdown();
            handler.disconnected(new MockedChannel());
        });
    }

    //throw ChannelEventRunnable.runtimeExeception(int logger) not in execute exception
    @Test//(expected = RemotingException.class)
    public void test_MessageReceived_Biz_Error() throws RemotingException {
        handler.received(new MockedChannel(), "");
    }

    //throw ChannelEventRunnable.runtimeExeception(int logger) not in execute exception
    @Test
    public void test_Caught_Biz_Error() throws RemotingException {
        handler.caught(new MockedChannel(), new BizException());
    }

    @Test
    public void test_Received_InvokeInExecuter() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class, () -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1);
            executor.shutdown();
            executor = (ThreadPoolExecutor) getField(handler, "executor", 1);
            executor.shutdown();
            handler.received(new MockedChannel(), "");
        });
    }

    /**
     * Events do not pass through the thread pool and execute directly on the IO
     */
    @SuppressWarnings("deprecation")
    @Disabled("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.")
    @Test
    public void test_Received_Event_invoke_direct() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1);
        executor.shutdown();
        executor = (ThreadPoolExecutor) getField(handler, "executor", 1);
        executor.shutdown();
        Request req = new Request();
        req.setHeartbeat(true);
        final AtomicInteger count = new AtomicInteger(0);
        handler.received(new MockedChannel() {
            @Override
            public void send(Object message) throws RemotingException {
                Assertions.assertTrue(((Response) message).isHeartbeat(), "response.heartbeat");
                count.incrementAndGet();
            }
        }, req);
        Assertions.assertEquals(1, count.get(), "channel.send must be invoke");
    }
}
- ConnectChannelHandlerTest creates a ConnectionOrderedChannelHandler in setUp and then runs test_Connect_Blocked, test_Connect_Biz_Error, test_Disconnect_Biz_Error, test_Connect_Execute_Error, test_Disconnect_Execute_Error, test_MessageReceived_Biz_Error, test_Caught_Biz_Error, test_Received_InvokeInExecuter, and test_Received_Event_invoke_direct (the last one is marked @Disabled)
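The taskCount - 1 assertion in test_Connect_Blocked is consistent with a single worker holding one task while the rest wait in the queue. The sketch below reproduces that arithmetic with a plain ThreadPoolExecutor. The latches are an assumption added here to make the timing deterministic; they are not how the Dubbo test holds the first task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockedQueueSketch {

    /** Returns {tasks queued while the first task is blocked, tasks completed after release}. */
    public static long[] run(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1); // signals that the worker picked up a task
        CountDownLatch release = new CountDownLatch(1); // holds the running task in place
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();
            // The single worker is busy with the first task, so the others are still queued.
            long queued = executor.getQueue().size();
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return new long[] {queued, executor.getCompletedTaskCount()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = run(40);
        System.out.println("queued=" + result[0] + ", completed=" + result[1]);
    }
}
```

With 40 tasks, the first is handed straight to the new worker and the remaining 39 are queued, matching the 20 connect plus 20 disconnect events submitted in the test.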
Summary
- ConnectionOrderedDispatcher implements the Dispatcher interface, and its dispatch method returns a ConnectionOrderedChannelHandler. ConnectionOrderedChannelHandler extends WrappedChannelHandler; its constructor creates connectionExecutor, a ThreadPoolExecutor whose corePoolSize and maximumPoolSize are both 1 and whose work queue is a LinkedBlockingQueue
- The connected and disconnected methods of ConnectionOrderedChannelHandler both submit a newly created ChannelEventRunnable to connectionExecutor. Before submitting, each calls checkQueueLength, which logs a WARN message if the queue size exceeds queuewarninglimit
- The received and caught methods of ConnectionOrderedChannelHandler obtain a thread pool from the parent class's getExecutorService and submit a ChannelEventRunnable to it. If received catches a RejectedExecutionException and the message is a two-way Request, it replies with SERVER_THREADPOOL_EXHAUSTED_ERROR instead of leaving the consumer to wait for a timeout
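The rejection fallback in received can be sketched without any Dubbo types. In the sketch below, the outbox list stands in for the channel, the string stands in for the Response, and the method signature is hypothetical; only the RejectedExecutionException behavior of a shut-down java.util.concurrent pool is real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectionFallbackSketch {

    /**
     * Tries to hand a request to the pool. If the pool rejects it and the caller
     * expects a reply (twoWay), an error reply is written to the outbox at once.
     * Returns true if the task was accepted, false if the error reply was sent.
     */
    public static boolean received(ExecutorService executor, long requestId,
                                   boolean twoWay, List<String> outbox) {
        try {
            executor.execute(() -> { /* business handling would run here */ });
            return true;
        } catch (RejectedExecutionException e) {
            if (twoWay) {
                // Stand-in for sending a Response with SERVER_THREADPOOL_EXHAUSTED_ERROR.
                outbox.add("request " + requestId + ": SERVER_THREADPOOL_EXHAUSTED_ERROR");
                return false;
            }
            // One-way messages have nobody waiting; Dubbo wraps this in its own
            // ExecutionException, while the sketch simply rethrows.
            throw e;
        }
    }

    public static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.shutdown(); // a shut-down pool rejects every new task
        List<String> outbox = new ArrayList<>();
        received(executor, 42L, true, outbox);
        return outbox;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Replying immediately trades a silent timeout on the consumer side for an explicit error, which the consumer can react to without waiting out the full timeout.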
Doc
- ConnectionOrderedDispatcher