Hello, everyone, I am xiao Hei, meet you again ~~
In the configuration center, there is a classic PUB/SUB scenario: when a configuration item changes, it needs to be synchronized to each server node in real time and pushed to the client cluster at the same time.
In the previous implementation of the simple configuration center, this was done through Redis pub/ Sub. This implementation, while simple, relies heavily on Redis.
As a basic component, the configuration center is more user-friendly if it minimizes external dependencies. So, is it possible to implement a PUB/Sub scenario without using MQ? The answer is yes.
Pub/SUB scheme based on DB
Instead of implementing this scenario based on MQ, Apollo implemented a simple message queue through a database. The schematic diagram is as follows:
The general implementation is as follows:
- The Admin Service inserts a message record into the ReleaseMessage table after the configuration is published
- The Config Service has a thread that scans the ReleaseMessage table every second to see if there are any new messages.
- The Config Service notifies the client of new message records. Each Config Service is notified.
Now, let’s learn the source code with these questions. (Voiceover: Thinking is more important than source)
DatabaseMessageSender
The Admin Service calls the DatabaseMessageSender#sendMessage method after the configuration is published, which does two main things:
- Create ReleaseMessage, and then save it to the database
- Record the currently saved ReleaseMessage Id and place it in
DatabaseMessageSender#toClean
In the queue.
Why record the currently saved ReleaseMessage Id?
There is a scheduled task in databasemessagender that clears releasemessages with ids smaller than the current one.
ReleaseMessageScanner
The ReleaseMessageScanner component in the Config Service scans the ReleaseMessage table every second (by default) to obtain the latest messages.
With db-based pub/sub, each Config Service senses the message through the DB after the configuration is published, and then notifies the client.
How does the Config Service notify the client?
Real-time messaging based on long polling
In Apollo’s design, instead of the server actively pushing the client after a configuration update, the client uses a long poll to ask the server if any configuration changes have taken place. If no configuration that the client cares about is published within 60 seconds, the Http status code 304 is returned to the client. If a configuration that the client is concerned about is published, the request is returned immediately. After obtaining the namespace with the changed configuration, the client requests the Config Service to obtain the latest configuration of the namespace.
The client code in RemoteConfigLongPollService# doLongPollingRefresh, the code is simpler, interested students can refer to.
Let’s focus here on how the server is implemented.
In the traditional servlet model, each request is handled by a thread, and if a request takes a long time to process, the thread pool-based synchronization model will quickly run out of threads, causing the server to be unable to respond to new requests.
Asynchronous support was introduced in Servlet 3.0 to allow asynchronous processing of a request, during which the worker thread is not blocked and can continue processing incoming client requests.
Starting with Spring 3.2, you can use DeferredResult for asynchronous processing. With DeferredResult, you can set a timeout and automatically return a timeout error response. Meanwhile, in another thread, you can call its setResult() to write the result back.
The Apollo client long polling address is /notifications/v2 and the corresponding server code is NotificationControllerV2.
Spring’s DeferredResult is used to implement this in NotificationControllerV2. This article focuses on the idea of solving the problem, I will not show the source code, interested students can read the source code. However, Hei wrote a simple demo to help us understand the use of DeferredResult.
@Slf4j
@RestController
public class DeferredResultDemoController {
private final Multimap<String, DeferredResult<String>> deferredResults = ArrayListMultimap.create();
@GetMapping("/info")
public DeferredResult<String> info(String key) {
// Set the timeout time to 1 second. Set the timeout to return the result
DeferredResult<String> result = new DeferredResult<>(1000L."key not change");
// Put result in deferredResults, where key is the configuration item of concern for the current request
deferredResults.put(key, result);
// If the DeferredResult times out, remove the current DeferredResult, print the log, and return the result passed in the DeferredResult constructor
result.onTimeout(() -> {
deferredResults.remove(key, result);
log.info("time out key not change");
});
// If that's done, remove the current DeferredResult from deferredResults
result.onCompletion(() -> deferredResults.remove(key, result));
return result;
}
@PostConstruct
public void init(a) {
new Thread(() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(700);
} catch (InterruptedException e) {
log.info(e.getMessage(), e);
}
// Schedule tasks to simulate configuration updates
// When the Hello Key changes, we get the relevant deferredResults from deferredResults, and set the setResult method to return the result and remove deferredResults
if (deferredResults.containsKey("hello")) {
Collection<DeferredResult<String>> results = deferredResults.removeAll("hello");
results.forEach(stringDeferredResult -> stringDeferredResult.setResult("hello key change :"+ System.currentTimeMillis())); } } }).start(); }}Copy the code