One, synchronous call
By default, when we call a service through Dubbo, the method returns only after the server has finished executing all of its logic. This is a synchronous call.
Dubbo’s underlying network communication uses Netty, and Netty is asynchronous. So how does Dubbo turn an asynchronous request into a synchronous call?
First, let’s look at the consumer side. The DubboInvoker class can invoke a service in three different ways.
protected Result doInvoke(final Invocation invocation) throws Throwable {
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
        if (isOneway) {
            // Ignore the return value
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // Asynchronous call
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // Synchronous call
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    }
    // (exception handling is omitted in this excerpt)
}
As you can see, the code above has three branches: a one-way call that ignores the return value, an asynchronous call, and a synchronous call. Let’s focus on the synchronous branch: return (Result) currentClient.request(inv, timeout).get();
It calls the currentClient.request method, which sends the request data via Netty, and then calls get on the object that request returns in order to obtain the return value.
1. Send the request
This step wraps the request data in a Request object, sends it to the server via Netty, and then returns a DefaultFuture object.
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    // Fail fast if the client is disconnected
    if (closed) {
        throw new RemotingException("...");
    }
    // Build the request
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // Send the network data through Netty
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
The logic of the code above is straightforward. Since its return value is a DefaultFuture object, let’s look at that class’s constructor.
public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout :
            channel.getUrl().getPositiveParameter("timeout", 1000);
    // Map the request ID to this Future
    FUTURES.put(id, this);
    // Map the request ID to its Channel
    CHANNELS.put(id, channel);
}
Here, we must first understand the Future. The Future pattern is a very common design pattern in multi-threaded development: a call returns a Future object immediately, and we later call its get method to obtain the actual return value, blocking if the value is not ready yet.
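To make the Future pattern concrete, here is a minimal sketch using only the JDK’s java.util.concurrent classes. It is not Dubbo code: the class FuturePatternSketch and the methods slowHello and callAndWait are hypothetical names chosen for illustration, and the 100 ms sleep merely stands in for a remote call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FuturePatternSketch {

    // Hypothetical slow operation standing in for a remote call.
    static String slowHello(String name) throws InterruptedException {
        Thread.sleep(100);
        return "hello " + name;
    }

    // Submits the work, receives a Future at once, then blocks on get().
    static String callAndWait(String name) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> slowHello(name);
            Future<String> future = executor.submit(task); // returns immediately
            // The caller could do other work here before asking for the result.
            return future.get();                           // blocks until the task finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("Jack")); // prints "hello Jack"
    }
}
```

The point to notice is the gap between submit and get: the Future is handed back before the result exists, and only get makes the caller wait.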
2. Get the return value
Let’s move on to the get method.
public Object get(int timeout) throws RemotingException {
    // Fall back to the default timeout
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    // Only wait if the operation has not completed yet
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            // Wait on the condition while holding the lock
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    // Return the data
    return returnFromResponse();
}

// Extract the return value from the Response
private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    // 30 = client-side timeout, 31 = server-side timeout
    if (res.getStatus() == 30 || res.getStatus() == 31) {
        throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}
In the code above, let’s focus on the get method. Its flow can be summarized as follows:
- Determine the timeout. If the timeout is less than or equal to 0, use the default value.
- Check whether the operation has already completed, that is, whether response is non-null. If so, get the return value and return.
- If the operation has not completed, acquire the lock and wait. Once notified, check again whether it has completed. If it has, get the return value and return; if the timeout has elapsed instead, throw a TimeoutException.
This raises two questions: where is response assigned, and where is the waiting thread notified?
After Netty reads the response data from the network, it calls the handleResponse method in HeaderExchangeHandler, whose purpose is clear at a glance.
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    // Process the returned information
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}
If the response is not null and is not heartbeat data, DefaultFuture.received is called. The main job of this method is to find the corresponding Future by the ID of the returned message and notify it.
public static void received(Channel channel, Response response) {
    try {
        // Find the Future by the response ID and remove it from the map
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // Notify the waiting caller
            future.doReceived(response);
        } else {
            logger.warn("...");
        }
    } finally {
        // Remove the Channel mapped to this ID
        CHANNELS.remove(response.getId());
    }
}
The future.doReceived(response) call is simple, and it answers both questions: it assigns response and signals the thread waiting in await.
private void doReceived(Response res) {
    lock.lock();
    try {
        // Assign the response
        response = res;
        if (done != null) {
            // Wake up the thread waiting in get()
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
This is how Dubbo completes a synchronous call. Let’s summarize the overall process:
- Wrap the request data in a Request object and build a DefaultFuture object, registered under the request ID.
- Send the Request object via Netty and return the DefaultFuture object.
- Call DefaultFuture.get() to wait for the response to arrive.
- When the server finishes processing, the Netty handler receives the returned data and notifies the DefaultFuture object.
- The get method wakes up and returns the value.
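The whole mechanism can be sketched in a few dozen lines of plain Java. This is a simplified illustration, not Dubbo’s implementation: the names SyncOverAsyncSketch, PendingCall, and call are hypothetical, a background thread plays the role of the Netty I/O thread, and the wait uses a deadline with awaitNanos rather than the loop shown earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SyncOverAsyncSketch {

    // Hypothetical stand-in for DefaultFuture: one pending request.
    static final class PendingCall {
        private static final Map<Long, PendingCall> PENDING = new ConcurrentHashMap<>();
        private static final AtomicLong NEXT_ID = new AtomicLong();

        final long id = NEXT_ID.incrementAndGet();
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private volatile Object response;

        PendingCall() {
            PENDING.put(id, this); // register before the request is sent
        }

        // Caller thread: wait until a response arrives or the timeout expires.
        Object get(long timeoutMillis) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            lock.lock();
            try {
                while (response == null) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        PENDING.remove(id);
                        throw new IllegalStateException("timed out waiting for response " + id);
                    }
                    done.awaitNanos(remaining);
                }
                return response;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } finally {
                lock.unlock();
            }
        }

        // I/O thread: look up the waiting call by ID, store the result, wake the caller.
        static void received(long id, Object result) {
            PendingCall call = PENDING.remove(id);
            if (call == null) {
                return; // nobody is waiting, for example because the caller timed out
            }
            call.lock.lock();
            try {
                call.response = result;
                call.done.signal();
            } finally {
                call.lock.unlock();
            }
        }
    }

    // Simulates a round trip: register the call, let another thread deliver the reply.
    static Object call(String payload, long timeoutMillis) {
        PendingCall pending = new PendingCall();
        Thread ioThread = new Thread(() -> PendingCall.received(pending.id, "echo:" + payload));
        ioThread.setDaemon(true);
        ioThread.start();
        return pending.get(timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(call("ping", 1000)); // prints "echo:ping"
    }
}
```

The ID is what ties the two threads together: the caller registers under it before sending, and the receiving side uses it to find exactly one waiter to wake.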
Two, asynchronous invocation
To use asynchronous invocation, we need to enable it in the consumer’s configuration file:
<dubbo:reference id="infoUserService"
interface="com.viewscenes.netsupervisor.service.InfoUserService"
async="true"/>
Next, let’s look at how it is implemented.
if (isAsync) {
    ResponseFuture future = currentClient.request(inv, timeout);
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
}
As you can see, this branch also gets a Future object back from currentClient.request, but it does not call its get method. Instead, it wraps the Future object in a FutureAdapter and stores it in RpcContext.getContext().
RpcContext is Dubbo’s context object, a temporary state recorder backed by ThreadLocal. Let’s focus on its setFuture method.
public class RpcContext {
    private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    private Future<?> future;
    public void setFuture(Future<?> future) {
        this.future = future;
    }
}
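The practical consequence of a ThreadLocal-backed context is that a Future stored by one thread is invisible to every other thread. The sketch below demonstrates this with a simplified, hypothetical Context class; it is not Dubbo’s RpcContext, and the getContext and getFuture methods here are written only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ThreadLocalContextSketch {

    // Hypothetical, simplified stand-in for a thread-local context object.
    static final class Context {
        private static final ThreadLocal<Context> LOCAL = ThreadLocal.withInitial(Context::new);
        private Future<?> future;

        static Context getContext() { return LOCAL.get(); }
        void setFuture(Future<?> future) { this.future = future; }
        Future<?> getFuture() { return future; }
    }

    // Returns true if a Future set on the calling thread is invisible to another thread.
    static boolean isIsolatedPerThread() {
        Context.getContext().setFuture(CompletableFuture.completedFuture("result"));

        boolean[] otherThreadSawFuture = new boolean[1];
        Thread other = new Thread(() ->
                otherThreadSawFuture[0] = Context.getContext().getFuture() != null);
        other.start();
        try {
            other.join(); // join makes the other thread's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        boolean callerSeesFuture = Context.getContext().getFuture() != null;
        return callerSeesFuture && !otherThreadSawFuture[0];
    }

    public static void main(String[] args) {
        System.out.println(isIsolatedPerThread()); // prints "true"
    }
}
```

This is why the Future must be fetched on the same thread that made the call, and before that thread makes another call that overwrites it.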
Since RpcContext is based on ThreadLocal, to obtain the return value we first get the context object from the ThreadLocal and then take its Future object. The client code therefore looks like this:
userService.sayHello("Jack");
Future<Object> future = RpcContext.getContext().getFuture();
System.out.println("Service returns message :"+future.get());
The advantage is that instead of waiting on each method in turn, you can issue several calls and they will execute in parallel, as in the example from the official documentation:
// This call returns null immediately
fooService.findFoo(fooId);
// Get the Future for the call above
Future<Foo> fooFuture = RpcContext.getContext().getFuture();

// This call returns null immediately
barService.findBar(barId);
// Get the Future for the call above
Future<Bar> barFuture = RpcContext.getContext().getFuture();

// The client does not need to start multiple threads to run the calls in parallel;
// NIO's non-blocking I/O takes care of it.
// If foo has already returned, get the value directly; otherwise wait for it
Foo foo = fooFuture.get();
Bar bar = barFuture.get();

// If foo takes 5 seconds to return and bar takes 6 seconds, you only need to wait
// 6 seconds to have both foo and bar available for further processing.
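The timing claim can be checked with a small stand-alone simulation. The sketch below does not use Dubbo: ParallelCallsSketch, callAsync, and elapsedForTwoCalls are hypothetical names, and CompletableFuture with a sleep stands in for the two remote calls (so, unlike Dubbo’s NIO-based approach, worker threads are used here). Delays are scaled down from seconds to milliseconds.

```java
import java.util.concurrent.CompletableFuture;

public class ParallelCallsSketch {

    // Hypothetical remote call that answers with `name` after delayMillis.
    static CompletableFuture<String> callAsync(String name, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return name;
        });
    }

    // Issues both calls before waiting on either one and returns the elapsed milliseconds.
    static long elapsedForTwoCalls(long fooDelayMillis, long barDelayMillis) {
        long start = System.nanoTime();
        CompletableFuture<String> fooFuture = callAsync("foo", fooDelayMillis);
        CompletableFuture<String> barFuture = callAsync("bar", barDelayMillis);
        fooFuture.join(); // waits for foo
        barFuture.join(); // bar has been running the whole time
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Expect roughly the longer delay (about 300 ms), not the sum (500 ms).
        System.out.println(elapsedForTwoCalls(200, 300) + " ms");
    }
}
```

The total is bounded by the slower call because both requests are in flight before the first get (here, join) is reached.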