0 x00 the

SOFARegistry is ant Financial’s open source, production-grade, time-sensitive, and highly available service registry.

This series of articles focuses on analyzing design and architecture, that is, using multiple articles to summarize the implementation mechanism and architecture ideas of DataServer or SOFARegistry from multiple perspectives, so that you can learn how Ali designs.

This is the thirteenth article that introduces service rollout from the SessionServer perspective.

This article mainly introduces the business, while sorting out the logic, design and patterns. Because there are so many modules involved in the registration process, this article only focuses on the Session Server part of the registration process.

0x01 Service Domain

1.1 Application Scenarios

The offline process of a service is when the service performs regular register and offline operations through code calls, regardless of offline scenarios due to unexpected circumstances such as service downtime.

1.1.1 Service Publishing

In a typical “service addressing for RPC calls” application scenario, the service provider publishes the service through the following two steps:

  1. Sign up to SOFARegistry as Publisher;
  2. Publish to SOFARegistry the data that needs to be published (typically IP addresses, ports, invocation methods, and so on);

Correspondingly, the caller of the service implements the service invocation through the following steps:

  1. Sign up and register yourself as Subscriber to SOFARegistry;
  2. Subscribe to receive service data from SOFARegistry.

1.1.2 Necessity of SessionServer

In SOFARegistry, all clients register and subscribe to data, perform a consistent Hash based on the dataInfoId, calculate which DataServer to access, and then establish a long connection with that DataServer.

Since each Client typically registers and subscribes to a large amount of dataInfoId data, we can expect each Client to connect to several dataserVers. The problems with this architecture are: The number of connections to each DataServer increases as the number of clients increases. In extreme cases, each Client needs to be connected to each DataServer. Therefore, expanding a DataServer does not linearly allocate the number of connections to each Client.

Therefore, it is important to design a connection broker layer for the DataServer layer, so SOFARegistry has the SessionServer layer. As the number of clients increases, you can expand the number of session Servers to solve the connection bottleneck of a single server.

1.2 points

Since SessionServer is an intermediate layer, it seems to be relatively simple. On the surface, it is accept and forward.

But in fact, in a large system, how to achieve module segmentation and decoupling logically and physically is very necessary.

1.3 Ali Plan

Let’s focus on the registration part of the Ali scheme.

1.3.1 Registration process

The offline process of the service refers to the normal registration (publisher.unregister) and offline (publisher.unregister) of the service through code invocation, regardless of the offline caused by unexpected circumstances such as service downtime. The figure above roughly shows the internal flow of service data through “a service registration process”.

  1. Client invokes publisher.register to register the service with SessionServer.
  2. SessionServer receives the PublisherRegister data and writes it to the memory. SessionServer stores the Client data into the memory, which can be used to check the DataServer periodically. Then search for the corresponding DataServer based on the consistent Hash of the dataInfoId and send the PublisherRegister to the DataServer.
  3. DataServer receives PublisherRegister data and first writes it to memory. DataServer aggregates all publisherRegisters in the dataInfoId dimension. At the same time, DataServer notifies all session Servers of the dataInfoId and version.
  4. At the same time, asynchronously, DataServer incrementally synchronizes data to other replicas in the dataInfoId dimension. DataServer stores multiple copies (three by default) of each shard based on consistent Hash shards.
  5. After receiving the change event notification, the SessionServer compares the version of dataInfoId stored in the SessionServer memory. If the version is smaller than that sent by DataServer, It actively fetkes the complete data of the dataInfoId from DataServer, that is, it contains all the PublisherRegister lists specific to that dataInfoId.
  6. Finally, the SessionServer pushes the data to the corresponding Client, and the Client receives the latest service list data after the service registration.

Due to space constraints, this article discusses the first two points.

1.3.2 graphic

The following figure shows the code flow for Publisher registration

This process is also handled by handler-task & strategy-listener. The processing process of tasks in the code is basically the same as the subscription process.

0x02 Client SDK

PublisherRegistration is the interface of Client. The key code for publishing data is as follows:

Construct the publisher registry
PublisherRegistration registration = new PublisherRegistration("Com. Alipay. The test. The demo. Service: 1.0 @ DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");

// Register the registry with the client and publish the data
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200? xx=yy");

// You can republish data using the publisher model if you want to overwrite the last published data
publisher.republish("10.10.1.1:12200? xx=zz");
Copy the code

The key to publishing data is to construct a PublisherRegistration, which contains three properties:

The property name Attribute types describe
dataId String DataId. The value must be the same when publishing subscriptions. The unique dataId consists of dataId + group + instanceId.
group String The same value must be used when publishing subscriptions. The unique identifier of data consists of dataId + group + instanceId. The default value is DEFAULT_GROUP.
appName String Application appName.

0x03 Session server

The process comes to the Session Server.

3.1 the Bean

First, you can start with Beans.

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers(a) {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(publisherHandler());
    list.add(subscriberHandler());
    list.add(watcherHandler());
    list.add(clientNodeConnectionHandler());
    list.add(cancelAddressRequestHandler());
    list.add(syncConfigHandler());
    return list;
}
Copy the code

ServerHandlers are a combination of response functions for Bolt Server.

@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry(a) {
    return new SessionRegistry();
}
Copy the code

From the Bean perspective, the current logic is shown in the figure, with a decoupling Strategy:

Beans


+-----------------------------------+
| Bolt Server(in openSessionServer)| +---------------------------------+ | | +-> | DefaultPublisherHandlerStrategy | | +----------------------+ | | +---------+-----------------------+ | | serverHandlers | | | | | | | | | | | | +------------------+ | | | | | | | PublisherHandle+----------------+ v | | | | | | +-------+-------+ | | | watcherHandler | | | |SessionRegistry| | | | | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | |... | | | | | +------------------+ | | | +----------------------+ | +-----------------------------------+Copy the code

Generally, service publishers and Session Servers should be in a Data Center, which is the singleton concept practiced by Ali et al.

3.2 the entrance

PublisherHandler is the Session Server interface to the Client and the Bolt Server response function.

public class PublisherHandler extends AbstractServerHandler {
    @Autowired
    private ExecutorManager          executorManager;

    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {

        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }
Copy the code

The logic is shown below:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer)| | | | | | +----------------------+ | + | | serverHandlers | | | | | | +--------+ PublisherRegister | | +------------------+ | | | Client +---------------------------> PublisherHandler | | | +--------+ 1 | | | | | | + | | | . | | | | | | +------------------+ | | | | +----------------------+ | | +-----------------------------------+ |Copy the code

3.3 strategy

On the whole, handler-task & strategy-listener are used to process.

What is the Strategy Pattern?

In the process of software development, there are often many algorithms or strategies to achieve a certain function. We can choose different algorithms or strategies to complete the function according to the different environment or conditions. If these algorithms and strategies of abstracting, provides a unified interface, different algorithms have different implementation classes or strategy, so that in the client program can by injecting different implementation object or strategy to implement the algorithm of dynamic replacement, scalability and maintainability of the patterns are higher, this is the strategy pattern.

Definition of Strategy Pattern

  • Strategy pattern: A family of algorithms is defined, packaged separately so that they can be replaced with each other. This pattern makes changes to the algorithm independent of the customers using the algorithm.

  • Simple to understand: defines a series of algorithms. Each algorithm is encapsulated. The algorithms can be replaced with each other. The change of the algorithm does not affect the customers who use the algorithm. It belongs to the behavioral pattern.

In the Strategy Pattern, the behavior of a class or its algorithm can be changed at run time. This type of design pattern is behavioral.

In the policy pattern, we create objects that represent various policies and a context object whose behavior changes as the policy object changes. The policy object changes the execution algorithm of the context object.

3.3.1 Directory Structure

From the perspective of the directory structure, there are many definitions and implementations of Strategy, which should be different strategies according to different situations. Some of them are currently reserved interfaces.

Com/alipay/sofa/registry/server/session/strategy ├ ─ ─ DataChangeRequestHandlerStrategy. Java ├ ─ ─ PublisherHandlerStrategy. Java ├ ─ ─ ReceivedConfigDataPushTaskStrategy. Java ├ ─ ─ ReceivedDataMultiPushTaskStrategy. Java ├ ─ ─ SessionRegistryStrategy. Java ├ ─ ─ SubscriberHandlerStrategy. Java ├ ─ ─ SubscriberMultiFetchTaskStrategy. Java ├ ─ ─ SubscriberRegisterFetchTaskStrategy. Java ├ ─ ─ SyncConfigHandlerStrategy. Java ├ ─ ─ TaskMergeProcessorStrategy. Java ├ ─ ─ WatcherHandlerStrategy. Java └ ─ ─ impl ├ ─ ─ DefaultDataChangeRequestHandlerStrategy. Java ├ ─ ─ DefaultPublisherHandlerStrategy. Java ├ ─ ─ DefaultPushTaskMergeProcessor. Java ├ ─ ─ DefaultReceivedConfigDataPushTaskStrategy. Java ├ ─ ─ DefaultReceivedDataMultiPushTaskStrategy. Java ├ ─ ─ DefaultSessionRegistryStrategy. Java ├ ─ ─ DefaultSubscriberHandlerStrategy. Java ├ ─ ─ DefaultSubscriberMultiFetchTaskStrategy. Java ├ ─ ─ DefaultSubscriberRegisterFetchTaskStrategy. Java ├ ─ ─ DefaultSyncConfigHandlerStrategy. Java └ ─ ─ DefaultWatcherHandlerStrategy. JavaCopy the code

3.3.2 rainfall distribution on 10-12 DefaultPublisherHandlerStrategy

As far as the code is concerned, it’s just setup, sort, forward. That is, set the default information for Publisher and register or unRegister depending on the event type.

public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
    @Autowired
    private Registry            sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) {
        try {
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);

            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }

            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }

            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }
            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!"); }}}Copy the code

The following figure shows the logic

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |                          |                               |
               +              |    | |  watcherHandler  | |       |                          +-------------------------------+
               |              |    | |                  | |       |
               |              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
                              +-----------------------------------+

Copy the code

Mobile phone as shown in figure

3.4 Core logical Components

In the previous code, the policy calls sessionRegistry.register(Publisher), the registration function.

As you can see from the internal member variables of SessionRegistry, this is where the core logic of the Session Server lies, as opposed to the business engine.

It mainly provides the following functions:

  • Register (StoreData Data) : Registers new Publisher or subscriber data

  • Cancel (List connectIds) : Cancel publisher or subscriber data

  • Remove (List connectIds) : Removes publisher or subscriber data

  • UnRegister (StoreData data) : unRegister publisher or subscriber data

  • .

The specific member variables are as follows:

public class SessionRegistry implements Registry {

    /** * store subscribers */
    @Autowired
    private Interests                 sessionInterests;

    /** * store watchers */
    @Autowired
    private Watchers                  sessionWatchers;

    /** * store publishers */
    @Autowired
    private DataStore                 sessionDataStore;

    /** * transfer data to DataNode */
    @Autowired
    private DataNodeService           dataNodeService;

    /** * trigger task com.alipay.sofa.registry.server.meta.listener process */
    @Autowired
    private TaskListenerManager       taskListenerManager;

    /** * calculate data node url */
    @Autowired
    private NodeManager               dataNodeManager;

    @Autowired
    private SessionServerConfig       sessionServerConfig;

    @Autowired
    private Exchange                  boltExchange;

    @Autowired
    private SessionRegistryStrategy   sessionRegistryStrategy;

    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;

    @Autowired
    private DataIdMatchStrategy       dataIdMatchStrategy;

    @Autowired
    private RenewService              renewService;

    @Autowired
    private WriteDataAcceptor         writeDataAcceptor;

    private volatile boolean          enableDataRenewSnapshot = true;
}
Copy the code

The register function generates a WriteDataRequest and calls writeDataAcceptor.accept to complete the process.

@Override
public void register(StoreData storeData) {

    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
            new Wrapper<StoreData, Boolean>() {
                @Override
                public Boolean call(a) {

                    switch (storeData.getDataType()) {
                        case PUBLISHER:
                            Publisher publisher = (Publisher) storeData;

                            sessionDataStore.add(publisher);

                            // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                            // are handed over to WriteDataAcceptor
                            writeDataAcceptor.accept(new WriteDataRequest() {
                                @Override
                                public Object getRequestBody(a) {
                                    return publisher;
                                }

                                @Override
                                public WriteDataRequestType getRequestType(a) {
                                    return WriteDataRequestType.PUBLISHER;
                                }

                                @Override
                                public String getConnectId(a) {
                                    return publisher.getSourceAddress().getAddressString();
                                }

                                @Override
                                public String getDataServerIP(a) {
                                    Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                    returndataNode.getNodeUrl().getIpAddress(); }}); sessionRegistryStrategy.afterPublisherRegister(publisher);break;
                        case SUBSCRIBER:
                            Subscriber subscriber = (Subscriber) storeData;

                            sessionInterests.add(subscriber);

                            sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                            break;
                        case WATCHER:
                            Watcher watcher = (Watcher) storeData;

                            sessionWatchers.add(watcher);

                            sessionRegistryStrategy.afterWatcherRegister(watcher);
                            break;
                        default:
                            break;
                    }
                    return null;
                }

                @Override
                public Supplier<StoreData> getParameterSupplier(a) {
                    return () -> storeData;
                }

            }, wrapperInterceptorManager);

    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e); }}Copy the code

The current logic is as follows:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                                    3  | register
               |              |    +----------------------+       |                                       |
                              +-----------------------------------+                                       |
                                                                                                          v
                                                                                      +-------------------+-------------------+
                                                                                      |           SessionRegistry             |
                                                                                      |                                       |
                                                                                      |                                       |
                                                                                      |  storeData.getDataType() == PUBLISHER |
                                                                                      +---------------------------------------+

Copy the code

The mobile phone is as follows:

3.4.1 track SessionRegistryStrategy

Here is another strategy, there is only one implementation at present, it should be to make a replacement in the future, the current function is simply leaving the interface empty.

We can see that Ali is trying to decouple everything.

public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {}}Copy the code

3.4.2 Storage Module

In the previous registration process:

sessionDataStore.add(publisher);
Copy the code

This is the data storage module of Session, which is also the core of the system. Stores all Publishers registered in this Session.

public class SessionDataStore implements DataStore {
    /** * publisher store */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry      = new ConcurrentHashMap<>();

    /*** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>>  connectIndex  = new ConcurrentHashMap<>();
}
Copy the code

Two storage methods are recorded here, which are stored according to dataInfoId and connectId respectively.

When stored, the version number and timestamp are compared.

@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);

    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());

        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }

        Publisher existingPublisher = publishers.get(publisher.getRegisterId());

        if(existingPublisher ! =null) {

            if(existingPublisher.getVersion() ! =null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    return;
                } else if (oldVersion > newVersion) {
                    return;
                } else if (oldVersion == newVersion) {
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }
        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);

    } finally{ write.unlock(); }}Copy the code

3.5 Acceptor module

After the SessionServer itself is stored, the next step is to notify the Data Server.

The goal here is to forward the received registrations to DataServer, but there is a bit of a complexity to the coupling.

3.5.1 track of overall Acceptor

WriteDataAcceptorImpl handles writes to specific Publisher. First you need to unify the write requests.

Use private Map

writeDataProcessors = new ConcurrentHashMap(); To store all write requests uniformly.
,>

Write requests for different connections are processed based on different connections.

Details are as follows:

public class WriteDataAcceptorImpl implements WriteDataAcceptor {

    @Autowired
    private TaskListenerManager             taskListenerManager;

    @Autowired
    private SessionServerConfig             sessionServerConfig;

    @Autowired
    private RenewService                    renewService;

    /** * acceptor for all write data request * key:connectId * value:writeRequest processor * */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
                key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));

        writeDataProcessor.process(request);
    }
  
    public void remove(String connectId) { writeDataProcessors.remove(connectId); }}Copy the code

The current logic is shown in the following figure

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +-----------------------------------------------------+                    +-------------+-------------------------+
               | |           WriteDataAcceptorImpl                     |  WriteDataRequest  |           SessionRegistry             |
               | |                                                     | <------------------+                                       |
               | |                                                     |                    |                                       |
               | | Map<String, WriteDataProcessor> writeDataProcessors |                    |  storeData.getDataType() == PUBLISHER |
               | |                                                     |                    +---------------------------------------+
               + +-----------------------------------------------------+

Copy the code

Mobile phone as shown in figure

3.5.2 Specific handling

Having consolidated all the requests earlier, we now need to continue processing for each write to the connection.

The key here is the following data structure, that is, each connection’s write request is placed in the queue. The purpose of using Queue is to cache data internally and uniformly to DataServer.

ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue
Copy the code

Each request is processed differently.

For our example, the processing is as follows:

case PUBLISHER: {
		doPublishAsync(request);
}
Copy the code

Finally, a request taskType.publish_datA_task is sent to the taskListenerManager, which is handled by the publishDataTask call to the publishDataTask Listener.

There is a listener decoupling, which we will explain next.

private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
		TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
		taskListenerManager.sendTaskEvent(taskEvent);
}
Copy the code

The specific code is as follows:

public class WriteDataProcessor {
    private final TaskListenerManager               taskListenerManager;

    private final SessionServerConfig               sessionServerConfig;

    private final RenewService                      renewService;

    private final String                            connectId;

    private Map<String, AtomicLong>                 lastUpdateTimestampMap = new ConcurrentHashMap<>();

    private AtomicBoolean                           writeDataLock          = new AtomicBoolean(
                                                                               false);

    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue          = new ConcurrentLinkedQueue();

    private AtomicInteger                           acceptorQueueSize      = new AtomicInteger(0);

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }

        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else{ flushQueue(); doHandle(request); }}}private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                halt();
                try {
                    doSnapshotAsync(request);
                } finally{ resume(); }}break;
    }
      
    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }
      
    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = newTaskEvent(eventObj, taskType); taskListenerManager.sendTaskEvent(taskEvent); }}Copy the code

As shown in the figure below

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +---------------------------------------------------------+                    +---------+-----------------------------+
               | |           WriteDataAcceptorImpl                         |  WriteDataRequest  |           SessionRegistry             |
               | |                                                         | <------------------+                                       |
               | |                                                         |       4            |   sessionDataStore.add(publisher)     |
               | | Map<connectId , WriteDataProcessor> writeDataProcessors |                    |                                       |
               | |                                                         |                    |  storeData.getDataType() == PUBLISHER |
               | +----------------------+----------------------------------+                    |                                       |
               |                process | 5+---------------------------------------+ | v | +-------------------+---------------------+ +--------------------------+  | | WriteDataProcessor | | PublishDataTaskListener | | | | PUBLISH_DATA_TASK | | | | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> | PublishDataTask | | | |6              +--------------------------+
               +    +-----------------------------------------+

Copy the code

Mobile phone as shown below:

3.6 the Listener decoupling

All of this was logically integrated, and here we have a decoupling.

3.6.1 Decouple the engine

DefaultTaskListenerManager decoupling mechanism is, as you can see, add the listener, when the user calls sendTaskEvent will iterate over all listeners, call the corresponding to the listener.

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners(a) {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for(TaskListener taskListener : taskListeners) { taskListener.handleEvent(taskEvent); }}}Copy the code

3.6.2 Listener

PublishDataTaskListener is the corresponding handler, and in its support function, it declares support for PUBLISH_DATA_TASK. This completes decoupling.

public class PublishDataTaskListener implements TaskListener {

    @Autowired
    private DataNodeService dataNodeService;

    @Autowired
    private TaskProcessor   dataNodeSingleTaskProcessor;

    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support(a) {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {

        SessionTask publishDataTask = newPublishDataTask(dataNodeService); publishDataTask.setTaskEvent(event); executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask)); }}Copy the code

3.7 the Task scheduling

The Listener is found above, which starts the task executing the business with the following code. But the mechanism behind this needs to be explored.

executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
Copy the code

3.7.1 ExecutorManager

With ExecutorManager, thread pools are uniformly started and shut down. PublishDataExecutor is one of them.

ExecutorManager code excerpts are as follows:

public class ExecutorManager {

    private final ScheduledThreadPoolExecutor scheduler;

    private final ThreadPoolExecutor          publishDataExecutor;

    private static final String               PUBLISH_DATA_EXECUTOR                      = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
      
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
                k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                        sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                        sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                        sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                        new NamedThreadFactory("PublishData-executor".true)));
    }
  
		public ThreadPoolExecutor getPublishDataExecutor(a) {
        returnpublishDataExecutor; }}Copy the code

ExecutorManager beans are as follows:

@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}
Copy the code

3.7.2 Processor

A Processor is a task definition that encapsulates a task.

public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {

    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    returnProcessingResult.TransientError; }}returnProcessingResult.PermanentError; }}@Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null; }}Copy the code

3.7.3 business Task

Call dataNodeService. Register (publisher) from execute of PublishDataTask.

public class PublishDataTask extends AbstractSessionTask {

    private final DataNodeService dataNodeService;

    private Publisher             publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute(a) {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        //taskId create from event
        if(taskEvent.getTaskId() ! =null) {
            setTaskId(taskEvent.getTaskId());
        }

        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj; }}}Copy the code

Specific as follows

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
                       |
     PUBLISH_DATA_TASK |
                       |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       |
                       v
              +--------+--------+
              | PublishDataTask |
              +-----------------+

Copy the code

3.8 Forwarding service Information

After the listener decoupling, the PublishDataTask calls the dataNodeService. Register (publisher), which then forwards the service information to the Data Server.

Find the corresponding DataServer based on the consistent Hash of the dataInfoId and send the PublisherRegister to the DataServer.

The DataNodeServiceImpl register function is called to forward the request to the Data Server.

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;

    @Autowired
    private SessionServerConfig   sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}
Copy the code

As you can see, the PublishDataRequest is set up and then sent to the Data Server via Bolt Client.

@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch(RequestException e) { doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(), sessionServerConfig.getPublishDataTaskRetryFirstDelay(), sessionServerConfig.getPublishDataTaskRetryIncrementDelay()); }}private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
        Response response = dataNodeExchanger.request(request);
        Object result = response.getResult();
        CommonResponse commonResponse = (CommonResponse) result;
        return commonResponse;
}
Copy the code

As follows:

+-------------------------------------------------+ | DefaultTaskListenerManager | | | | | | Multimap<TaskType, TaskListener> taskListeners | | | +----------------------+--------------------------+ | PUBLISH_DATA_TASK | v +------------+--------------+ | PublishDataTaskListener | +------------+--------------+ | setTaskEvent | v +--------+--------+ | PublishDataTask | +--------+--------+ register | | +----------v----------+ | DataNodeServiceImpl |  +----------+----------+ PublishDataRequest | v +----------+----------+ Client.sendSync +------------+ | DataNodeExchanger +------------------> | Data Server| +---------------------+ PublishDataRequest +------------+Copy the code

How do I know which Data Sever to send? DataNodeExchanger are:

@Override
public Response request(Request request) throws RequestException {

    Response response;
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);

        finalObject result = sessionClient .sendSync(url, request.getRequestBody(), request.getTimeout() ! =null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut());

        response = () -> result;
    } 

    return response;
}
Copy the code

So he went to Datanodesert Impl

private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {
        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody(a) {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl(a) {
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes(a) {
            returnretryTimes; }}; }private URL getUrl(String dataInfoId) {
        Node dataNode = dataNodeManager.getNode(dataInfoId);
        //meta push data node has not port
        String dataIp = dataNode.getNodeUrl().getIpAddress();
        return new URL(dataIp, sessionServerConfig.getDataServerPort());
}

Copy the code

In the DataNodeManager there are:

@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}
Copy the code

The hash is calculated using the dataInfoId, and the corresponding DataNode is obtained from the DataNodeManager to obtain its URL.

Thus, the figure above expands to:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |  1
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |  2
                       v
              +--------+--------+        4     +---------------+
              | PublishDataTask |     +------> |DataNodeManager|
              +--------+--------+     |        +---------------+
              register |  3           |  consistentHash|
                       |              |                | 5
            +----------v----------+---+                v
            | DataNodeServiceImpl |       6      +-----+----+
            +----------+----------+ <------------+ DataNode |
    PublishDataRequest | 7              url      +----------+
                       v
            +----------+----------+
            |  DataNodeExchanger  |
            +----------+----------+
                       |
       Client.sendSync | PublishDataRequest
                       |
                       v 8
                 +-----+------+
                 | Data Server|
                 +------------+

Copy the code

0 x04 summary

Review the internal flow of service data through a service registration process.

  1. Client invokes publisher.register to register the service with SessionServer.
  2. SessionServer receives the PublisherRegister data and writes it to the memory. SessionServer stores the Client data into the memory, which can be used to check the DataServer periodically. Then search for the corresponding DataServer based on the consistent Hash of the dataInfoId and send the PublisherRegister to the DataServer.
  3. DataServer receives PublisherRegister data and first writes it to memory. DataServer aggregates all publisherRegisters in the dataInfoId dimension. At the same time, DataServer notifies all session Servers of the dataInfoId and version.
  4. At the same time, asynchronously, DataServer incrementally synchronizes data to other replicas in the dataInfoId dimension. DataServer stores multiple copies (three by default) of each shard based on consistent Hash shards.
  5. After receiving the change event notification, the SessionServer compares the version of dataInfoId stored in the SessionServer memory. If the version is smaller than that sent by DataServer, It actively fetkes the complete data of the dataInfoId from DataServer, that is, it contains all the PublisherRegister lists specific to that dataInfoId.
  6. Finally, the SessionServer pushes the data to the corresponding Client, and the Client receives the latest service list data after the service registration.

Due to space constraints, this article discusses the first two points, and there will be a follow-up article on the other points.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want to get a timely news feed of personal articles, or want to see the technical information of personal recommendations, please pay attention.

0 XFF reference

How does ant Financial Service registry realize the smooth scaling of DataServer

Ant gold uniform service registry SOFARegistry parsing | service discovery path optimization

The service registry Session storage policy | SOFARegistry parsing

Introduction to the Registry – SOFARegistry architecture for Massive Data

Service registry data fragmentation and synchronization scheme, rounding | SOFARegistry parsing

Ant Financial open source communication framework SOFABolt analysis of connection management analysis

Timeout control mechanism and heartbeat mechanism resolved by SOFABolt, ant Financial’s open source communication framework

SOFABolt protocol framework analysis of Ant Financial open source communication framework

Ant gold uniform service registry data consistency analysis | SOFARegistry parsing

Ant communication framework practice

Sofa – Bolt remote call

Sofa – bolt to study

SOFABolt Design Summary – Elegant and simple design approach

SofaBolt source code analysis – Service startup to message processing

SOFABolt source code analysis

SOFABolt source code analysis 9-userProcessor custom processor design

SOFARegistry introduction

SOFABolt source code analysis of the design of the 13-Connection event processing mechanism