This article grew out of a business requirement to integrate a third-party external service. The service only provides an asynchronous API. To avoid affecting the synchronous processing model of the existing system, the application hides this difference from its callers and converts the asynchronous request into a synchronous one internally.
Abstract:
- Problems with asynchrony for existing architectures
- Dubbo asynchronous to synchronous solution
- Design scheme of asynchronous to synchronous architecture
0x00. Preface
There is a system with the overall architecture as follows:
This is a common synchronous design scenario where the upstream system waits for the downstream system interface to return the call result.
Now we need to integrate another third-party service, B. Unlike service A, it exposes an asynchronous API: the call itself only returns an acknowledgement of success, and the actual processing result arrives later through an asynchronous notification.
After access, the overall architecture is as follows:
Because of network isolation policies, notification receivers and communication services need to be deployed separately. If this is not required, you can combine communication service B and notification receiver into one application.
In addition, all applications in the figure are deployed on two nodes.
To keep the synchronous processing logic of the systems upstream of OpenAPI unchanged, communication service B cannot return immediately after invoking the third-party service. It has to wait for the result notification before it can return the actual result. This means the asynchronous call must be turned into a synchronous one inside communication service B.
This is a typical asynchronous-to-synchronous problem, and it breaks down into two questions.
- How does the business thread in communication service B enter the waiting state, and how do we wake up the correct waiting thread?
- Because communication service B is deployed on two nodes, how does the notification receiver forward the result to the node that is actually waiting for it?
The solution to problem 1 borrows from Dubbo's design.
When we invoke a remote service using Dubbo, the invocation is blocking by default: the Consumer code blocks and waits until the Provider returns.
Dubbo sends network requests through Netty, which is asynchronous. For business threads to wait synchronously, Dubbo has to turn that asynchronous exchange into a synchronous one.
0x01. Dubbo Asynchronous to synchronous solution
1.1 Blocking the business thread
The code where Dubbo initiates a remote call is located at DubboInvoker#doInvoke:
The Dubbo version here is 2.6.x. Later versions refactor DefaultFuture, but the essence remains the same.
By default, Dubbo supports synchronous invocation, where DefaultFuture objects will be created.
The key logic here is to generate a unique ID for each request and then store the mapping from that ID to its DefaultFuture in a Map.
This request ID is important because the consumer calls the service concurrently to send the request, and multiple business threads will block at the same time. When the response is received, we need to wake up the correct waiting thread and return the processing results.
With the unique mapping of ID, it is natural to find its corresponding DefaultFuture and wake up its corresponding business thread.
The business thread blocks by calling DefaultFuture#get. This code is relatively simple: it blocks the thread by calling Condition#await.
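The registration and blocking steps described above can be sketched as follows. This is a minimal illustration, not Dubbo's actual code: the class name `PendingCall`, the `String` response type, and the method signatures are all assumptions made for this example. A `received` method is included only so that the sketch is complete enough to run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for the idea behind DefaultFuture. */
class PendingCall {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    // Request ID -> the future that a business thread is blocked on.
    private static final Map<Long, PendingCall> FUTURES = new ConcurrentHashMap<>();

    private final long id = NEXT_ID.incrementAndGet();
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String response; // stays null until a response arrives

    PendingCall() {
        FUTURES.put(id, this); // register before the request is sent
    }

    long id() {
        return id;
    }

    /** Blocks the caller until a response arrives or the timeout elapses. */
    String get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) { // loop guards against spurious wakeups
                if (remaining <= 0) {
                    FUTURES.remove(id); // do not leave a dead entry behind
                    throw new TimeoutException("no response for request " + id);
                }
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    /** Finds the waiter by request ID, stores the result, and wakes it up. */
    static boolean received(long id, String result) {
        PendingCall call = FUTURES.remove(id);
        if (call == null) {
            return false; // unknown ID, or the waiter already timed out
        }
        call.lock.lock();
        try {
            call.response = result;
            call.done.signalAll();
        } finally {
            call.lock.unlock();
        }
        return true;
    }
}
```

A business thread would create a `PendingCall`, send the request carrying `id()`, and then call `get(timeout)`. The thread that receives the response calls `PendingCall.received(id, result)`.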
1.2 Waking up the business thread
When the consumer receives the result from the service provider, the DefaultFuture#received method is called.
Find the corresponding DefaultFuture object by the unique ID in the response object, set the result in the DefaultFuture object, and then wake up the corresponding business thread.
There is a possible optimization here: use done#signalAll instead of done#signal. Keep this in mind whenever you use the condition wait/notify mechanism.
For more information: github.com/apache/dubb…
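To illustrate the difference between the two calls, here is a small sketch that is independent of Dubbo's code. The class `SharedResult` and its methods are hypothetical. Several threads wait on the same Condition. With `signalAll` every waiter is woken as soon as the value is set, whereas with `signal` only one would be woken and the rest would stay blocked until their own timeout expired.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical holder for a result that several threads may wait on. */
class SharedResult {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String value;

    /** Waits until a value is set or the timeout elapses; null means no value arrived. */
    String await(long timeoutMillis) throws InterruptedException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (value == null && remaining > 0) {
                remaining = done.awaitNanos(remaining);
            }
            return value;
        } finally {
            lock.unlock();
        }
    }

    /** Stores the value and wakes every thread waiting on the condition. */
    void complete(String v) {
        lock.lock();
        try {
            value = v;
            done.signalAll(); // signal() here would wake only one of the waiters
        } finally {
            lock.unlock();
        }
    }
}
```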
1.3 Design points
Normally, when the consumer receives the response, the DefaultFuture will be removed from the FUTURES Map.
However, in exceptional cases, if the service provider is slow and cannot return the response in time, the consumer's business thread wakes up because of the timeout. In that case invalid DefaultFuture objects pile up in FUTURES, and if they are not cleaned up in time, an OOM can occur in extreme cases.
To prevent this, DefaultFuture starts a background thread that periodically polls FUTURES, checks each DefaultFuture against its timeout, and removes the invalid DefaultFuture objects promptly.
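The cleanup idea can be sketched roughly as below. This is an illustration under assumptions, not Dubbo's implementation: `PendingRegistry`, `Pending`, the deadline field, and the sweep interval are all made up for this example.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of pending requests with periodic timeout cleanup. */
class PendingRegistry {
    /** Only the deadline matters for cleanup, so that is all this entry holds. */
    static final class Pending {
        final long deadlineMillis;

        Pending(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<Long, Pending> futures = new ConcurrentHashMap<>();

    void register(long id, long deadlineMillis) {
        futures.put(id, new Pending(deadlineMillis));
    }

    int size() {
        return futures.size();
    }

    /** Removes every entry whose deadline has passed and returns how many were removed. */
    int sweep(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Long, Pending>> it = futures.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().deadlineMillis <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Starts a daemon thread that sweeps at a fixed interval until interrupted. */
    Thread startSweeper(long intervalMillis) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                sweep(System.currentTimeMillis());
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit the loop
                }
            }
        }, "pending-timeout-sweeper");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

Passing the current time into `sweep` keeps the cleanup logic easy to test without waiting for real timeouts.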
0x02. Design of forwarding scheme
According to the Dubbo solution, the solution to problem 1 is relatively simple. The specific process is as follows:
- Communication service B internally generates a unique request ID and sends it to the third-party service
- If the request succeeds, communication service B stores the mapping between the request ID and the waiting thread in an internal Map, and the business thread blocks and waits
- Communication service B receives the asynchronous notification, looks up the corresponding business thread by ID, and wakes it up
Pay attention to setting a reasonable timeout in this design. It has to account for the time already spent on the remote call to the third-party service. The following formula can be used:
Business thread waiting time = Timeout of the communication service B interface - Time consumed by invoking the third-party service B interface
The full code is not shown here. Refer to Dubbo's DefaultFuture for details.
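As a rough illustration of the timeout formula only, the waiting budget could be computed as below. The class `WaitBudget` and its method names are hypothetical, and clamping the result at zero is an assumption added for this sketch.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that applies the waiting-time formula. */
class WaitBudget {
    /** Waiting time = interface timeout - time spent on the third-party call, never below 0. */
    static long remainingMillis(long interfaceTimeoutMillis, long elapsedMillis) {
        return Math.max(0L, interfaceTimeoutMillis - elapsedMillis);
    }

    /** Times the outbound call and returns what is left for waiting on the notification. */
    static long callAndComputeWait(long interfaceTimeoutMillis, Runnable thirdPartyCall) {
        long start = System.nanoTime();
        thirdPartyCall.run();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return remainingMillis(interfaceTimeoutMillis, elapsedMillis);
    }
}
```

For example, with an interface timeout of 3000 ms and a third-party call that took 400 ms, the business thread should wait at most 2600 ms for the notification.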
Let's focus on how the notification receiver forwards the result to the correct node of communication service B. Two schemes come to mind:
- SocketServer solution
- MQ solution
2.1 SocketServer
Communication service B uses SocketServer to build a service receiver. When the notification receiver receives the notification from third-party service B, it forwards the result to communication service B through Socket.
The overall system architecture is as follows:
Because the production service is deployed on two nodes, the notification receiver cannot hard-code the forwarding address. Instead, we store the mapping between the request ID and the socket address of the communication service B node in Redis, and the notification receiver looks up the correct address by ID.
To be honest, this scheme is a bit complicated.
First, writing a SocketServer is hard. It is difficult to write an efficient one, and bugs creep in easily.
Second, the socket service address of communication service B has to be set in the configuration file. Since the two nodes have different addresses, the same application ends up with different configurations, which makes later maintenance harder.
Third, introducing Redis as an extra dependency increases system complexity.
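The lookup can be pictured with the sketch below. An in-memory map stands in for Redis so that the example runs on its own. The class `RouteTable`, the key format, and the choice to delete the entry on lookup are assumptions, not part of the original design.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical routing table: request ID -> address of the node that is waiting. */
class RouteTable {
    // In-memory stand-in for Redis, used here only to keep the sketch self-contained.
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Communication service B records its own address before the thread blocks. */
    void register(String requestId, String nodeAddress) {
        store.put(requestId, nodeAddress);
    }

    /** The notification receiver resolves where to forward; the entry is removed once used. */
    Optional<String> resolve(String requestId) {
        return Optional.ofNullable(store.remove(requestId));
    }
}
```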
2.2 MQ
Compared with the SocketServer solution, the MQ solution is simpler. It uses MQ broadcast consumption. The architecture is shown in the figure:
After receiving the asynchronous notification, the notification receiver sends the result directly to MQ.
Communication service B enables broadcast consumption mode and pulls MQ messages.
Communication service B_1 pulls the message but finds no waiting thread for the request ID in its internal mapping, so it knows the message is not meant for it and simply discards it.
The communication service B_2 pulls the message, finds the waiting thread successfully through the request ID mapping relationship, and then wakes up the waiting thread and returns the final result.
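The broadcast flow above can be simulated without a real message queue. In this sketch `Broadcast.publish` is a stand-in for the MQ, and `NodeB`, `expect`, and `onMessage` are hypothetical names. It uses `CompletableFuture` in place of Condition#await and signalAll to keep the example short.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical node of communication service B. */
class NodeB {
    // Request ID -> future that the local business thread is waiting on.
    private final Map<String, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();

    /** Registers a waiter; the business thread then blocks on the returned future. */
    CompletableFuture<String> expect(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        waiting.put(requestId, future);
        return future;
    }

    /** Broadcast handler: every node sees every message, only the owner has a waiter. */
    boolean onMessage(String requestId, String result) {
        CompletableFuture<String> future = waiting.remove(requestId);
        if (future == null) {
            return false; // not this node's request: discard
        }
        future.complete(result); // wakes the waiting business thread
        return true;
    }
}

/** Stand-in for the MQ in broadcast mode: delivers each message to every node. */
class Broadcast {
    static int publish(List<NodeB> nodes, String requestId, String result) {
        int handled = 0;
        for (NodeB node : nodes) {
            if (node.onMessage(requestId, result)) {
                handled++;
            }
        }
        return handled;
    }
}
```

The business thread would call `future.get(timeout, unit)` on the value returned by `expect`, so a delayed or lost message surfaces as a timeout rather than an indefinite wait.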
Compared with SocketServer, the overall MQ flow is simple, the programming difficulty is low, and no special configuration is needed.
However, this scheme relies heavily on how promptly MQ messages are delivered. If the delivery delay is high, the business thread in communication service B wakes up on timeout and the call returns an error.
Here we chose RocketMQ, whose long-polling pull mode keeps message delivery close to real time.
In summary, the scheme of MQ is adopted here.
0x03. Summary
To turn an asynchronous call into a synchronous one, we need to solve two things: how to block the calling thread, and how to wake it up.
Blocking and waking can be done with Condition#await and Condition#signalAll respectively. Along the way, we need to generate a unique request ID and save the mapping between this ID and the business thread. Once the result comes back, we can use the unique ID to wake up the correct waiting thread.
With these points understood, the asynchronous-to-synchronous problem becomes straightforward to solve.
If you are facing a similar asynchronous-to-synchronous problem, I hope this solution helps. If you have other design ideas, please leave a comment and discuss.
Author's blog: studyidea.cn