Want to learn Dubbo source code? Start with SPI

How did the Dubbo service get exposed? See the source code to know ②

Dubbo service reference source code analysis ③

The previous three chapters covered the foundations of Dubbo: Dubbo SPI, service exposure on the Provider side, and service reference on the Consumer side. What remains is the complete service invocation process. It is complex and involves sending the request, encoding and decoding, service degradation, filters, serialization, thread dispatch, and returning the response. Once you grasp the overall flow and focus on the main classes, however, it is quite easy to follow.

Before the analysis, here is the call-process diagram from the official website:

First, the consumer initiates a request through the proxy object, and the network communication client sends the encoded request to the Provider's Server. The Server receives the packet and decodes it. The decoded request is handed to the Dispatcher, which dispatches it to the designated thread pool, and the thread pool invokes the actual service. The response is then sent back, a step this diagram does not show. Before diving in, it is a good idea to open your IDE and trace the source code alongside this article.

0. Service invocation

As the figure above shows, the call starts from the proxy object. The proxy class is generated dynamically by manipulating bytecode directly, so we need to decompile it to see what it looks like. Dubbo generates it with Javassist, and we decompile it with Arthas, an open source diagnostic tool from Alibaba. First, download the package from its official website: arthas.aliyun.com/doc/downloa…

After decompressing the software package, go to the root directory and run the following command to start the software:

java -jar arthas-boot.jar

Once started, Arthas lists the running Java processes in the terminal (note that the consumer must already be started and kept running).

Then enter the number of the Consumer process, such as 4, and Arthas attaches to that process. Since the demo has only one service interface, only one proxy class is generated. Let's search for the proxy class by its suffix:

sc *.proxy0

Note the full class name, then decompile it with jad:

jad com.alibaba.dubbo.common.bytecode.proxy0

Once decompiled, the proxy class is printed to the terminal:

public class proxy0 implements ClassGenerator.DC, ServiceAPI, EchoService {
    // Array of methods
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sendMessage(String string) {
        // Store the parameters in the Object array
        Object[] arrobject = new Object[]{string};
        // Call the invoke method of InvocationHandler
        Object object = this.handler.invoke(this, methods[0], arrobject);
        return (String)object;
    }

    // Echo test method
    @Override
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}

The proxy class is relatively simple: it mainly calls the invoke method of InvocationHandler. In Dubbo, the implementation class of that interface is InvokerInvocationHandler:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // Methods declared on Object are invoked locally
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Everything else is wrapped in an RpcInvocation and handed to the invoker
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
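The routing from a proxy method to a single handler can be tried out without Dubbo. The following is a minimal sketch that uses the JDK's java.lang.reflect.Proxy as a stand-in for the Javassist-generated proxy0. GreetingService, RecordingHandler, and ProxySketch are hypothetical names made up for this illustration, and the handler only records the call instead of passing it to an Invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface, standing in for the demo's service API.
interface GreetingService {
    String sendMessage(String message);
}

// Hypothetical handler: records the call instead of sending it over the network.
class RecordingHandler implements InvocationHandler {
    String lastMethod;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object (toString, hashCode, equals) are answered locally.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
        }
        lastMethod = method.getName();
        // A real handler would build an invocation object and pass it to an invoker here.
        return "echo:" + args[0];
    }
}

class ProxySketch {
    // Creates a proxy whose interface methods all route to the given handler.
    static GreetingService newProxy(InvocationHandler handler) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[]{GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        GreetingService service = newProxy(handler);
        System.out.println(service.sendMessage("hi"));
        System.out.println(handler.lastMethod);
    }
}
```

As in InvokerInvocationHandler, the caller only sees the interface, and the handler decides what a method call actually means.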

Debugging shows that the invoker variable is of type MockClusterInvoker, so the invoke method of that class is what ends up being called. MockClusterInvoker#invoke handles the service degradation (mock) logic and calls AbstractClusterInvoker#invoke. After a series of further calls we arrive at the key method, DubboInvoker#doInvoke:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    // It records the calling method, interface, parameters, etc
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // Set path and version to attachments of inv
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    // Get the communication client
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // Get the asynchronous configuration
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // When isOneway is true, it indicates one-way communication
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // Asynchronous call without a return value (one-way)
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // Send the request
            currentClient.send(inv, isSent);
            // Set the context future to null
            RpcContext.getContext().setFuture(null);
            // Returns an empty result
            return new RpcResult();
        // Asynchronous call with a return value
        } else if (isAsync) {
            // Send the request and get a future
            ResponseFuture future = currentClient.request(inv, timeout);
            // Set the future to the context
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            // Temporarily return an empty result
            return new RpcResult();
        // Synchronous call
        } else {
            RpcContext.getContext().setFuture(null);
            // request() returns a future, but get() is called on it right away and blocks until the response arrives or the timeout expires, which makes the call synchronous
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

The method above makes Dubbo's synchronous and asynchronous calls easy to tell apart. The key difference is who calls get: in asynchronous mode the user calls it, while in synchronous mode the framework calls it right away. The asynchronous return type in Dubbo is ResponseFuture, and its default implementation class is DefaultFuture. Let's look at a few key methods:

// Attributes omitted...
public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    // Get the request ID, which is very important. Since it is an asynchronous request, the response information is matched by this
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // Store the request ID and future in the Map
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}
public Object get() throws RemotingException {
    return get(timeout);
}
public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    // Check whether the Provider has returned the result of the call
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            // Loop check
            while (!isDone()) {
                // If the result has not been returned, wait for a while to avoid wasting resources
                done.await(timeout, TimeUnit.MILLISECONDS);
                // If a result has been returned or the timeout has elapsed, break out of the loop
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        // If the loop ended without a result, throw an exception
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    // Return the result of the call
    return returnFromResponse();
}
public boolean isDone() {
    return response != null;
}
private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // If the response status is OK, the call process is normal
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    // A timeout status results in a TimeoutException
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}
// Other methods omitted...

In the code above, the constructor assigns several important attributes, and the get method blocks until a result is received or the timeout elapses. This completes the analysis of how the proxy class hands off a request. Next we analyze how the request data is sent and received, and then how the response data is sent and received.
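The two ideas in DefaultFuture, matching a response to its request by ID and blocking in get until the response arrives or the timeout elapses, can be sketched in a few lines. This is a simplified illustration rather than Dubbo's implementation: PendingCall and its received method are hypothetical names, and the timeout handling uses Condition#awaitNanos instead of the elapsed-time check shown above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified future; not Dubbo's DefaultFuture.
class PendingCall {
    // Request ID -> waiting future, so that a response can find its caller.
    private static final Map<Long, PendingCall> PENDING = new ConcurrentHashMap<>();

    private final long id;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Object response;

    PendingCall(long id) {
        this.id = id;
        PENDING.put(id, this);
    }

    // Called by the receiving side when a response carrying this ID arrives.
    static void received(long id, Object response) {
        PendingCall call = PENDING.remove(id);
        if (call == null) {
            return; // unknown request, or the caller already timed out
        }
        call.lock.lock();
        try {
            call.response = response;
            call.done.signalAll();
        } finally {
            call.lock.unlock();
        }
    }

    boolean isDone() {
        return response != null;
    }

    // Blocks until the response arrives or timeoutMillis elapses.
    Object get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) {
                if (remaining <= 0) {
                    PENDING.remove(id);
                    throw new TimeoutException("no response for request " + id);
                }
                // awaitNanos returns the time left, which also covers spurious wakeups.
                remaining = done.awaitNanos(remaining);
            }
        } finally {
            lock.unlock();
        }
        return response;
    }
}
```

A synchronous caller would create the PendingCall, send the request, and call get immediately. An asynchronous caller would keep the object and call get later.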

1. Send the request

Continuing from DubboInvoker above, let's take a closer look at how it sends the request, that is, currentClient.request. Debugging shows that the implementation class is ReferenceCountExchangeClient:

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger refenceCount = new AtomicInteger(0);
    // Other attributes omitted...
    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        // The reference count increases
        refenceCount.incrementAndGet();
        this.url = client.getUrl();
        //...
    }
    public ResponseFuture request(Object request) throws RemotingException {
        // Calls HeaderExchangeClient#request
        return client.request(request);
    }
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // Request with timeout
        return client.request(request, timeout);
    }
    public void close(int timeout) {
        // The reference count is decremented
        if (refenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
                client.close();
            } else {
                client.close(timeout);
            }
            client = replaceWithLazyClient();
        }
    }
    public void incrementAndGetCount() {
        // The reference count is incremented. This method is called externally
        refenceCount.incrementAndGet();
    }
    // Other methods omitted...
}
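The counting scheme used by ReferenceCountExchangeClient can be sketched in isolation. The class below is a hypothetical stand-in, not Dubbo code: a shared connection that is really closed only when its last holder releases it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connection shared by several holders.
class SharedConnection {
    private final AtomicInteger referenceCount = new AtomicInteger(0);
    private volatile boolean open = true;

    // Each new holder of the connection registers itself.
    SharedConnection acquire() {
        referenceCount.incrementAndGet();
        return this;
    }

    // The underlying connection is closed only when the last holder releases it.
    void release() {
        if (referenceCount.decrementAndGet() <= 0) {
            open = false;
        }
    }

    boolean isOpen() {
        return open;
    }
}
```

With two holders, the first release leaves the connection open and the second one closes it.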

refenceCount is an internally defined reference counter. It is incremented each time the object is referenced and decremented each time close is called. The other omitted methods are simple utility methods. The request method delegates to the method of the same name in HeaderExchangeClient, so let's look at that class next.

public class HeaderExchangeClient implements ExchangeClient {
    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);
    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    // heartbeat timer
    private ScheduledFuture<?> heartbeatTimer;
    private int heartbeat;
    // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // Create a HeaderExchangeChannel object
        this.channel = new HeaderExchangeChannel(client);
        // Heartbeat detection
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // Start the heartbeat detection timer
            startHeatbeatTimer();
        }
    }
    public ResponseFuture request(Object request) throws RemotingException {
        // Calls HeaderExchangeChannel#request
        return channel.request(request);
    }
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // Request with timeout
        return channel.request(request, timeout);
    }
    public void close() {
        doClose();
        channel.close();
    }
    public void close(int timeout) {
        // Mark the client into the closure process
        startClose();
        doClose();
        channel.close(timeout);
    }
    // Start heartbeat detection timer
    private void startHeatbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }
    // Turn off the heartbeat detection timer
    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
                heartbeatTimer.cancel(true);
                scheduled.purge();
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        heartbeatTimer = null;
    }
    // Stop the heartbeat detection timer as part of closing
    private void doClose() {
        stopHeartbeatTimer();
    }
    // Other methods omitted...
}

Many of the methods omitted above simply call the HeaderExchangeChannel method of the same name and are relatively simple, such as setting properties, getting addresses, and heartbeat detection. They are not the focus here. Let's look at the request methods:

final class HeaderExchangeChannel implements ExchangeChannel {
    private final Channel channel;
    private volatile boolean closed = false;
    // Other attributes omitted...
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        // This channel points to the Netty client and is called when the Netty client is created
        this.channel = channel;
    }
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // Create a request object that contains the name of the called method, parameter type, invoker, etc., which we analyzed earlier
        Request req = new Request();
        req.setVersion("2.0.0");
        // Two-way communication
        req.setTwoWay(true);
        // This request type is RpcInvocation
        req.setData(request);
        // Create the future, the object that receives the result of the asynchronous request
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // Netty's send is eventually called
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        // Return the future
        return future;
    }
}

The method above finally shows where the Request is created. If you are interested in the structure of Request, take a look yourself; it is simple, just a few attributes plus some utility methods. Here we focus on where the final send call ends up. Debugging shows that a few more calls are needed before Netty is actually reached, as shown below:

The two abstract classes in front of NettyChannel abstract the communication client. Dubbo supports more than one communication framework, not only Netty, so HeaderExchangeChannel cannot jump directly to Netty. For example, one of AbstractClient's implementation classes is NettyClient, which in turn calls NettyChannel. Let's look directly at the NettyChannel#send method:

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        // Send a message
        ChannelFuture future = channel.write(message);
        
      
        // sent = true: wait for the message to be sent, and throw an exception if sending fails
        // sent = false: put the message into the IO queue and return immediately without waiting
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // Wait for a message to be sent. If timeout occurs, success is set to false
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    // Throw an exception if success is false
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

Here, at last, the message is actually sent. The channel in the method above is a real Netty channel, not a Dubbo wrapper. Of course, the message is encoded before it is sent, and the corresponding codec can be found through the initialization of NettyServer. Readers familiar with Netty will recognize this class as the Netty bootstrap class, where the Pipeline is configured. There we can see:

pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());

This is where the encoder and decoder are registered; the class of the adapter object is where encoding and decoding take place.

2. Request encoding

Above we traced the source code that sends the message all the way down, but one important step remains: encoding. We also found the corresponding codec class, NettyCodecAdapter. Before analyzing it, it is important to understand Dubbo's packet structure. A Dubbo packet consists of a header and a body. The header contains meta information such as the magic number, packet type, and body length. The body contains the specific call information, such as the method name and parameter list. Here is a screenshot of the message header from the official website:

With the packet structure in mind, we can analyze the codec methods. Start with the NettyCodecAdapter class. Its source code is not posted here, but you can see that it references a Codec2 interface and calls its encode and decode methods. Dubbo supports more than one communication framework, with Netty as the default, so each framework has its own codec adapter, and the main codec logic lives in the classes that implement the Codec2 interface. We debug straight to the class where the final logic resides: ExchangeCodec.

public class ExchangeCodec extends TelnetCodec {
    // Header length
    protected static final int HEADER_LENGTH = 16;
    // Magic number content
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // Encode the request object
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // Encode the response object.
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // Create an array of 16 header bytes
    byte[] header = new byte[HEADER_LENGTH];
    // Set magic number
    Bytes.short2bytes(MAGIC, header);
    // Sets the packet type and serializer
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    // Set communication mode (unidirectional/bidirectional)
    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    // Set the event identifier
    if (req.isEvent()) header[2] |= FLAG_EVENT;
    // Set the request ID
    Bytes.long2bytes(req.getId(), header, 4);
    // Get the current write position of buffer
    int savedWriteIndex = buffer.writerIndex();
    // Update writerIndex to reserve 16 bytes for the header
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    // Create the serializer
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        // Serialize event data
        encodeEventData(channel, out, req.getData());
    } else {
        // Serialize the request data
        encodeRequestData(channel, out, req.getData());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    // Get the length of the message body
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    // Write the length of the message body to the header
    Bytes.int2bytes(len, header, 12);
    // Move the buffer pointer to savedWriteIndex in preparation for writing the header
    buffer.writerIndex(savedWriteIndex);
    // Write the header
    buffer.writeBytes(header); 
    // Move pointer to original write subscript + header length + body length
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
    // Other methods omitted, for example decoding. We follow the data in the order in which it is sent, so decoding is not analyzed here
}

This is how the request object is encoded. First, the header fields are written into the header array using bit operations. The data of the request object is then serialized, and the serialized data is stored in the ChannelBuffer. Next we obtain len, the length of the serialized data, and write it into the header. Finally, the header is written into the ChannelBuffer.
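The reserve-then-backfill approach and the header offsets used by encodeRequest can be reproduced with a plain java.nio.ByteBuffer. This is only a sketch under stated assumptions: HeaderSketch and its methods are hypothetical, the body is passed in as an already serialized byte array, and byte 3 of the header is simply left as zero. The constants and offsets (magic number at 0, flags at 2, request ID at 4, body length at 12) are taken from the ExchangeCodec code above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper mirroring the header layout of encodeRequest; not Dubbo code.
class HeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    static byte[] encodeRequest(long requestId, int serializationId,
                                boolean twoWay, boolean event, byte[] body) {
        // ByteBuffer is big-endian by default.
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        // Reserve 16 bytes for the header and write the body first.
        buffer.position(HEADER_LENGTH);
        buffer.put(body);
        int len = buffer.position() - HEADER_LENGTH;

        // Pack the message type, communication mode, event flag and serializer ID into one byte.
        byte flag = (byte) (FLAG_REQUEST | (serializationId & SERIALIZATION_MASK));
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;

        // Backfill the header now that the body length is known.
        buffer.putShort(0, MAGIC);      // bytes 0-1: magic number
        buffer.put(2, flag);            // byte 2: flags + serializer ID
        buffer.putLong(4, requestId);   // bytes 4-11: request ID
        buffer.putInt(12, len);         // bytes 12-15: body length
        return buffer.array();
    }

    static boolean isRequest(byte[] packet) {
        return (packet[2] & FLAG_REQUEST) != 0;
    }

    static int serializationId(byte[] packet) {
        return packet[2] & SERIALIZATION_MASK;
    }

    static long requestId(byte[] packet) {
        return ByteBuffer.wrap(packet).getLong(4);
    }

    static int bodyLength(byte[] packet) {
        return ByteBuffer.wrap(packet).getInt(12);
    }
}
```

Writing the body before the header is what makes it possible to store the body length without serializing the data twice.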

3. Request decoding

Once the data has been encoded and sent, the Netty server receives the message and decodes it. The decoding method is also in ExchangeCodec, so let's analyze it:

public class ExchangeCodec extends TelnetCodec {
    
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // Create the header array
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    // Read the header data
    buffer.readBytes(header);
    // Call the decoding method
    return decode(channel, buffer, readable, header);
}
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // Check whether the magic is equal to the specified magic number
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        // If the magic number does not match, call TelnetCodec's decode
        return super.decode(channel, buffer, readable, header);
    }
    // Check whether the readable data is less than the header length
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // Get the length of the message body
    int len = Bytes.bytes2int(header, 12);
    // Check if the length of the message body exceeds the limit and throw an exception if it does
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    // Check whether the number of bytes readable is smaller than the actual number of bytes
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    try {
        // Continue decoding the body
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}
}

The decoding method above consists mainly of a series of checks on the request data. Next is the decodeBody method. It is implemented in this class but overridden by DubboCodec, a subclass of ExchangeCodec, so let's look at DubboCodec#decodeBody:

public class DubboCodec extends ExchangeCodec implements Codec2 {
  protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    // Read the third byte and extract the serializer ID with a bitwise AND
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // Get the request number
    long id = Bytes.bytes2long(header, 4);
    // Use a bitwise AND to get the message type: 0 for response, 1 for request
    if ((flag & FLAG_REQUEST) == 0) {
      //...
      // Decode the Response result to get the Response object. As mentioned above, we will analyze the data in the order in which it is sent, so we will not analyze this part of the code
    } else {
        // Create a request object
        Request req = new Request(id);
        req.setVersion("2.0.0");
        // Use a bitwise AND to determine the communication mode
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        // Check whether the event flag is set
        if ((flag & FLAG_EVENT) != 0) {
            // Mark the request as a heartbeat event
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) {
                // Decode the heartbeat packet
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            } else if (req.isEvent()) {
                // Decode the event data
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            } else {
                DecodeableRpcInvocation inv;
                // Determine whether to decode the message body on the current thread based on the URL parameter
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    // Do not decode on the current thread
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            // Set the data
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
  }
}

The method above decodes only some of the fields and encapsulates them in the Request object. The decode method of DecodeableRpcInvocation is then called to decode the rest: the method name, attachments, and arguments of the call. Let's look at this method:

public Object decode(Channel channel, InputStream input) throws IOException {
    // Create the deserializer
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);
    // Read the dubbo version, path, and version through deserialization and save them to attachments
    setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
    setAttachment(Constants.PATH_KEY, in.readUTF());
    setAttachment(Constants.VERSION_KEY, in.readUTF());
    // Get the method name
    setMethodName(in.readUTF());
    try {
        Object[] args;
        Class<?>[] pts;
        // Get the parameter type descriptor
        String desc = in.readUTF();
        if (desc.length() == 0) {
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {
            // Parse desc into an array of types
            pts = ReflectUtils.desc2classArray(desc);
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        // Set the parameter type array
        setParameterTypes(pts);
        // Get the original attachment by deserializing
        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            // Merge old attachment with present attachment
            attachment.putAll(map);
            setAttachments(attachment);
        }
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }
        // Set the parameter list
        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

The above method deserializes the method name, parameter list, and so on. At this point, the decoding of the requested data is complete, and the actual service can be invoked.
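The key property of the decode method above is that the fields are read back in exactly the order in which the consumer wrote them, with no field names on the wire. A minimal sketch of that idea, using plain java.io data streams as a stand-in for Dubbo's ObjectInput and ObjectOutput. The class InvocationBodySketch, its single String argument, and the sample values are assumptions made for illustration, not Dubbo code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the invocation body; not a Dubbo class.
final class InvocationBodySketch {
    String dubboVersion;
    String path;
    String serviceVersion;
    String methodName;
    String parameterDesc;
    String argument; // a single String argument keeps the sketch short
    final Map<String, String> attachments = new LinkedHashMap<>();

    // Writes the fields in one fixed order; no field names go on the wire.
    byte[] encode() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(dubboVersion);
            out.writeUTF(path);
            out.writeUTF(serviceVersion);
            out.writeUTF(methodName);
            out.writeUTF(parameterDesc);
            out.writeUTF(argument);
            out.writeInt(attachments.size());
            for (Map.Entry<String, String> entry : attachments.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static InvocationBodySketch decode(byte[] data) {
        InvocationBodySketch inv = new InvocationBodySketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            inv.dubboVersion = in.readUTF();
            inv.path = in.readUTF();
            inv.serviceVersion = in.readUTF();
            inv.methodName = in.readUTF();
            inv.parameterDesc = in.readUTF();
            inv.argument = in.readUTF();
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                inv.attachments.put(key, in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return inv;
    }

    public static void main(String[] args) {
        InvocationBodySketch sent = new InvocationBodySketch();
        sent.dubboVersion = "2.0.2"; // sample values only
        sent.path = "com.yelow.springboot.dubbo.ServiceAPI";
        sent.serviceVersion = "0.0.0";
        sent.methodName = "sendMessage";
        sent.parameterDesc = "Ljava/lang/String;";
        sent.argument = "hello";
        sent.attachments.put("timeout", "1000");
        InvocationBodySketch received = decode(sent.encode());
        System.out.println(received.methodName + "(" + received.argument + ") " + received.attachments);
    }
}
```

If the reader and the writer disagree on the order, the values end up in the wrong fields without any error, which is why both sides of a protocol like this must follow the same sequence.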

4. Invoke specific services

The request data has now been decoded and wrapped in the Request object. Back in NettyServer, the logical processing class that was added to the Pipeline is NettyHandler.

If you are not familiar with Netty, think of the Pipeline as a chain of logical handlers. The chain is bidirectional: not every handler on it has to run for a given message, but the relative order of the handlers cannot change. Incoming data is processed in the order in which the handlers were added to the Pipeline. For example, in the figure, nettyHandler sends and receives messages: an inbound message must be decoded before nettyHandler receives it, and an outbound message is encoded after nettyHandler sends it. Once decoded, the message reaches NettyHandler#messageReceived, whose main logic is to get the NettyChannel instance and pass the message down through ChannelHandler#received.

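Let’s review the Dubbo call diagram posted at the beginning. After the Server receives the request and decodes it, a thread dispatcher comes next. The Netty thread that receives the request normally does not run the actual service logic; the dispatcher hands the request to a thread pool instead. Dubbo supports five thread dispatch strategies (the IO thread is the thread on which the communication framework receives requests), as listed in the Dubbo documentation:

all: every message is dispatched to the thread pool, including requests, responses, connect and disconnect events, and heartbeats.
direct: no message is dispatched to the thread pool; everything runs directly on the IO thread.
message: only request and response messages are dispatched to the thread pool; connect and disconnect events, heartbeats, and other messages run on the IO thread.
execution: only request messages are dispatched to the thread pool; responses, connect and disconnect events, heartbeats, and other messages run on the IO thread.
connection: connect and disconnect events are queued and run one by one on the IO thread; other messages are dispatched to the thread pool.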

Dubbo uses the all dispatch policy by default. It is implemented by the AllChannelHandler class, which implements ChannelHandler, so the ChannelHandler#received call made in NettyHandler#messageReceived above lands in this class for thread dispatch.

AllChannelHandler#received is simple, so I won’t post it. The method first obtains a thread pool, and the intent is obvious: it wraps the received message in a ChannelEventRunnable and hands that object to the pool for execution.

The ChannelEventRunnable class is also simple and serves only as a relay: its run method calls a different handler method for each message type.
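The dispatch described above, where each event is wrapped in a Runnable, handed to a pool, and then routed by its type, can be sketched roughly as follows. All names here (DispatchSketch, Handler, EventRunnable, AllDispatcher, the thread name biz-1) are hypothetical, and a single-threaded pool is assumed so that the order of events is easy to follow; Dubbo's real classes carry more state and handle more event types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; this is not Dubbo's ChannelHandler API.
final class DispatchSketch {
    enum State { CONNECTED, DISCONNECTED, RECEIVED }

    interface Handler {
        void connected();
        void disconnected();
        void received(Object message);
    }

    // Carries one event to the pool and routes it by state when it runs.
    static final class EventRunnable implements Runnable {
        private final Handler handler;
        private final State state;
        private final Object message;

        EventRunnable(Handler handler, State state, Object message) {
            this.handler = handler;
            this.state = state;
            this.message = message;
        }

        @Override
        public void run() {
            switch (state) {
                case CONNECTED:
                    handler.connected();
                    break;
                case DISCONNECTED:
                    handler.disconnected();
                    break;
                case RECEIVED:
                    handler.received(message);
                    break;
            }
        }
    }

    // "all"-style dispatch: every event leaves the calling (IO) thread.
    static final class AllDispatcher {
        private final ExecutorService pool;
        private final Handler next;

        AllDispatcher(ExecutorService pool, Handler next) {
            this.pool = pool;
            this.next = next;
        }

        void connected() {
            pool.execute(new EventRunnable(next, State.CONNECTED, null));
        }

        void received(Object message) {
            pool.execute(new EventRunnable(next, State.RECEIVED, message));
        }
    }

    // Sends two events through the dispatcher and records where they ran.
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-1"));
        Handler business = new Handler() {
            public void connected() {
                events.add("connected@" + Thread.currentThread().getName());
            }
            public void disconnected() {
                events.add("disconnected@" + Thread.currentThread().getName());
            }
            public void received(Object message) {
                events.add("received:" + message + "@" + Thread.currentThread().getName());
            }
        };
        AllDispatcher dispatcher = new AllDispatcher(pool, business);
        dispatcher.connected();
        dispatcher.received("hello");
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller of AllDispatcher returns as soon as the task is queued, which is the point of the design: the thread that reads from the network is never blocked by business logic.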

We focus on the received path and do not follow the other methods, such as the one for connection events. From ChannelEventRunnable#run the message enters the DecodeHandler class; look at its received method:

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        // If the Decodeable interface is implemented, it will be decoded
        decode(message);
    }

    if (message instanceof Request) {
        // Decode the Request's data
        decode(((Request) message).getData());
    }

    if (message instanceof Response) {
        // Decode the Response's result
        decode(((Response) message).getResult());
    }
    // Perform subsequent logic
    handler.received(channel, message);
}
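DecodeHandler calls decode on every Decodeable message, including one that the IO thread may already have decoded, so a second call has to be harmless. A minimal sketch of such a guard, assuming a hypothetical LazyPayloadSketch class with a plain UTF-8 payload; the real DecodeableRpcInvocation does far more work inside decode:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical type; it only illustrates a payload that is decoded at most once.
final class LazyPayloadSketch {
    private final byte[] raw;
    private boolean decoded;
    private String value;
    private int decodeCount; // counts real decoding work, for the demo only

    LazyPayloadSketch(byte[] raw) {
        this.raw = raw;
    }

    // Safe to call from either thread, and safe to call more than once.
    synchronized void decode() {
        if (decoded) {
            return; // already decoded, for example on the IO thread
        }
        value = new String(raw, StandardCharsets.UTF_8);
        decodeCount++;
        decoded = true;
    }

    synchronized String value() {
        return value;
    }

    synchronized int decodeCount() {
        return decodeCount;
    }

    public static void main(String[] args) {
        LazyPayloadSketch payload = new LazyPayloadSketch("hello".getBytes(StandardCharsets.UTF_8));
        payload.decode(); // first call does the work
        payload.decode(); // second call is a no-op
        System.out.println(payload.value() + " decoded " + payload.decodeCount() + " time(s)");
    }
}
```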

As mentioned earlier, decoding can run either on the IO thread or in the thread pool. This is where the thread pool decoding happens. After decoding, the subsequent logic is in HeaderExchangeHandler:

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        // Process the request object
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.isEvent()) {
                // Handle events
                handlerEvent(channel, request);
            // Handle normal requests
            } else {
                // Two-way communication
                if (request.isTwoWay()) {
                    // Call handleRequest
                    Response response = handleRequest(exchangeChannel, request);
                    // Return the call result to the consumer
                    channel.send(response);
                } else {
                    // One-way communication: no result needs to be returned
                    handler.received(exchangeChannel, request.getData());
                }
            }
        // Process the response object; the consumer executes this branch, which is analyzed later
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } //...
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // Check whether the request is valid
    if (req.isBroken()) {
        Object data = req.getData();
        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        // The request is broken, so set the BAD_REQUEST status
        res.setStatus(Response.BAD_REQUEST);
        return res;
    }
    // Get the data field
    Object msg = req.getData();
    try {
        // Invoke subsequent logic
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}
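The branching in the two methods above (two-way or one-way, then broken request, success, or service error) can be reduced to a small sketch. The Request, Response, and Status types below are hypothetical simplifications, the service is modelled as a java.util.function.Function, and the one-way branch calls the same function rather than a separate received method:

```java
import java.util.function.Function;

// Hypothetical request/response types; only the branching mirrors the code above.
final class ExchangeSketch {
    enum Status { OK, BAD_REQUEST, SERVICE_ERROR }

    static final class Request {
        final long id;
        final boolean twoWay;
        final boolean broken;
        final Object data;

        Request(long id, boolean twoWay, boolean broken, Object data) {
            this.id = id;
            this.twoWay = twoWay;
            this.broken = broken;
            this.data = data;
        }
    }

    static final class Response {
        final long id; // same id as the request, so the consumer can match them
        Status status;
        Object result;
        String errorMessage;

        Response(long id) {
            this.id = id;
        }
    }

    static Response handleRequest(Request req, Function<Object, Object> service) {
        Response res = new Response(req.id);
        if (req.broken) {
            // Decoding failed earlier; report it instead of invoking the service
            res.status = Status.BAD_REQUEST;
            res.errorMessage = "Fail to decode request due to: " + req.data;
            return res;
        }
        try {
            res.result = service.apply(req.data);
            res.status = Status.OK;
        } catch (RuntimeException e) {
            res.status = Status.SERVICE_ERROR;
            res.errorMessage = e.toString();
        }
        return res;
    }

    // Returns the response to send back, or null when the request is one-way.
    static Response received(Request req, Function<Object, Object> service) {
        if (req.twoWay) {
            return handleRequest(req, service);
        }
        service.apply(req.data); // one-way: run the logic, send nothing back
        return null;
    }

    public static void main(String[] args) {
        Function<Object, Object> upper = data -> data.toString().toUpperCase();
        Response ok = received(new Request(1L, true, false, "hello"), upper);
        System.out.println(ok.id + " " + ok.status + " " + ok.result);
        Response bad = received(new Request(2L, true, true, "garbled"), upper);
        System.out.println(bad.id + " " + bad.status + " " + bad.errorMessage);
    }
}
```

Note that a failure never escapes as an exception on the provider side: it is converted into a response status, so the consumer always gets an answer for a two-way request.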

The HeaderExchangeHandler#received logic is clear: for two-way communication it runs the subsequent logic and returns the result; for one-way communication it only passes the request on and returns nothing. We move on to DubboProtocol#reply. Instead of posting the code, note that its main logic is to get the Invoker instance and invoke the specific service through it:

return invoker.invoke(inv);

The invoke method is implemented in AbstractProxyInvoker, and the call passes through a series of filters on the way there; to skip them, you can set a breakpoint directly in the abstract class. AbstractProxyInvoker#invoke mainly calls doInvoke, an abstract method that the concrete Invoker instance must implement. The Invoker is created by JavassistProxyFactory, as mentioned in Chapter 2:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Wrapper is an abstract class. Dubbo generates an implementation class for it at run time with Javassist, and that class implements the invokeMethod method. Again, let’s decompile it with Arthas: attach to the Provider process, search for *.Wrapper0, and decompile it with jad:

public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        // This is the service interface we need to invoke
        ServiceAPI serviceAPI;
        try {
            // Type conversion
            serviceAPI = (ServiceAPI)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // sendMessage is the method we called; the wrapper matches on the method name to find it
            if ("sendMessage".equals(string) && arrclass.length == 1) {
                return serviceAPI.sendMessage((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.yelow.springboot.dubbo.ServiceAPI.").toString());
    }
    // Other methods omitted...
}

At this point, you finally see the code that calls the concrete method.
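To make the shape of the generated class easier to see, here is a hand-written equivalent next to the same call made through java.lang.reflect. The GreetingService interface and the WrapperSketch class are hypothetical. The usual explanation for generating such a class is that it replaces a reflective call with a direct one, but that motivation is my assumption, not something stated in this article:

```java
import java.lang.reflect.Method;

// Hypothetical service and wrapper; the real Wrapper0 is generated by Javassist.
final class WrapperSketch {
    interface GreetingService {
        String sendMessage(String message);
    }

    // Hand-written equivalent of a generated invokeMethod: match on the name, cast, call directly.
    static Object invokeMethod(Object target, String name, Class<?>[] types, Object[] args) {
        GreetingService service = (GreetingService) target;
        if ("sendMessage".equals(name) && types.length == 1) {
            return service.sendMessage((String) args[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + name + "\"");
    }

    // The same call through java.lang.reflect, for comparison.
    static Object invokeReflectively(Object target, String name, Class<?>[] types, Object[] args) {
        try {
            Method method = GreetingService.class.getMethod(name, types);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs both paths against the same service instance.
    static String demo() {
        GreetingService service = message -> "echo: " + message;
        Class<?>[] types = {String.class};
        Object[] args = {"hi"};
        Object direct = invokeMethod(service, "sendMessage", types, args);
        Object reflective = invokeReflectively(service, "sendMessage", types, args);
        return direct + " | " + reflective;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both paths return the same value; the difference is only in how the target method is reached.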

5. Return the call result

Once the execution result is available, it has to be returned to the consumer. The detailed call chain is not repeated here; you can debug it yourself. Let’s just look at the encoding of the Response. In the request encoding section we analyzed ExchangeCodec but left out the encoding of the response object, so let’s look at it now:

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        Serialization serialization = getSerialization(channel);
        // Create an array of header bytes
        byte[] header = new byte[HEADER_LENGTH];
        // Set magic number
        Bytes.short2bytes(MAGIC, header);
        // Set the serializer number
        header[2] = serialization.getContentTypeId();
        if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
        // Set the response status code
        byte status = res.getStatus();
        header[3] = status;
        // Set the request number
        Bytes.long2bytes(res.getId(), header, 4);

        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        // encode response data or error message.
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                // Serialize the heartbeat response results
                encodeHeartbeatData(channel, out, res.getResult());
            } else {
                // Serialize the result of the call
                encodeResponseData(channel, out, res.getResult());
            }
        } else out.writeUTF(res.getErrorMessage());
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();

        int len = bos.writtenBytes();
        checkPayload(channel, len);
        Bytes.int2bytes(len, header, 12);
        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        // Exception handling...
    }
}
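The method above reserves room for the header, writes the body, and only then goes back to fill in the header, because the body length is not known until serialization has finished. The offsets it uses (flag at 2, status at 3, request id at 4, body length at 12) imply a 16-byte header. A rough sketch of the same layout with java.nio.ByteBuffer follows. The class ResponseFrameSketch is hypothetical, the body is a plain UTF-8 string rather than a serialized object, and the magic value 0xdabb and big-endian byte order are what I recall from the Dubbo source, so treat both as assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; it only mirrors the header offsets used in encodeResponse.
final class ResponseFrameSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb; // assumed magic number

    static byte[] encode(byte serializationId, byte status, long id, String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        // ByteBuffer is big-endian by default
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + payload.length);
        buf.position(HEADER_LENGTH);               // skip the header for now
        buf.put(payload);                          // write the body first
        int len = buf.position() - HEADER_LENGTH;  // body length is known only now
        buf.putShort(0, MAGIC);                    // bytes 0-1: magic number
        buf.put(2, serializationId);               // byte 2: serializer id (plus flags)
        buf.put(3, status);                        // byte 3: response status
        buf.putLong(4, id);                        // bytes 4-11: request id
        buf.putInt(12, len);                       // bytes 12-15: body length
        return buf.array();
    }

    static long requestId(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(4);
    }

    static int bodyLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(12);
    }

    static String body(byte[] frame) {
        return new String(frame, HEADER_LENGTH, bodyLength(frame), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 2 and 20 are sample values for the serializer id and the status code
        byte[] frame = encode((byte) 2, (byte) 20, 7L, "ok");
        System.out.println(frame.length + " bytes, id=" + requestId(frame) + ", body=" + body(frame));
    }
}
```

The request id in bytes 4 to 11 is what lets the consumer match a response to the request that is still waiting for it.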

6. Receive the call result

We have finally reached the last step. So far we have gone through: making the service call -> sending the request -> encoding the request -> decoding the request -> invoking the specific service -> returning the call result (encoding the response).

After the consumer receives the call result, the result also has to be decoded. I won’t repeat that part; the code is in DubboCodec#decodeBody, and with the experience from the previous sections you can debug it yourself.

Once the response data is decoded, Dubbo dispatches the response object to the thread pool, which passes the call result to the user thread. As mentioned earlier, after the request is sent the user thread waits for the result in DefaultFuture’s get method. When the response object arrives, the user thread is woken up and retrieves its own response by request number. The entry point is the same as on the provider side: starting from Netty’s received callback, the response travels down the handler chain until it reaches the received method in DefaultFuture:

public class DefaultFuture implements ResponseFuture {
    private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // Go further
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + "->" + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    lock.lock();
    try {
        // Save the response object
        response = res;
        if (done != null) {
            // Wake up the user thread
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
}

The logic is to save the response object in DefaultFuture and wake up the user thread, which then gets the result from the get method.
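The wait-and-wake mechanism can be reduced to a lock, a condition, and a map from request id to pending call. The sketch below uses a hypothetical FutureSketch class and leaves out callbacks, channels, and timeout scanning; the timeout handling in get is my own simplification rather than a copy of DefaultFuture:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class; it keeps only the wait/signal core of the pattern.
final class FutureSketch {
    private static final Map<Long, FutureSketch> FUTURES = new ConcurrentHashMap<>();

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long id;
    private Object response; // guarded by lock

    FutureSketch(long id) {
        this.id = id;
        FUTURES.put(id, this); // register before the request is sent
    }

    // Called by the user thread: block until the response arrives or the timeout expires.
    Object get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) { // the loop guards against spurious wakeups
                if (nanos <= 0L) {
                    FUTURES.remove(id);
                    throw new TimeoutException("no response for request " + id);
                }
                nanos = done.awaitNanos(nanos);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Called by the receiving thread: match by request id, store the response, wake the waiter.
    static boolean received(long id, Object response) {
        FutureSketch future = FUTURES.remove(id);
        if (future == null) {
            return false; // unknown id, or the caller already timed out
        }
        future.lock.lock();
        try {
            future.response = response;
            future.done.signal();
        } finally {
            future.lock.unlock();
        }
        return true;
    }

    // One waiting caller, one receiver thread delivering the response.
    static Object demo() {
        FutureSketch future = new FutureSketch(1L);
        Thread receiver = new Thread(() -> received(1L, "pong"));
        receiver.start();
        try {
            return future.get(2000);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because get checks the response field before waiting, the result is not lost even if the response arrives before the user thread reaches get.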

The analysis of the whole invocation process ends here. For more usage and source code analysis, see the official documentation: dubbo.apache.org/zh/docs/