First, service exposure

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); / / first try to from the cache access ExporterChangeableWrapper < T > exporter = (ExporterChangeableWrapper < T >) bounds. Get (key);if (exporter == null) {
		synchronized (bounds) {
			exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
			if(my friend == null) {// fromexport// Create Invoker as a delegate object final Invoker<? > invokerDelegete = new InvokerDelegete<T>( originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // Write cache bounds.put(key, exporter); }}}return exporter;
}
Copy the code

In the code above, it tries to fetch it from the cache, and if it doesn’t, it calls protocol.export to expose it.

The protocol object here is actually an Adaptive extension class object protocol $Adaptive. When we call its export method, it will obtain the corresponding extension implementation class according to the protocol name. In this case, it is DubboProtocol.

I don’t know if you remember, but we talked about it in chapter 2. Through ExtensionLoader. GetExtensionLoader (Protocol. The class). GetExtension (extName); This code actually gets an object of the Wrapper class, ProtocolListenerWrapper

1. Service exposure listening

ProtocolListenerWrapper. The method of export is mainly for service exposure listener, when service exposure and cancel service exposure can be notified.

public class ProtocolListenerWrapper implements Protocol {

	public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
		if ("registry".equals(invoker.getUrl().getProtocol())) {
			returnprotocol.export(invoker); } // Get ExporterListener type extension point loader ExtensionLoader<ExporterListener> ExtensionLoader = ExtensionLoader.getExtensionLoader(ExporterListener.class); List<ExporterListener> activateExtension = extensionLoader.getActivateExtension (invoker.geturl ()),"exporter.listener"); / / call ProtocolFilterWrapper. Export to expose Exporter < T >export= protocol.export(invoker); List<ExporterListener> exporterListeners = Collections.unmodifiableList(activateExtension); // Loop listener notification method. Return ListenerExporterWrapper<T> ListenerExporterWrapper = new ListenerExporterWrapper<>(export, exporterListeners);
        returnlistenerExporterWrapper; }}Copy the code

For example, we can create a custom listener.

public class MyExporterListener1 implements ExporterListener { public void exported(Exporter<? > exporter) throws RpcException { System.out.println("111111111111111------- Service exposure"); } public void unexported(Exporter<? > exporter) { System.out.println("1111111111111 ------- Cancel service exposure"); }}Copy the code

Then create extension point configuration file, file name called: org. Apache. Dubbo. RPC. ExporterListener content as follows: listener1=org.apache.dubbo.demo.provider.MyExporterListener1

Then in the Dubbo configuration file, define: < Dubbo :provider Listener =”listener1″ />

Then, you will be notified when the service exposure is complete.

2. Build the call chain

Step on ProtocolListenerWrapper. Export method, also called ProtocolFilterWrapper before return. Export. It is primarily intended to create a chain of calls containing various filters.

public class ProtocolFilterWrapper implements Protocol {	
	public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            returnprotocol.export(invoker); <T> tInvoker = buildInvokerChain(buildInvoker,"service.filter"."provider"); // Call DubboProtocol to continue exposing my Exporter<T>export= protocol.export(tInvoker); / / returnreturn export; }}Copy the code

The focus here is on the buildInvokerChain method, which creates the call chain interceptor. This interception is executed each time the remote method is executed, as the Filter known in Dubbo has

org.apache.dubbo.rpc.filter.EchoFilter
org.apache.dubbo.rpc.filter.GenericFilter
org.apache.dubbo.rpc.filter.GenericImplFilter
org.apache.dubbo.rpc.filter.TokenFilter
org.apache.dubbo.rpc.filter.AccessLogFilter
org.apache.dubbo.rpc.filter.CountFilter
org.apache.dubbo.rpc.filter.ActiveLimitFilter
org.apache.dubbo.rpc.filter.ClassLoaderFilter
org.apache.dubbo.rpc.filter.ContextFilter
org.apache.dubbo.rpc.filter.ConsumerContextFilter
org.apache.dubbo.rpc.filter.ExceptionFilter
org.apache.dubbo.rpc.filter.ExecuteLimitFilter
org.apache.dubbo.rpc.filter.DeprecatedFilter
Copy the code

At this point, the Invoker, after a variety of Filter packaging, becomes the following appearance:

Of course, we can also customize the Filter. For example:

public class MyFilter1 implements Filter { public Result invoke(Invoker<? > invoker, Invocation invocation) throws RpcException { System.out.println("Before calling:"+invoker.getUrl().toFullString());
        Result result = invoker.invoke(invocation);
        System.out.println("After calling:"+invoker.getUrl().toFullString());
        returnresult; }}Copy the code

Then create extension point configuration file, file name called: resources \ meta-inf \ dubbo \ com. Alibaba dubbo. RPC. Filter the content as follows: myfilter1=org.apache.dubbo.demo.provider.MyFilter1

Then in the Dubbo configuration file, define it as follows: < Dubbo: Provider filter=” myFilter1 “/>

Note that after this configuration, myFilter1 comes after the default Filter.

3, DubboProtocol

After all the above to do to do, finally can really expose the service. Calling dubboProtocol.export, we focus on two parts: creating a DubboExporter and starting the server.

public class DubboProtocol extends AbstractProtocol {
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); / / service identification / /, for example: com.viewscenes.net supervisor. Service. InfoUserService: 20880 String key = the serviceKey (url); DubboExporter<T> exporter = new DubboExporter<T>(url, url, export map); Export map. Put (key, exporter); // Export map. // omit irrelevant code... // Start communication server openServer(url); // optimizeSerialization optimizeSerialization(url);returnexporter; }}Copy the code

3.1. Create DubboExporter

In fact, creating a DubboExporter is as simple as calling the constructor to assign a value.

public class DubboExporter<T> extends AbstractExporter<T> {
	public DubboExporter(Invoker<T> invoker, String key, 
				Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }
}
Copy the code

3.2. Start the server

Private void openServer(URL URL) {// Obtain IP: port and use it as the key of the server instance. String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
	if(isServer) {// Get ExchangeServer server = servermap.get (key);if(server == null) {// Create a server instance serverMap.put(key, createServer(URL)); }else{// Reset the server server.reset(url); }}}Copy the code

As shown in the code above, Dubbo grabs the started server instance from the cache and creates it if it misses. If a server instance already exists, the server is reset based on the contents of the URL. We focus on the creation process.

Private ExchangeServer createServer(URL URL) {// Sent when the server is shut downreadonlyEvent URL = url.addParameterIfAbsent("channel.readonly.sent"."true"); // Set heartbeat Detection URL = url.addParameterIfAbsent("heartbeat"."60000"); The default value is netty String STR = url.getparameter ("server"."netty"); // SPI checks whether the Transporter extension represented by the server parameter exists, and throws an exception if it does notif(str ! = null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)){ throw new RpcException("Unsupported server type: " + str + ", url: "+ url); } // Set the server codec to dubbo url = url.addparameter ("codec"."dubbo"); ExchangeServer server; Try {// Create ExchangeServer server = exchangers.bind (url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ")" + e.getMessage(), e);
	}
	str = url.getParameter(Constants.CLIENT_KEY);
	if(str ! = null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader. getExtensionLoader(Transporter.class).getSupportedExtensions();if(! supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: "+ str); }}return server;
}
Copy the code

The above code is divided into two main parts: setting the default parameters and creating the server instance. There is no need to set the parameters, and the headersanjie. bind method is called, which only sets the wrapper Handler.

public class HeaderExchanger implements Exchanger {

    public ExchangeServer bind(URL url, ExchangeHandler Handler) throws RemotingException {// Encapsulates the handler handler HeaderExchangeHandler HeaderExchangeHandler = new HeaderExchangeHandler(handler); DecodeHandler decodeHandler = new DecodeHandler(headerExchangeHandler); // Create a Server Serverbind= Transporters.bind(url, decodeHandler); Return HeaderExchangeServer server = new HeaderExchangeServer(bind);
        returnserver; }}Copy the code

We just need to focus on transporters.bind, which is responsible for starting the server.

public class Transporters {
	public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else{ handler = new ChannelHandlerDispatcher(handlers); } Transporter adaptiveExtension = ExtensionLoader. getExtensionLoader(Transporter.class).getAdaptiveExtension(); / / call NettyServer. BindreturnadaptiveExtension.bind(url, handler); }}Copy the code

In the code above, it first gets an adaptive Transporter instance, namely TransporterAdaptive. Which Transporter is then loaded based on the URL argument passed in, which in Dubbo defaults to NettyTransporter. It is important to note that depending on the Dubbo version, there may be different versions using Netty.

For example, the Netty configuration file I saw in the Dubbo2.7 snapshot release (not yet released) looks like this, indicating that it uses Netty4 by default:

netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter
netty= org.apache.dubbo.remoting.transport.netty4.NettyTransporter
Copy the code

In dubo2.6, the Netty configuration file looks like this, indicating that if you do not specify Netty4, then use Netty3

netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
Copy the code

But it doesn’t matter. Let’s continue with Netty3 at….

public class NettyTransporter implements Transporter {
    public Server bind(URL URL, ChannelHandler Listener){// Create a NettyServerreturn new NettyServer(url, listener);
    }
}
public class NettyServer extends AbstractServer implements Server {
	public NettyServer(URL url, ChannelHandler handler) {
        super(url, ChannelHandlers.wrap(handler, 
			ExecutorUtil.setThreadName(url, "DubboServerHandler"))); }}Copy the code

We see that in the nettyTransporter. bind method, it calls the NettyServer constructor, followed by the parent constructor.

public abstract class AbstractServer extends AbstractEndpoint implements Server {

	public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress(); // Get IP address and port StringbindIp = getUrl().getParameter("bind.ip", getUrl().getHost());
        int bindPort = getUrl().getParameter("bind.port", getUrl().getPort());
        if (url.getParameter("anyhost".false) || NetUtils.isInvalidLocalHost(bindIp)) {// Set Ip to 0.0.0.0bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter("accepts", 0);
        this.idleTimeout = url.getParameter("idle.timeout", 600000); Try {// call the subclass method to start the serverdoOpen(); }}}Copy the code

In the code above, the parent constructor mainly sets some parameters, needless to say. Then we’ll look at the subclass’s doOpen implementation.

protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); / / create the boss and the name of the worker thread pool threads / / set the ExecutorService boss = Executors. NewCachedThreadPool (new NamedThreadFactory ("NettyServerBoss".true));
	ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker".true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory( boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); // Create ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); / / set PipelineFactory bootstrap. SetPipelineFactory (newChannelPipelineFactory() {
		@Override
		public ChannelPipeline getPipeline() {
			NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
			ChannelPipeline pipeline = Channels.pipeline();
			/*int idleTimeout = getIdleTimeout();
			if (idleTimeout > 10000) {
				pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
			}*/
			pipeline.addLast("decoder", adapter.getDecoder());
			pipeline.addLast("encoder", adapter.getEncoder());
			pipeline.addLast("handler", nettyHandler);
			returnpipeline; }}); // Bind to the specified IP address and port channel = bootstrap.bind(getBindAddress()); }Copy the code

The above method starts the communication server through Netty. Those familiar with Netty will be familiar with this code, but if you want to learn more about it, you need to take a look at its processor.

The processor

ChannelHandler is one of the core components in Netty. Here, Dubbo uses NettyHandler as the message handler. It inherits from SimpleChannelHandler, which means that events received by Netty are handled by this class. For example, client connection, client disconnection, data reading, network exception… Let’s focus on the data reading method.

@Sharable public class NettyHandler extends SimpleChannelHandler { public NettyHandler(URL url, ChannelHandler handler) { this.url = url; this.handler = handler; } public void messageReceived(ChannelHandlerContext CTX, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); }}}Copy the code

When Netty’s Selector polls for a data read event, the messageReceived method is called. In this case, it calls Handler.received, where, as the constructor tells you, the handler object is actually an instance of the NettyServer object.

It passes through the AllChannelHandler, where it allocates a thread in the thread pool to handle it.

public class AllChannelHandler extends WrappedChannelHandler { public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); }}Copy the code

ChannelEventRunnable implements the Runnable interface. Let’s look at its run method. In fact, it is very simple, is based on the event state, continue to call down.

public class ChannelEventRunnable implements Runnable {
    public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is "+ message); }}}Copy the code

I don’t think I need to delve further into the process, but business logic processing. But there’s another question, what does this thread pool look like? What’s the size? By tracing, we find that it is initialized in its parent class. It is also loaded through ExtensionLoader

public class WrappedChannelHandler implements ChannelHandlerDelegate { protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; ExtensionLoader<ThreadPool> extensionLoader = ExtensionLoader.getExtensionLoader(ThreadPool.class); ThreadPool adaptiveExtension = extensionLoader.getAdaptiveExtension(); executor = (ExecutorService) adaptiveExtension.getExecutor(url); }}Copy the code

Then we see that the ThreadPool interface annotates the default implementation @SPI(“fixed”), which is a fixed number of thread pools.

Public class FixedThreadPool implements ThreadPool {public Executor getExecutor(URL URL) {// set the ThreadPool parameter String name = url.getParameter("threadname"."Dubbo");
        int threads = url.getParameter("threads", 200);
        int queues = url.getParameter("queues", 0);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}Copy the code

To answer the above question, the thread pool in Dubbo is a fixed thread pool of 200 threads. What if the thread pool is full? Let’s look at its rejection strategy.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	String msg = String.format("Thread pool is EXHAUSTED!" +
					" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
					" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
			threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
			e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
			url.getProtocol(), url.getIp(), url.getPort());
	logger.warn(msg);
	dumpJStack();
	throw new RejectedExecutionException(msg);
}
Copy the code

Did you learn?

  • Print error message
  • Export thread stack information
  • An exception is thrown

At this point, the process of service exposure is analyzed. The whole process is quite complicated, so please be patient during the analysis. And write more Demo breakpoint debugging, in order to better understand the code logic.

Service registration

Service registration is to register exposed service information on a third party platform for consumer use. Back to the registryProtocol.export method, let’s take the ZooKeeper registry as an example.

1. Create a registry

First of all, need according to the configuration file access to information to the registry url, such as a zookeeper, for example: they are: / / 192.168.139.131:2181 / com. Alibaba. Dubbo. Registry. RegistryService? Application = dubbo_producer1 & client = zkclient&dubbo = 2.6.2…

We go straight to ZookeeperRegistry, where the focus is on creating the Zookeeper client by calling the Connect method.

Public ZookeeperRegistry(URL URL, ZookeeperTransporter ZookeeperTransporter) {// Skip some code... / / create a zookeeper client zkClient = zookeeperTransporter. Connect (url); zkClient.addStateListener(newStateListener() {
		public void stateChanged(int state) {
			if(state == RECONNECTED) {try {// Recover (); } catch (Exception e) { logger.error(e.getMessage(), e); }}}}); }Copy the code

One thing to note here is that the Dubbo website says zkClient is used by default to connect to ZooKeeper.

Zkclient is implemented by default from version 2.2.0 to improve the health of the ZooKeeper client.

But by code, it defaults to using a curator client. @ SPI (” curator “) this is confusing, so if you want to use zkclient, should be specified in the configuration file: < dubbo: registry address = “zookeeper: / / 192.168.139.131:2181? client=zkclient”/>

Then we continue to see, finally call the zkClient method to complete the creation of the ZooKeeper client.

Public ZkclientZookeeperClient(URL URL) {zkClientClient = new ZkClientWrapper(url.getBackupAddress(), 30000); // Listen to the ZooKeeper status client.addListener(new)IZkStateListener() { @Override public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state =  state;if (state == KeeperState.Disconnected) {
				stateChanged(StateListener.DISCONNECTED);
			} else if(state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } @Override public void handleNewSession() throws Exception { stateChanged(StateListener.RECONNECTED); }}); client.start(); }Copy the code

2. Create a node

Creating a node is as simple as writing the service configuration data to a node in a path of Zookeeper.

protected void doRegister(URL url) {
	try {
		zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
	} catch (Throwable e) {
		throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

Zookeeper node information:

Third, summary

At this point, we have analyzed the entire process of service exposure in Dubbo. Because of the length problem, the author divides them into two parts. The number of words is more complex, logic is also more complex, if the article is wrong, I hope you put forward valuable opinions.

Let’s recall the process again:

  • Call the initialization method through the Spring interface
  • Check configuration information and set default values
  • Create a service class ref Invoker
  • The service exposes the listener and builds the invocation chain
  • Local exposure
  • Remote exposed
  • Start the Netty communication server and listen for the port
  • Connect to ZooKeeper to create a node to write exposed service information to the registry