0x00 Abstract
SOFARegistry is Ant Financial's open-source, production-grade, time-sensitive, and highly available service registry.
This series of articles analyzes its design and architecture: across multiple articles and from multiple perspectives, it summarizes the implementation mechanisms and architectural ideas of DataServer or SOFARegistry, so that you can learn how Alibaba approaches design.
This is the thirteenth article in the series. It covers a service going online from the SessionServer perspective.
This article mainly walks through the business flow while sorting out the logic, design, and patterns. Because the registration process involves many modules, this article focuses only on the Session Server part of it.
0x01 Service Domain
1.1 Application Scenarios
The online/offline process of a service refers to the service performing normal register and unregister (offline) operations through code calls. Offline scenarios caused by unexpected events such as service downtime are out of scope.
1.1.1 Service Publishing
In a typical "service addressing for RPC calls" scenario, the service provider publishes a service in two steps:
- Register with SOFARegistry as a Publisher;
- Publish the required data to SOFARegistry (typically the IP address, port, invocation method, and so on).
Correspondingly, the service caller invokes the service through the following steps:
- Register with SOFARegistry as a Subscriber;
- Subscribe to SOFARegistry to receive the service data.
1.1.2 Necessity of SessionServer
In SOFARegistry, when a Client registers or subscribes to data, a consistent hash of the dataInfoId determines which DataServer to access, and a long-lived connection is then established with that DataServer.
Because each Client typically registers and subscribes to many dataInfoIds, each Client can be expected to connect to several DataServers. The problem with this architecture is that the number of connections to each DataServer grows with the number of Clients. In the extreme case, every Client connects to every DataServer, so adding DataServers does not spread the Client connections out linearly.
It is therefore important to put a connection proxy layer in front of the DataServer layer, which is why SOFARegistry has the SessionServer layer. As the number of Clients grows, the SessionServer tier can be scaled out to remove the single-machine connection bottleneck.
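As a rough illustration of the consistent-hash lookup described above, here is a minimal sketch. The class name ConsistentHashSketch, the virtual-node count, and the MD5-based hash are assumptions made for this example; they are not taken from SOFARegistry's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consistent-hash ring, for illustration only. */
class ConsistentHashSketch {
    // Ring position -> DataServer address
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ConsistentHashSketch(List<String> dataServers, int virtualNodes) {
        if (dataServers.isEmpty() || virtualNodes <= 0) {
            throw new IllegalArgumentException("need at least one server and one virtual node");
        }
        for (String server : dataServers) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(server + "#" + i), server);
            }
        }
    }

    /** Returns the first server clockwise from the hash of the dataInfoId. */
    String nodeFor(String dataInfoId) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(dataInfoId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** First 8 bytes of the MD5 digest read as a long (an assumed hash choice). */
    static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required by the Java platform", e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashSketch ring =
                new ConsistentHashSketch(Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"), 100);
        // The same dataInfoId always maps to the same DataServer.
        System.out.println(ring.nodeFor("com.alipay.test.demo.service:1.0@DEFAULT"));
    }
}
```

With a ring like this, a Client that owns many dataInfoIds ends up talking to several servers, which is the connection fan-out that the SessionServer layer is meant to absorb.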
1.2 Key Questions
Since SessionServer is an intermediate layer, it looks relatively simple: on the surface, it just accepts and forwards.
In a large system, however, how to split and decouple modules both logically and physically is an essential question.
1.3 Alibaba's Approach
Let's focus on the registration part of Alibaba's approach.
1.3.1 Registration process
The online/offline process of a service refers to normal registration (publisher.register) and going offline (publisher.unregister) through code invocation, leaving aside going offline due to unexpected events such as service downtime. The figure above roughly shows how service data flows internally during one service registration:
- The Client invokes publisher.register to register the service with the SessionServer.
- The SessionServer receives the PublisherRegister data and first writes it to memory. (The SessionServer keeps Client data in memory so that it can periodically check it against the DataServer.) It then finds the corresponding DataServer by consistent hashing of the dataInfoId and sends the PublisherRegister to that DataServer.
- The DataServer receives the PublisherRegister data and first writes it to memory. (The DataServer aggregates all PublisherRegister entries by dataInfoId.) At the same time, the DataServer notifies all SessionServers of the dataInfoId and its version.
- Also at the same time, the DataServer asynchronously and incrementally synchronizes the data to the other replicas, per dataInfoId. (On top of consistent-hash sharding, the DataServer keeps multiple replicas of each shard, three by default.)
- After receiving the change event notification, the SessionServer compares it with the version of the dataInfoId held in its own memory. If its version is lower than the one sent by the DataServer, it actively fetches the complete data for that dataInfoId from the DataServer, that is, the full list of PublisherRegister entries for that dataInfoId.
- Finally, the SessionServer pushes the data to the corresponding Clients, and each Client receives the latest service list after this registration.
Due to space constraints, this article covers only the first two steps.
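The version comparison performed by the SessionServer when a change notification arrives can be sketched as follows. This is only an illustration: the class VersionCheckSketch, its map of locally known versions, and the method name onDataChange are hypothetical, and the actual fetch from the DataServer is reduced to a comment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of "fetch only when the notified version is newer". */
class VersionCheckSketch {
    // dataInfoId -> newest version this session has seen (assumed in-memory cache)
    private final Map<String, Long> localVersions = new ConcurrentHashMap<>();

    /**
     * Returns true when the notified version is newer than the local one,
     * meaning the full data for this dataInfoId should be fetched.
     */
    boolean onDataChange(String dataInfoId, long notifiedVersion) {
        boolean[] newer = {false};
        // compute() makes the compare-and-update atomic per key
        localVersions.compute(dataInfoId, (key, local) -> {
            if (local == null || local < notifiedVersion) {
                newer[0] = true;
                // A real server would fetch the full publisher list here.
                return notifiedVersion;
            }
            return local;
        });
        return newer[0];
    }

    public static void main(String[] args) {
        VersionCheckSketch session = new VersionCheckSketch();
        System.out.println(session.onDataChange("demo", 1L)); // true: nothing known yet
        System.out.println(session.onDataChange("demo", 1L)); // false: same version
        System.out.println(session.onDataChange("demo", 2L)); // true: newer version
    }
}
```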
1.3.2 Diagram
The following figure shows the code flow for Publisher registration.
This process is also handled through the handler-task & strategy-listener approach. The way tasks are processed in the code is basically the same as in the subscription process.
0x02 Client SDK
PublisherRegistration is the Client-side interface. The key code for publishing data is as follows:
// Construct the publisher registration
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");
// Register the registration with the client and publish the data
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");
// To overwrite the previously published data, republish through the publisher
publisher.republish("10.10.1.1:12200?xx=zz");
The key to publishing data is constructing a PublisherRegistration, which has three properties:
Property | Type | Description |
---|---|---|
dataId | String | Data ID. The same value must be used when publishing and subscribing. The unique data identifier is composed of dataId + group + instanceId. |
group | String | Data group. The same value must be used when publishing and subscribing. The unique data identifier is composed of dataId + group + instanceId. The default value is DEFAULT_GROUP. |
appName | String | Application name. |
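To make the "dataId + group + instanceId" identifier concrete, here is a small sketch of how such a composite key could be built. The class name DataInfoIdSketch, the delimiter, and the field order are assumptions for illustration only; the article does not show how SOFARegistry actually encodes the identifier.

```java
/** Hypothetical composite-key builder; delimiter and field order are assumed. */
class DataInfoIdSketch {
    static final String DELIMITER = "#@#"; // assumed separator, not confirmed by the article

    static String toDataInfoId(String dataId, String instanceId, String group) {
        return dataId + DELIMITER + instanceId + DELIMITER + group;
    }

    public static void main(String[] args) {
        String pub = toDataInfoId("com.alipay.test.demo.service:1.0@DEFAULT", "DEFAULT_INSTANCE_ID", "TEST_GROUP");
        String sub = toDataInfoId("com.alipay.test.demo.service:1.0@DEFAULT", "DEFAULT_INSTANCE_ID", "DEFAULT_GROUP");
        // A different group yields a different identifier, so this subscriber
        // would not see data published under TEST_GROUP.
        System.out.println(pub.equals(sub)); // false
    }
}
```

This is why the table insists that publishers and subscribers use the same dataId and group: a mismatch in any component produces a different key.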
0x03 Session Server
The flow now reaches the Session Server.
3.1 Beans
We can start with the Beans.
@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(publisherHandler());
    list.add(subscriberHandler());
    list.add(watcherHandler());
    list.add(clientNodeConnectionHandler());
    list.add(cancelAddressRequestHandler());
    list.add(syncConfigHandler());
    return list;
}
serverHandlers is the collection of response functions for the Bolt Server.
@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
    return new SessionRegistry();
}
From the Bean perspective, the current logic is shown in the figure below; note the Strategy used for decoupling:
                                  Beans

+-----------------------------------+
| Bolt Server(in openSessionServer) |       +---------------------------------+
|                                   |   +-> | DefaultPublisherHandlerStrategy |
|  +----------------------+         |   |   +---------+-----------------------+
|  | serverHandlers       |         |   |             |
|  |                      |         |   |             |
|  | +------------------+ |         |   |             |
|  | | PublisherHandler+----------------+             v
|  | |                  | |         |         +-------+-------+
|  | | watcherHandler   | |         |         |SessionRegistry|
|  | |                  | |         |         +---------------+
|  | | ...              | |         |
|  | +------------------+ |         |
|  +----------------------+         |
+-----------------------------------+
Typically, service publishers and Session Servers are located in the same data center, which corresponds to the single-unit concept practiced by Alibaba and others.
3.2 Entry Point
PublisherHandler is the Session Server's interface to the Client and serves as the Bolt Server response function.
public class PublisherHandler extends AbstractServerHandler {
    @Autowired
    private ExecutorManager executorManager;
    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {
        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        // Delegate the actual handling to the strategy
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }
}
The logic is shown below:
   Publisher +   Session Server Scope
       Scope |
             |                  +-----------------------------------+
             |                  | Bolt Server(in openSessionServer) |
             |                  |                                   |
             |                  |  +----------------------+         |
             +                  |  | serverHandlers       |         |
             |                  |  |                      |         |
+--------+ PublisherRegister    |  | +------------------+ |         |
| Client +---------------------------> PublisherHandler | |         |
+--------+          1           |  | |                  | |         |
             +                  |  | | ...              | |         |
             |                  |  | +------------------+ |         |
             |                  |  +----------------------+         |
             |                  +-----------------------------------+
             |
3.3 Strategy
Overall, processing follows the handler-task & strategy-listener approach.
What is the Strategy Pattern?
In software development, there are often several algorithms or strategies for implementing a given function, and the appropriate one is chosen according to the environment or conditions. If these algorithms or strategies are abstracted behind a unified interface, with each one provided by its own implementation class, the client program can swap algorithms dynamically by injecting a different implementation object. This makes the design more extensible and maintainable, and it is known as the Strategy pattern.
Definition of the Strategy Pattern
- Strategy pattern: defines a family of algorithms and encapsulates each one so that they are interchangeable. The pattern lets the algorithm vary independently of the clients that use it.
- Put simply: a series of algorithms is defined, each is encapsulated, and they can replace one another without affecting the clients that use them. It is a behavioral pattern.
In the Strategy pattern, the behavior of a class or its algorithm can be changed at run time.
In the Strategy pattern, we create objects that represent the various strategies and a context object whose behavior changes as its strategy object changes. The strategy object changes the algorithm that the context object executes.
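The definition above can be made concrete with a minimal, generic sketch. All names here (StrategyPatternSketch, FormatStrategy, Context, and the two strategies) are invented for this example and have nothing to do with SOFARegistry's classes; the point is only to show a context whose behavior changes when a different strategy object is injected.

```java
import java.util.Locale;

class StrategyPatternSketch {
    /** The unified interface that every strategy implements. */
    interface FormatStrategy {
        String format(String serviceName);
    }

    static class UpperCaseStrategy implements FormatStrategy {
        @Override
        public String format(String serviceName) {
            return serviceName.toUpperCase(Locale.ROOT);
        }
    }

    static class PrefixStrategy implements FormatStrategy {
        private final String prefix;

        PrefixStrategy(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String format(String serviceName) {
            return prefix + serviceName;
        }
    }

    /** The context delegates to whichever strategy is currently injected. */
    static class Context {
        private FormatStrategy strategy;

        Context(FormatStrategy strategy) {
            this.strategy = strategy;
        }

        void setStrategy(FormatStrategy strategy) {
            this.strategy = strategy;
        }

        String handle(String serviceName) {
            return strategy.format(serviceName);
        }
    }

    public static void main(String[] args) {
        Context context = new Context(new UpperCaseStrategy());
        System.out.println(context.handle("demo")); // DEMO
        context.setStrategy(new PrefixStrategy("svc-"));
        System.out.println(context.handle("demo")); // svc-demo
    }
}
```

In the Session Server, the handler plays the role of the context and the injected strategy bean plays the role of the interchangeable algorithm.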
3.3.1 Directory Structure
Judging from the directory structure, there are many Strategy definitions and implementations, presumably so that different strategies can be applied in different situations. Some of them are currently just reserved interfaces.
com/alipay/sofa/registry/server/session/strategy
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
    ├── DefaultDataChangeRequestHandlerStrategy.java
    ├── DefaultPublisherHandlerStrategy.java
    ├── DefaultPushTaskMergeProcessor.java
    ├── DefaultReceivedConfigDataPushTaskStrategy.java
    ├── DefaultReceivedDataMultiPushTaskStrategy.java
    ├── DefaultSessionRegistryStrategy.java
    ├── DefaultSubscriberHandlerStrategy.java
    ├── DefaultSubscriberMultiFetchTaskStrategy.java
    ├── DefaultSubscriberRegisterFetchTaskStrategy.java
    ├── DefaultSyncConfigHandlerStrategy.java
    └── DefaultWatcherHandlerStrategy.java
3.3.2 DefaultPublisherHandlerStrategy
As far as the code goes, it just sets, classifies, and forwards: it fills in default information for the Publisher and then calls register or unRegister depending on the event type.
public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
    @Autowired
    private Registry sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister,
                                        RegisterResponse registerResponse) {
        try {
            // Fill in the client address from the connection
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);
            // Apply defaults for a missing zone or instanceId
            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }
            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }
            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
            // Dispatch by event type
            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }
            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!");
        } catch (Exception e) {
            // Assumption: this excerpt omits the original error handling; mark the response as failed.
            registerResponse.setSuccess(false);
        }
    }
}
The following figure shows the logic
   Publisher +   Session Server Scope
       Scope |
             |                  +-----------------------------------+
             |                  | Bolt Server(in openSessionServer) |
             |                  |                                   |
             |                  |  +----------------------+         |
             +                  |  | serverHandlers       |         |                        +-------------------------------+
             |                  |  |                      |         |                        |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister    |  | +------------------+ |         | handlePublisherRegister|                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER         |
+--------+          1           |  | |                  | |         |                        |                               |
             +                  |  | | watcherHandler   | |         |                        +-------------------------------+
             |                  |  | |                  | |         |
             |                  |  | | ......           | |         |
             |                  |  | +------------------+ |         |
             |                  |  +----------------------+         |
                                +-----------------------------------+
3.4 Core Logic Component
In the previous code, the strategy calls sessionRegistry.register(publisher), the registration function.
As the member variables of SessionRegistry show, this is where the core logic of the Session Server lives; it is effectively the business engine.
It mainly provides the following functions:
- register(StoreData data): registers new publisher or subscriber data
- cancel(List<String> connectIds): cancels publisher or subscriber data
- remove(List<String> connectIds): removes publisher or subscriber data
- unRegister(StoreData data): unregisters publisher or subscriber data
- ...
The specific member variables are as follows:
public class SessionRegistry implements Registry {
    /** store subscribers */
    @Autowired
    private Interests sessionInterests;
    /** store watchers */
    @Autowired
    private Watchers sessionWatchers;
    /** store publishers */
    @Autowired
    private DataStore sessionDataStore;
    /** transfer data to DataNode */
    @Autowired
    private DataNodeService dataNodeService;
    /** trigger task com.alipay.sofa.registry.server.meta.listener process */
    @Autowired
    private TaskListenerManager taskListenerManager;
    /** calculate data node url */
    @Autowired
    private NodeManager dataNodeManager;
    @Autowired
    private SessionServerConfig sessionServerConfig;
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private SessionRegistryStrategy sessionRegistryStrategy;
    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;
    @Autowired
    private DataIdMatchStrategy dataIdMatchStrategy;
    @Autowired
    private RenewService renewService;
    @Autowired
    private WriteDataAcceptor writeDataAcceptor;
    private volatile boolean enableDataRenewSnapshot = true;
}
The register function generates a WriteDataRequest and calls writeDataAcceptor.accept to complete the process.
@Override
public void register(StoreData storeData) {
    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
        new Wrapper<StoreData, Boolean>() {
            @Override
            public Boolean call() {
                switch (storeData.getDataType()) {
                    case PUBLISHER:
                        Publisher publisher = (Publisher) storeData;
                        sessionDataStore.add(publisher);
                        // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                        // are handed over to WriteDataAcceptor
                        writeDataAcceptor.accept(new WriteDataRequest() {
                            @Override
                            public Object getRequestBody() {
                                return publisher;
                            }

                            @Override
                            public WriteDataRequestType getRequestType() {
                                return WriteDataRequestType.PUBLISHER;
                            }

                            @Override
                            public String getConnectId() {
                                return publisher.getSourceAddress().getAddressString();
                            }

                            @Override
                            public String getDataServerIP() {
                                Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                return dataNode.getNodeUrl().getIpAddress();
                            }
                        });
                        sessionRegistryStrategy.afterPublisherRegister(publisher);
                        break;
                    case SUBSCRIBER:
                        Subscriber subscriber = (Subscriber) storeData;
                        sessionInterests.add(subscriber);
                        sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                        break;
                    case WATCHER:
                        Watcher watcher = (Watcher) storeData;
                        sessionWatchers.add(watcher);
                        sessionRegistryStrategy.afterWatcherRegister(watcher);
                        break;
                    default:
                        break;
                }
                return null;
            }

            @Override
            public Supplier<StoreData> getParameterSupplier() {
                return () -> storeData;
            }
        }, wrapperInterceptorManager);
    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e);
    }
}
The current logic is as follows:
   Publisher +   Session Server Scope
       Scope |
             |                  +-----------------------------------+
             |                  | Bolt Server(in openSessionServer) |
             |                  |                                   |
             |                  |  +----------------------+         |
             +                  |  | serverHandlers       |         |                        +-------------------------------+
             |                  |  |                      |         |                        |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister    |  | +------------------+ |         | handlePublisherRegister|                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER         |
+--------+          1           |  | |                  | |         |           2            |                               |
             +                  |  | | watcherHandler   | |         |                        +------------+------------------+
             |                  |  | |                  | |         |                                     |
             |                  |  | | ......           | |         |                                     |
             |                  |  | +------------------+ |         |                                   3 | register
             |                  |  +----------------------+         |                                     |
                                +-----------------------------------+                                     |
                                                                                                          v
                                                                                      +-------------------+-------------------+
                                                                                      |            SessionRegistry            |
                                                                                      |                                       |
                                                                                      |                                       |
                                                                                      | storeData.getDataType() == PUBLISHER  |
                                                                                      +---------------------------------------+
3.4.1 SessionRegistryStrategy
Here is another strategy. It currently has only one implementation, presumably so that it can be replaced in the future; for now the method is simply left empty as an extension point.
This shows the Alibaba team trying to decouple wherever possible.
public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {
    }
}
3.4.2 Storage Module
The earlier registration flow includes this call:
sessionDataStore.add(publisher);
This is the Session's data storage module, which is also a core part of the system. It stores all Publishers registered with this Session.
public class SessionDataStore implements DataStore {
    /** publisher store */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry = new ConcurrentHashMap<>();
    /** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>> connectIndex = new ConcurrentHashMap<>();
}
Two indexes are maintained here: one keyed by dataInfoId and one keyed by connectId.
When a Publisher is stored, its version number and timestamp are compared with those of any existing entry.
@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);
    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());
        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }
        Publisher existingPublisher = publishers.get(publisher.getRegisterId());
        if (existingPublisher != null) {
            if (existingPublisher.getVersion() != null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    // No version on the incoming publisher: keep the existing one
                    return;
                } else if (oldVersion > newVersion) {
                    // Incoming publisher is older: keep the existing one
                    return;
                } else if (oldVersion == newVersion) {
                    // Same version: fall back to the register timestamp
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }
        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);
    } finally {
        write.unlock();
    }
}
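The replacement rule buried in the nested conditions of add can be restated as a single decision function. The sketch below is an illustrative reduction, not code from SOFARegistry: the class PublisherVersionSketch and the method shouldReplace are hypothetical names, and the publishers are reduced to their version and register-timestamp values.

```java
/** Hypothetical restatement of the version/timestamp check as one pure function. */
class PublisherVersionSketch {
    /**
     * Returns true when the incoming publisher should overwrite the existing one.
     * oldVersion may be null (existing entry carries no version); newVersion and
     * newTime may be null (incoming entry carries no version or timestamp).
     */
    static boolean shouldReplace(Long oldVersion, Long newVersion, long oldTime, Long newTime) {
        if (oldVersion == null) {
            return true;                       // nothing to compare against
        }
        if (newVersion == null) {
            return false;                      // incoming entry has no version
        }
        if (oldVersion > newVersion) {
            return false;                      // incoming entry is older
        }
        if (oldVersion.longValue() == newVersion.longValue()) {
            if (newTime == null || oldTime > newTime) {
                return false;                  // same version, but not newer in time
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldReplace(2L, 3L, 100L, 50L));  // true: higher version wins
        System.out.println(shouldReplace(2L, 2L, 100L, 50L));  // false: same version, older timestamp
        System.out.println(shouldReplace(2L, 2L, 100L, 100L)); // true: same version, timestamp not older
    }
}
```

Note that with equal versions and equal timestamps the incoming publisher still replaces the existing one, which matches the strict oldTime > newTime comparison in the original code.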
3.5 Acceptor Module
After the SessionServer has stored the data locally, the next step is to notify the Data Server.
The goal here is to forward the received registrations to the DataServer, but this part is somewhat complex and more tightly coupled.
3.5.1 Overall Acceptor
WriteDataAcceptorImpl handles the writes for a specific Publisher. The write requests first need to be unified.
The field private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap() holds the processors for all write requests, keyed by connectId.
Write requests are dispatched to a separate processor per connection.
Details are as follows:
public class WriteDataAcceptorImpl implements WriteDataAcceptor {
    @Autowired
    private TaskListenerManager taskListenerManager;
    @Autowired
    private SessionServerConfig sessionServerConfig;
    @Autowired
    private RenewService renewService;

    /**
     * acceptor for all write data request
     * key:connectId
     * value:writeRequest processor
     */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        // One processor per connection, created lazily
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
            key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));
        writeDataProcessor.process(request);
    }

    public void remove(String connectId) {
        writeDataProcessors.remove(connectId);
    }
}
The current logic is shown in the following figure
   Publisher +   Session Server Scope
       Scope |
             |                  +-----------------------------------+
             |                  | Bolt Server(in openSessionServer) |
             |                  |                                   |
             |                  |  +----------------------+         |
             +                  |  | serverHandlers       |         |                        +-------------------------------+
             |                  |  |                      |         |                        |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister    |  | +------------------+ |         | handlePublisherRegister|                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER         |
+--------+          1           |  | |                  | |         |           2            |                               |
             +                  |  | | watcherHandler   | |         |                        +------------+------------------+
             |                  |  | |                  | |         |                                     |
             |                  |  | | ......           | |         |                                     |
             |                  |  | +------------------+ |         |                            register | 3
             |                  |  +----------------------+         |                                     |
             |                  +-----------------------------------+                                     |
             |                                                                                            v
             | +-----------------------------------------------------+                      +-------------+-------------------------+
             | | WriteDataAcceptorImpl                               |   WriteDataRequest   |            SessionRegistry            |
             | |                                                     |   <------------------+                                       |
             | |                                                     |                      |                                       |
             | | Map<String, WriteDataProcessor> writeDataProcessors |                      | storeData.getDataType() == PUBLISHER  |
             | |                                                     |                      +---------------------------------------+
             + +-----------------------------------------------------+
3.5.2 Processing Details
With all write requests unified, the next step is to process the writes for each connection.
The key is the following data structure: each connection's write requests are placed in a queue. The queue buffers data internally so that it can be sent to the DataServer in a unified way.
ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue
Each request type is handled differently.
For our example, the handling is as follows:
case PUBLISHER: {
    doPublishAsync(request);
}
In the end, a TaskType.PUBLISH_DATA_TASK event is sent to the taskListenerManager. It is handled by PublishDataTaskListener, which runs a PublishDataTask.
This is another case of listener-based decoupling, which is explained next.
private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
    TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
    taskListenerManager.sendTaskEvent(taskEvent);
}
The complete code is as follows:
public class WriteDataProcessor {
    private final TaskListenerManager taskListenerManager;
    private final SessionServerConfig sessionServerConfig;
    private final RenewService renewService;
    private final String connectId;
    private Map<String, AtomicLong> lastUpdateTimestampMap = new ConcurrentHashMap<>();
    private AtomicBoolean writeDataLock = new AtomicBoolean(false);
    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue = new ConcurrentLinkedQueue();
    private AtomicInteger acceptorQueueSize = new AtomicInteger(0);

    // The constructor and helpers (addQueue, flushQueue, halt, resume, ...) are omitted in this excerpt.

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }
        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else {
                flushQueue();
                doHandle(request);
            }
        }
    }

    private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                // Block other writes on this connection while the snapshot is sent
                halt();
                try {
                    doSnapshotAsync(request);
                } finally {
                    resume();
                }
            }
                break;
        }
    }

    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }

    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}
As shown in the figure below
   Publisher +   Session Server Scope
       Scope |
             |                  +-----------------------------------+
             |                  | Bolt Server(in openSessionServer) |
             |                  |                                   |
             |                  |  +----------------------+         |
             +                  |  | serverHandlers       |         |                        +-------------------------------+
             |                  |  |                      |         |                        |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister    |  | +------------------+ |         | handlePublisherRegister|                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER         |
+--------+          1           |  | |                  | |         |           2            |                               |
             +                  |  | | watcherHandler   | |         |                        +------------+------------------+
             |                  |  | |                  | |         |                                     |
             |                  |  | | ......           | |         |                                     |
             |                  |  | +------------------+ |         |                            register | 3
             |                  |  +----------------------+         |                                     |
             |                  +-----------------------------------+                                     |
             |                                                                                            v
             | +---------------------------------------------------------+                      +---------+-----------------------------+
             | | WriteDataAcceptorImpl                                   |   WriteDataRequest   |            SessionRegistry            |
             | |                                                         |   <------------------+                                       |
             | |                                                         |          4           | sessionDataStore.add(publisher)       |
             | | Map<connectId , WriteDataProcessor> writeDataProcessors |                      |                                       |
             | |                                                         |                      | storeData.getDataType() == PUBLISHER  |
             | +----------------------+----------------------------------+                      |                                       |
             |                process | 5                                                       +---------------------------------------+
             |                        v
             |    +-------------------+---------------------+                     +--------------------------+
             |    | WriteDataProcessor                      |                     | PublishDataTaskListener  |
             |    |                                         |  PUBLISH_DATA_TASK  |                          |
             |    | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> | PublishDataTask          |
             |    |                                         | 6                   +--------------------------+
             +    +-----------------------------------------+
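The combination of a per-connection processor map with a lock flag and a buffering queue can be sketched in isolation. This is a simplified illustration under stated assumptions: requests are plain strings, the class AcceptorSketch and its members are hypothetical, and the behavior of halt, resume, and flushQueue (which the article does not show) is assumed to be "set the flag", "clear the flag and drain the queue", and "handle everything that was buffered".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of per-connection write processing with a lock flag and a queue. */
class AcceptorSketch {
    static class Processor {
        // Requests that were actually handled, in order (stands in for sending to the DataServer)
        final List<String> handled = Collections.synchronizedList(new ArrayList<>());
        private final AtomicBoolean locked = new AtomicBoolean(false);
        private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        void halt() {
            locked.set(true);
        }

        /** Assumed behavior: unlock, then drain whatever was buffered while locked. */
        void resume() {
            locked.set(false);
            flushQueue();
        }

        void process(String request) {
            if (locked.get()) {
                queue.add(request);      // buffer while a snapshot-like operation holds the lock
            } else {
                flushQueue();            // handle leftovers first to preserve order
                handle(request);
            }
        }

        private void flushQueue() {
            String buffered;
            while ((buffered = queue.poll()) != null) {
                handle(buffered);
            }
        }

        private void handle(String request) {
            handled.add(request);
        }
    }

    // connectId -> processor, created lazily as in the acceptor shown earlier
    private final Map<String, Processor> processors = new ConcurrentHashMap<>();

    Processor accept(String connectId, String request) {
        Processor processor = processors.computeIfAbsent(connectId, key -> new Processor());
        processor.process(request);
        return processor;
    }

    public static void main(String[] args) {
        AcceptorSketch acceptor = new AcceptorSketch();
        Processor p = acceptor.accept("10.10.1.1:12200", "pub-1");
        p.halt();
        acceptor.accept("10.10.1.1:12200", "pub-2"); // buffered, not handled yet
        System.out.println(p.handled);               // [pub-1]
        p.resume();
        System.out.println(p.handled);               // [pub-1, pub-2]
    }
}
```

Keeping one processor per connection means a paused connection only delays its own writes; requests from other connections go through their own processors.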
3.6 Listener Decoupling
Everything so far has been logically tied together; at this point the flow is decoupled.
3.6.1 Decoupling Engine
DefaultTaskListenerManager is the decoupling mechanism. Listeners are registered with it, and when a caller invokes sendTaskEvent, it iterates over the listeners registered for that task type and calls each one.
public class DefaultTaskListenerManager implements TaskListenerManager {
    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        // Only listeners registered for this task type are invoked
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
3.6.2 Listener
PublishDataTaskListener is the corresponding handler, and in its support function, it declares support for PUBLISH_DATA_TASK. This completes decoupling.
public class PublishDataTaskListener implements TaskListener {
    @Autowired
    private DataNodeService dataNodeService;
    @Autowired
    private TaskProcessor dataNodeSingleTaskProcessor;
    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support() {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        SessionTask publishDataTask = new PublishDataTask(dataNodeService);
        publishDataTask.setTaskEvent(event);
        // Run the task on the dedicated publish-data thread pool
        executorManager.getPublishDataExecutor().execute(
            () -> dataNodeSingleTaskProcessor.process(publishDataTask));
    }
}
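The listener mechanism shown above boils down to a map from task type to listeners. The following is a dependency-free sketch of that idea using only the JDK instead of Guava's Multimap. Everything in it (TaskDispatchSketch, the two-value TaskType enum, the listener helper) is invented for illustration and simplified: events are plain objects rather than TaskEvent instances.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical sketch of type-based listener dispatch. */
class TaskDispatchSketch {
    enum TaskType { PUBLISH_DATA_TASK, OTHER_TASK } // OTHER_TASK is a placeholder type

    interface TaskListener {
        TaskType support();

        void handleEvent(Object event);
    }

    static class Manager {
        private final Map<TaskType, List<TaskListener>> listeners = new ConcurrentHashMap<>();

        void addTaskListener(TaskListener listener) {
            listeners.computeIfAbsent(listener.support(), key -> new CopyOnWriteArrayList<>())
                     .add(listener);
        }

        /** Calls every listener that declared support for the given type. */
        void sendTaskEvent(TaskType type, Object event) {
            for (TaskListener listener : listeners.getOrDefault(type, Collections.emptyList())) {
                listener.handleEvent(event);
            }
        }
    }

    /** Small helper to build a listener from a lambda. */
    static TaskListener listener(TaskType type, Consumer<Object> handler) {
        return new TaskListener() {
            @Override
            public TaskType support() {
                return type;
            }

            @Override
            public void handleEvent(Object event) {
                handler.accept(event);
            }
        };
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        List<Object> published = new ArrayList<>();
        manager.addTaskListener(listener(TaskType.PUBLISH_DATA_TASK, published::add));
        manager.sendTaskEvent(TaskType.PUBLISH_DATA_TASK, "publisher-1");
        manager.sendTaskEvent(TaskType.OTHER_TASK, "ignored");
        System.out.println(published); // [publisher-1]
    }
}
```

The sender only knows the task type, and the listener only declares which type it supports, so neither side references the other directly.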
3.7 Task Scheduling
The Listener above starts the task that performs the business logic with the following code, but the mechanism behind it deserves a closer look.
executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
3.7.1 ExecutorManager
With ExecutorManager, thread pools are uniformly started and shut down. PublishDataExecutor is one of them.
ExecutorManager code excerpts are as follows:
public class ExecutorManager {
    private final ScheduledThreadPoolExecutor scheduler;
    private final ThreadPoolExecutor publishDataExecutor;
    private static final String PUBLISH_DATA_EXECUTOR = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
            k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                new NamedThreadFactory("PublishData-executor", true)));
    }

    public ThreadPoolExecutor getPublishDataExecutor() {
        return publishDataExecutor;
    }
}
ExecutorManager beans are as follows:
@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}
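For readers unfamiliar with the thread-pool parameters used above, here is a minimal JDK-only sketch of a bounded pool with the same shape: core and maximum sizes, a keep-alive time, a bounded ArrayBlockingQueue, and named daemon threads. The class PublishExecutorSketch and the concrete numbers are assumptions; SessionThreadPoolExecutor and NamedThreadFactory from the article are replaced with a plain ThreadPoolExecutor and a lambda thread factory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a bounded executor similar in shape to publishDataExecutor. */
class PublishExecutorSketch {
    static ThreadPoolExecutor newPublishExecutor(int core, int max, long keepAliveSeconds, int queueSize) {
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize),   // bounded queue: excess tasks are rejected
                runnable -> {
                    Thread thread = new Thread(runnable, "PublishData-executor");
                    thread.setDaemon(true);            // do not block JVM shutdown
                    return thread;
                });
    }

    /** Submits the given number of trivial tasks and returns how many completed. */
    static int runAndCount(int tasks) {
        ThreadPoolExecutor executor = newPublishExecutor(2, 4, 60, 16);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::incrementAndGet);
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(5)); // 5
    }
}
```

With a bounded queue, a burst larger than queue size plus maximum threads triggers the pool's rejection policy (by default an exception), so the sizes should be chosen with the expected publish rate in mind.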
3.7.2 Processor
The Processor defines how a task is processed and wraps its execution.
public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {
    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            // Retryable tasks with retries left are reported as transient failures
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    return ProcessingResult.TransientError;
                }
            }
            return ProcessingResult.PermanentError;
        }
    }

    @Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null;
    }
}
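The result classification in the processor can be tried out with a self-contained sketch. The types below (ProcessingResultSketch, Task, Retryable, CountedRetryTask) are simplified stand-ins written for this example; only the three result names and the checkRetryTimes idea come from the code above, and the retry counter is an assumed implementation.

```java
/** Hypothetical, simplified version of the success / transient / permanent classification. */
class ProcessingResultSketch {
    enum ProcessingResult { Success, TransientError, PermanentError }

    interface Task {
        void execute() throws Exception;
    }

    interface Retryable {
        /** Returns true while the task may still be retried. */
        boolean checkRetryTimes();
    }

    static ProcessingResult process(Task task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable && ((Retryable) task).checkRetryTimes()) {
                return ProcessingResult.TransientError; // caller may schedule a retry
            }
            return ProcessingResult.PermanentError;     // give up
        }
    }

    /** A task that always fails and allows a fixed number of retries (assumed policy). */
    static class CountedRetryTask implements Task, Retryable {
        private int remainingRetries;

        CountedRetryTask(int retries) {
            this.remainingRetries = retries;
        }

        @Override
        public void execute() throws Exception {
            throw new Exception("simulated failure");
        }

        @Override
        public boolean checkRetryTimes() {
            return remainingRetries-- > 0;
        }
    }

    public static void main(String[] args) {
        CountedRetryTask task = new CountedRetryTask(1);
        System.out.println(process(() -> { }));  // Success
        System.out.println(process(task));       // TransientError
        System.out.println(process(task));       // PermanentError
    }
}
```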
3.7.3 Business Task
The execute method of PublishDataTask calls dataNodeService.register(publisher).
public class PublishDataTask extends AbstractSessionTask {
    private final DataNodeService dataNodeService;
    private Publisher publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute() {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        // taskId create from event
        if (taskEvent.getTaskId() != null) {
            setTaskId(taskEvent.getTaskId());
        }
        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj;
        }
    }
}
The relationship is shown below:
+-------------------------------------------------+
| DefaultTaskListenerManager                      |
|                                                 |
|                                                 |
| Multimap<TaskType, TaskListener> taskListeners  |
|                                                 |
+----------------------+--------------------------+
                       |
                       |
    PUBLISH_DATA_TASK  |
                       |
                       v
          +------------+--------------+
          | PublishDataTaskListener   |
          +------------+--------------+
                       |
          setTaskEvent |
                       |
                       v
              +--------+--------+
              | PublishDataTask |
              +-----------------+
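The dispatch in the diagram, one task type mapped to one or more listeners, can be sketched with plain JDK collections. This is a simplified illustration under stated assumptions: the class ListenerDispatchSketch, its two methods, the OTHER_TASK constant, and the use of Consumer in place of a listener interface are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ListenerDispatchSketch {

    /** Hypothetical task types; only PUBLISH_DATA_TASK appears in the article. */
    public enum TaskType { PUBLISH_DATA_TASK, OTHER_TASK }

    // One task type can have several listeners, which is what a Multimap models.
    private final Map<TaskType, List<Consumer<Object>>> taskListeners =
            new EnumMap<>(TaskType.class);

    public void addTaskListener(TaskType type, Consumer<Object> listener) {
        taskListeners.computeIfAbsent(type, k -> new ArrayList<>()).add(listener);
    }

    /** Hands the event object to every listener of the type; returns how many were called. */
    public int sendTaskEvent(TaskType type, Object eventObj) {
        List<Consumer<Object>> targets =
                taskListeners.getOrDefault(type, Collections.emptyList());
        for (Consumer<Object> listener : targets) {
            listener.accept(eventObj);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        ListenerDispatchSketch manager = new ListenerDispatchSketch();
        manager.addTaskListener(TaskType.PUBLISH_DATA_TASK,
                obj -> System.out.println("publish task received: " + obj));
        manager.sendTaskEvent(TaskType.PUBLISH_DATA_TASK, "publisher-1");
        manager.sendTaskEvent(TaskType.OTHER_TASK, "no listener registered");
    }
}
```

The sender only knows the task type, so new listeners can be added without touching the code that produces the events.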
3.8 Forwarding Service Information
After the listener decoupling, the PublishDataTask calls dataNodeService.register(publisher), which then forwards the service information to the Data Server.
Find the corresponding DataServer based on the consistent Hash of the dataInfoId and send the PublisherRegister to the DataServer.
The DataNodeServiceImpl register function is called to forward the request to the Data Server.
public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger dataNodeExchanger;

    @Autowired
    private NodeManager dataNodeManager;

    @Autowired
    private SessionServerConfig sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}
As you can see, a PublishDataRequest is built and then sent to the Data Server through the Bolt client. If sending fails with a RequestException, the request is retried asynchronously.
@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch (RequestException e) {
        // Sending failed: retry asynchronously with the configured count and delays.
        doRetryAsync(bizName, request, e,
            sessionServerConfig.getPublishDataTaskRetryTimes(),
            sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
            sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
    }
}

private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
    Response response = dataNodeExchanger.request(request);
    Object result = response.getResult();
    CommonResponse commonResponse = (CommonResponse) result;
    return commonResponse;
}
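The three retry parameters passed to doRetryAsync (retry times, first delay, increment delay) suggest a delay that grows with each attempt. The body of doRetryAsync is not shown in this article, so the formula below is an assumption: the first retry waits firstDelay, and each later retry waits incrementDelay longer than the previous one. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryDelaySketch {

    /**
     * Assumed schedule: attempt 1 waits firstDelay, and every further
     * attempt adds incrementDelay. This is not taken from SOFARegistry.
     */
    static long delayFor(int attempt, long firstDelay, long incrementDelay) {
        return firstDelay + (long) (attempt - 1) * incrementDelay;
    }

    /** Lists the delay of every retry attempt, in order. */
    static List<Long> schedule(int retryTimes, long firstDelay, long incrementDelay) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            delays.add(delayFor(attempt, firstDelay, incrementDelay));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Illustrative values: 3 retries, first delay 3, increment 5.
        System.out.println(schedule(3, 3, 5)); // prints [3, 8, 13]
    }
}
```

Spacing retries further apart gives a briefly unavailable Data Server time to recover instead of being hit again immediately.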
As follows:
+-------------------------------------------------+
| DefaultTaskListenerManager                      |
|                                                 |
|                                                 |
| Multimap<TaskType, TaskListener> taskListeners  |
|                                                 |
+----------------------+--------------------------+
                       |
    PUBLISH_DATA_TASK  |
                       v
          +------------+--------------+
          | PublishDataTaskListener   |
          +------------+--------------+
                       |
          setTaskEvent |
                       v
              +--------+--------+
              | PublishDataTask |
              +--------+--------+
              register |
                       |
            +----------v----------+
            | DataNodeServiceImpl |
            +----------+----------+
    PublishDataRequest |
                       v
            +----------+----------+  Client.sendSync   +------------+
            |  DataNodeExchanger  +------------------> | Data Server|
            +---------------------+ PublishDataRequest +------------+
How does it know which Data Server to send the request to? The request method of DataNodeExchanger is:
@Override
public Response request(Request request) throws RequestException {
    Response response;
    // The request itself carries the address of the target Data Server.
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);
        final Object result = sessionClient.sendSync(url, request.getRequestBody(),
            request.getTimeout() != null ? request.getTimeout()
                : sessionServerConfig.getDataNodeExchangeTimeOut());
        response = () -> result;
    }
    // (the catch block is omitted in this excerpt)
    return response;
}
The URL comes from the request, so we look in DataNodeServiceImpl, where the request is built:
private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {

        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody() {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl() {
            // The target Data Server is chosen by the dataInfoId.
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes() {
            return retryTimes;
        }
    };
}

private URL getUrl(String dataInfoId) {
    Node dataNode = dataNodeManager.getNode(dataInfoId);
    //meta push data node has not port
    String dataIp = dataNode.getNodeUrl().getIpAddress();
    return new URL(dataIp, sessionServerConfig.getDataServerPort());
}
DataNodeManager contains:
@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}
The hash is calculated using the dataInfoId, and the corresponding DataNode is obtained from the DataNodeManager to obtain its URL.
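The lookup performed by consistentHash.getNodeFor can be illustrated with a small hash ring. This is a generic sketch of consistent hashing, not SOFARegistry's ConsistentHash class: the MD5-based hash, the virtual-node count, the node addresses, and the sample dataInfoId are all assumptions chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

public class ConsistentHashSketch {

    // Ring position -> node address; TreeMap keeps the positions sorted.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public ConsistentHashSketch(Collection<String> nodes, int virtualNodes) {
        for (String node : nodes) {
            // Several virtual positions per node spread the keys more evenly.
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    /** Returns the first node at or after the key's position, wrapping around the ring. */
    public String getNodeFor(String dataInfoId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no data nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(dataInfoId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of an MD5 digest as the ring position. */
    static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashSketch hashRing = new ConsistentHashSketch(
                Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"), 100);
        // Hypothetical dataInfoId; the same id always maps to the same node.
        System.out.println(hashRing.getNodeFor("com.example.DemoService#DEFAULT"));
    }
}
```

Because only the keys next to a changed ring position move, adding or removing a Data Server remaps a fraction of the dataInfoIds rather than all of them.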
Thus, the figure above expands to:
+-------------------------------------------------+
| DefaultTaskListenerManager                      |
|                                                 |
| Multimap<TaskType, TaskListener> taskListeners  |
|                                                 |
+----------------------+--------------------------+
                       |
    PUBLISH_DATA_TASK  | 1
                       v
          +------------+--------------+
          | PublishDataTaskListener   |
          +------------+--------------+
                       |
          setTaskEvent | 2
                       v
              +--------+--------+         4    +---------------+
              | PublishDataTask |     +------> |DataNodeManager|
              +--------+--------+     |        +---------------+
              register | 3            |  consistentHash|
                       |              |                | 5
            +----------v----------+---+                v
            | DataNodeServiceImpl |       6      +-----+----+
            +----------+----------+ <------------+ DataNode |
    PublishDataRequest | 7              url      +----------+
                       v
            +----------+----------+
            |  DataNodeExchanger  |
            +----------+----------+
                       |
       Client.sendSync | PublishDataRequest
                       |
                       v 8
                 +-----+------+
                 | Data Server|
                 +------------+
0x04 Summary
The following reviews how service data flows internally during one service registration.
- The Client calls publisher.register to register the service with the SessionServer.
- The SessionServer receives the PublisherRegister data and first writes it to memory. It keeps the Client data in memory so that it can later be checked periodically against the DataServer. It then finds the corresponding DataServer based on the consistent hash of the dataInfoId and sends the PublisherRegister to that DataServer.
- The DataServer receives the PublisherRegister data and first writes it to memory. It aggregates all PublisherRegister entries by dataInfoId. It also notifies all SessionServers of the changed dataInfoId and its version.
- At the same time, the DataServer asynchronously and incrementally synchronizes the data to the other replicas, per dataInfoId. Building on consistent-hash sharding, the DataServer stores multiple replicas (three by default) of each shard.
- After receiving the change notification, the SessionServer compares it with the version of the dataInfoId held in its own memory. If the local version is smaller than the one sent by the DataServer, it actively fetches the complete data for the dataInfoId from the DataServer, that is, the full list of PublisherRegister entries for that dataInfoId.
- Finally, the SessionServer pushes the data to the corresponding Clients, which then receive the latest service list after this registration.
Due to space constraints, this article covers only the first two steps; the remaining steps will be covered in follow-up articles.
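The version comparison in the fifth step can be sketched as follows. This only illustrates the rule "fetch when the local version is smaller than the notified one"; the class VersionCheckSketch, its methods, and the in-memory map are hypothetical and are not taken from the SessionServer source.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionCheckSketch {

    // Last fetched version per dataInfoId (hypothetical in-memory store).
    private final Map<String, Long> localVersions = new HashMap<>();

    /** True when the notified version is newer than the local one, so a fetch is needed. */
    public synchronized boolean isStale(String dataInfoId, long notifiedVersion) {
        Long local = localVersions.get(dataInfoId);
        return local == null || local < notifiedVersion;
    }

    /** Records the version only after the full data has been fetched successfully. */
    public synchronized void markFetched(String dataInfoId, long version) {
        localVersions.merge(dataInfoId, version, Math::max);
    }

    public static void main(String[] args) {
        VersionCheckSketch session = new VersionCheckSketch();
        String id = "com.example.DemoService#DEFAULT"; // hypothetical dataInfoId
        System.out.println(session.isStale(id, 5L)); // true: nothing stored yet
        session.markFetched(id, 5L);
        System.out.println(session.isStale(id, 5L)); // false: already up to date
        System.out.println(session.isStale(id, 6L)); // true: a newer version exists
    }
}
```

Separating the check from the update means a failed fetch leaves the old version in place, so the next notification triggers another attempt.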
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
If you would like timely updates on my articles, or want to see the technical material I recommend, please follow the account.