0x00 Abstract

SOFARegistry is Ant Financial's open-source, production-grade, time-sensitive, and highly available service registry.

This series of articles focuses on design and architecture: across multiple articles, it summarizes the implementation mechanisms and architectural ideas of DataServer and SOFARegistry from several perspectives, so that you can learn how Alibaba approaches design.

This is the seventeenth article in the series. It describes how SOFARegistry handles the configuration information used for operations.

0x01 Business Category

1.1 Configuration Functions

Some system-level settings need to be changed from the console, so the Meta Server exposes interfaces to it. When the Meta Server receives a request from the console, it interacts with the Data Server and Session Server. For example, the Meta Server provides the following interfaces:

@Bean
@ConditionalOnMissingBean
public StopPushDataResource stopPushDataResource() {
    return new StopPushDataResource();
}

@Bean
public BlacklistDataResource blacklistDataResource() {
    return new BlacklistDataResource();
}

@Bean
public RenewSwitchResource renewSwitchResource() {
    return new RenewSwitchResource();
}

These interfaces are exposed over HTTP because it is common and simple. Between servers, however, the Bolt protocol is still used.

1.2 Learning Direction

The question to explore is: on the DataServer, how is configuration information handled separately?

0x02 Data Structure

2.1 Directory Structure

The DataServer directory contains Handler, service, task, and provideData.

│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task

2.2 Data Structure Definition

The configuration data structure is as follows:

ProvideData is the structure exchanged with the outside. It carries the payload, the version number, and the identifier dataInfoId.

public class ProvideData implements Serializable {
    private ServerDataBox provideData;
    private String        dataInfoId;
    private Long          version;
}

ServerDataBox carries the actual payload and is defined as follows:

public class ServerDataBox implements Serializable {
    /** Null for locally instantiated, otherwise for internalized */
    private byte[]            bytes;
    /** Only available if bytes != null */
    private int               serialization;
    /** Actual object, lazy deserialized */
    private Object            object;
}  

For ServerDataBox, the Data Server currently has only one use: a Boolean value that acts as a control switch.

public void changeDataProcess(ProvideData provideData) {
    boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
        .getObject());
    datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
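
The "lazy deserialized" comment on ServerDataBox can be illustrated with a minimal sketch. The class LazyBox below is hypothetical and is not the SOFARegistry class: it assumes plain Java serialization, keeps the raw bytes, and decodes them only on first access. The parsing step in main mirrors changeDataProcess.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for ServerDataBox: keeps the serialized bytes and
// decodes them only when the object is first requested.
final class LazyBox {
    private final byte[] bytes; // serialized form; null when built from a local object
    private Object object;      // decoded value, filled in on first access

    private LazyBox(byte[] bytes, Object object) {
        this.bytes = bytes;
        this.object = object;
    }

    static LazyBox fromObject(Object value) {
        return new LazyBox(null, value);
    }

    static LazyBox fromBytes(byte[] bytes) {
        return new LazyBox(bytes, null);
    }

    // Plain Java serialization is an assumption of this sketch.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    synchronized boolean isDecoded() {
        return object != null;
    }

    synchronized Object getObject() {
        if (object == null && bytes != null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                object = in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return object;
    }

    public static void main(String[] args) {
        LazyBox box = LazyBox.fromBytes(serialize("true"));
        // Same parsing step as changeDataProcess: the payload is a String holding a boolean.
        boolean enabled = Boolean.parseBoolean((String) box.getObject());
        System.out.println("decoded switch: " + enabled);
    }
}
```

The point of the design is that a box received over the network costs nothing to decode unless a processor actually reads it.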

0x03 Meta Server Internal Flow

To follow the whole flow, we first need to cover the part of the Meta Server internals related to metaServerService.fetchData(dataInfoId).

For decoupling purposes, Meta Server splits certain business functions into four layers, with the basic logic:

Http Resource ———>  TaskListener ———>   Task  ————>   Service   

The full flow chart is given first; the process is then introduced step by step.

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+                                                      7
+-------+   1   |                        |                                                                       +---------------------------------------+
| Admin | +---> | +--------------------+ |                                                                       |    Data Server                        |
+-------+       | |fireDataChangeNotify| |                                                                       |                                       |
                | +--------------------+ |                                                     6                 | +-----------------------------------+ |
                +------------------------+                                                                       | |    metaClientHandlers             | |
                    |                                     +---------------------+    dataNodeExchanger.request   | | +-------------------------------+ | |
                    |        3                            | DataNodeServiceImpl | +----------------------------->+ | | notifyProvideDataChangeHandler| | |
                    |                                     +----------+----------+     NotifyProvideDataChange    | | +-------------------------------+ | |
                    |   NotifyProvideDataChange                      ^                                           | |                                   | |
                    |                                                |                                           | +-----------------------------------+ |
                    |                                             5  | notifyProvideDataChange                   +---------------------------------------+
                    v                                                |
          +---------+-----------------------------------+            |
          |       DefaultTaskListenerManager            |            |
          |                                             |       +----+----------------------------+
          | +-----------------------------------------+ | 4     |                                 |
          | | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
          | |                                         | |       |                                 |
          | | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+
          | |                                         | |
          | | dataNodeChangePushTaskListener          | |
          | |                                         | |
          | | sessionNodeChangePushTaskListener       | |
          | +-----------------------------------------+ |
          +---------------------------------------------+


3.1 Admin Request Response

As mentioned above, Meta Server provides some control interfaces to Admin over Http. Let’s use BlacklistDataResource as an example.

As you can see, blacklistPush stores the data through persistenceDataDBService and then sends a NotifyProvideDataChange indirectly through fireDataChangeNotify.

@Path("blacklist")
public class BlacklistDataResource {

    @RaftReference
    private DBService           persistenceDataDBService;

    @Autowired
    private TaskListenerManager taskListenerManager;

    /**
     * update blacklist
     * e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update
     */
    @POST
    @Path("update")
    @Produces(MediaType.APPLICATION_JSON)
    public Result blacklistPush(String config) {
        PersistenceData persistenceData = createDataInfo();
        persistenceData.setData(config);
        boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,
                persistenceData);

        fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,
            DataOperator.UPDATE);

        Result result = new Result();
        result.setSuccess(true);
        return result;
    }

    private PersistenceData createDataInfo() {
        DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);
        PersistenceData persistenceData = new PersistenceData();
        persistenceData.setDataId(dataInfo.getDataId());
        persistenceData.setGroup(dataInfo.getDataType());
        persistenceData.setInstanceId(dataInfo.getInstanceId());
        persistenceData.setVersion(System.currentTimeMillis());
        return persistenceData;
    }

    private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {

        NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,
            version, dataOperator);

        TaskEvent taskEvent = new TaskEvent(notifyProvideDataChange,
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}

This corresponds to the figure above:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+


3.2 DBService

As you can see, DBService is also Raft based, indicating that consistency is maintained within the MetaServer cluster itself.

@RaftReference
private DBService           persistenceDataDBService;

The PersistenceDataDBService class is defined as follows:

@RaftService
public class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {

    private ConcurrentHashMap<String, Object> serviceMap        = new ConcurrentHashMap<>();

    @Override
    public boolean put(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }

    @Override
    public DBResponse get(String key) {
        Object ret = serviceMap.get(key);
        return ret != null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();
    }

    @Override
    public boolean update(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }
      
    @Override
    public Set<String> getSnapshotFileNames() {
        if (!snapShotFileNames.isEmpty()) {
            return snapShotFileNames;
        }
        snapShotFileNames.add(this.getClass().getSimpleName());
        return snapShotFileNames;
    }
}

As you can see, ConcurrentHashMap is used for storage and Raft uses a file system for snapshot backup.
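
To make the "in-memory map plus file snapshot" idea concrete, here is a minimal sketch. It is not the PersistenceDataDBService implementation and involves no Raft: the class name SnapshotStoreSketch, the save/load methods, and the use of Java serialization are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a map-backed store that can dump itself to a file and restore from it.
final class SnapshotStoreSketch {
    private final ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();

    void put(String key, Object value) {
        serviceMap.put(key, value);
    }

    Object get(String key) {
        return serviceMap.get(key);
    }

    // Writes a copy of the current map content to the snapshot file.
    void save(Path file) {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(new HashMap<>(serviceMap));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Replaces the map content with what the snapshot file holds.
    @SuppressWarnings("unchecked")
    void load(Path file) {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            Map<String, Object> restored = (Map<String, Object>) in.readObject();
            serviceMap.clear();
            serviceMap.putAll(restored);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    static Path newTempFile() {
        try {
            Path file = Files.createTempFile("snapshot-sketch", ".bin");
            file.toFile().deleteOnExit();
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = newTempFile();
        SnapshotStoreSketch original = new SnapshotStoreSketch();
        original.put("blacklist", "config-v1");
        original.save(file);

        SnapshotStoreSketch restored = new SnapshotStoreSketch();
        restored.load(file);
        System.out.println("restored value: " + restored.get("blacklist"));
    }
}
```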

3.3 Bean

As mentioned earlier, for decoupling, the Meta Server wraps functions such as message processing and forwarding in TaskListeners, which are executed by the TaskListenerManager. Taking the ProvideData-related functions as an example, the corresponding Beans are:

@Configuration
public static class MetaServerTaskConfiguration {
    ...

    @Bean
    public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {
        TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(
            sessionNodeSingleTaskProcessor());
        taskListenerManager.addTaskListener(taskListener);
        return taskListener;
    }

    @Bean
    public TaskListenerManager taskListenerManager() {
        return new DefaultTaskListenerManager();
    }
}

3.4 Listener

The listener execution engine looks up the listeners registered for the event's TaskType and calls handleEvent on each of them.

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
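
The type-keyed dispatch can be tried in isolation with the following sketch. It is a simplified model, not the SOFARegistry code: it replaces Guava's Multimap with an EnumMap of lists so that it needs no dependency, events are reduced to a String payload, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model of dispatching events to listeners by task type.
final class ListenerDispatchSketch {
    enum TaskType { PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, DATA_NODE_CHANGE_PUSH_TASK }

    interface TaskListener {
        TaskType support();

        void handleEvent(String payload);
    }

    static final class Manager {
        private final Map<TaskType, List<TaskListener>> listeners = new EnumMap<>(TaskType.class);

        void addTaskListener(TaskListener listener) {
            listeners.computeIfAbsent(listener.support(), type -> new ArrayList<>()).add(listener);
        }

        // Invokes only the listeners registered for this type; returns how many ran.
        int sendTaskEvent(TaskType type, String payload) {
            List<TaskListener> matched = listeners.getOrDefault(type, Collections.emptyList());
            for (TaskListener listener : matched) {
                listener.handleEvent(payload);
            }
            return matched.size();
        }
    }

    // Builds a listener that records every event it receives into the given list.
    static TaskListener recording(TaskType type, List<String> sink) {
        return new TaskListener() {
            @Override
            public TaskType support() {
                return type;
            }

            @Override
            public void handleEvent(String payload) {
                sink.add(type + ":" + payload);
            }
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Manager manager = new Manager();
        manager.addTaskListener(recording(TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, seen));
        manager.sendTaskEvent(TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK, "blacklist");
        manager.sendTaskEvent(TaskType.DATA_NODE_CHANGE_PUSH_TASK, "ignored");
        System.out.println(seen);
    }
}
```

Because the sender only knows the task type, a new kind of reaction can be added by registering another listener, without touching the HTTP resource that fires the event.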

The corresponding service Listener is as follows:

public class PersistenceDataChangeNotifyTaskListener implements TaskListener {

    @Autowired
    private MetaServerConfig                       metaServerConfig;

    private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;

    public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {
        singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);
    }

    @Override
    public TaskType support() {
        return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        MetaServerTask persistenceDataChangeNotifyTask = new PersistenceDataChangeNotifyTask(
            metaServerConfig);
        persistenceDataChangeNotifyTask.setTaskEvent(event);
        singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(),
            persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime());
    }
}

This corresponds to the following:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
                    |
                    |        3
                    |
                    |   NotifyProvideDataChange
                    |
                    |
                    v
          +---------+-----------------------------------+
          |       DefaultTaskListenerManager            |
          |                                             |
          | +-----------------------------------------+ |
          | | persistenceDataChangeNotifyTaskListener | |
          | |                                         | |
          | | receiveStatusConfirmNotifyTaskListener  | |
          | |                                         | |
          | | dataNodeChangePushTaskListener          | |
          | |                                         | |
          | | sessionNodeChangePushTaskListener       | |
          | +-----------------------------------------+ |
          +---------------------------------------------+


3.5 Task

The Listener calls the Task.

The Task calls different services depending on the NodeType:

public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {

    private final SessionNodeService sessionNodeService;

    private final DataNodeService    dataNodeService;

    final private MetaServerConfig   metaServerConfig;

    private NotifyProvideDataChange  notifyProvideDataChange;

    @Override
    public void execute() {
        Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();
        if (nodeTypes.contains(NodeType.DATA)) {
            dataNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
        if (nodeTypes.contains(NodeType.SESSION)) {
            sessionNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        Object obj = taskEvent.getEventObj();
        if (obj instanceof NotifyProvideDataChange) {
            this.notifyProvideDataChange = (NotifyProvideDataChange) obj;
        }
    }
}

This corresponds to the following:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
                    |
                    |        3
                    |
                    |   NotifyProvideDataChange
                    |
                    |
                    v
+-------------------+-------------------------+
|       DefaultTaskListenerManager            |
|                                             |       +---------------------------------+
| +-----------------------------------------+ | 4     |                                 |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| |                                         | |       |                                 |
| | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+
| |                                         | |
| | dataNodeChangePushTaskListener          | |
| |                                         | |
| | sessionNodeChangePushTaskListener       | |
| +-----------------------------------------+ |
+---------------------------------------------+


3.6 Service

A Task invokes a Service to do the actual work. The service below, for example, pushes the notification to the DataServer; the SessionServer is handled by a corresponding service.

public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private StoreService          dataStoreService;

    @Autowired
    private AbstractServerHandler dataConnectionHandler;

    @Override
    public NodeType getNodeType() {
        return NodeType.DATA;
    }

    @Override
    public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {

        NodeConnectManager nodeConnectManager = getNodeConnectManager();
        Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);

        // add register confirm
        StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
        Map<String, DataNode> dataNodes = storeService.getNodes();

        for (InetSocketAddress connection : connections) {

            if (!dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {
                continue;
            }

            try {
                Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {

                    @Override
                    public NotifyProvideDataChange getRequestBody() {
                        return notifyProvideDataChange;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection);
                    }
                };

                dataNodeExchanger.request(request);
            } // catch block omitted in this excerpt
        }
    }
}
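
The filtering step in the loop above (notify only connections whose IP belongs to a registered data node) can be sketched on its own. The class and method names below are hypothetical, and the addresses and port are made-up sample values.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of selecting notification targets among open connections.
final class NotifyTargetSketch {
    // Keeps only the connections whose remote IP is in the set of registered node IPs.
    static List<InetSocketAddress> selectTargets(Collection<InetSocketAddress> connections,
                                                 Set<String> registeredIps) {
        List<InetSocketAddress> targets = new ArrayList<>();
        for (InetSocketAddress connection : connections) {
            if (!registeredIps.contains(connection.getAddress().getHostAddress())) {
                continue; // connected, but not (or no longer) a registered node
            }
            targets.add(connection);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<InetSocketAddress> connections = Arrays.asList(
            new InetSocketAddress("10.0.0.1", 9600),
            new InetSocketAddress("10.0.0.2", 9600));
        Set<String> registered = new HashSet<>(Arrays.asList("10.0.0.1"));
        System.out.println("targets: " + selectTargets(connections, registered).size());
    }
}
```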

This corresponds to the following:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
          |                                     +---------------------+
          |        3                            | DataNodeServiceImpl |
          |                                     +----------+----------+
          |   NotifyProvideDataChange                      ^
          |                                                |
          |                                             5  | notifyProvideDataChange
          v                                                |
+---------+-----------------------------------+            |
|       DefaultTaskListenerManager            |            |
|                                             |       +----+----------------------------+
| +-----------------------------------------+ | 4     |                                 |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| |                                         | |       |                                 |
| | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+
| |                                         | |
| | dataNodeChangePushTaskListener          | |
| |                                         | |
| | sessionNodeChangePushTaskListener       | |
| +-----------------------------------------+ |
+---------------------------------------------+


After the request is sent, the picture is as follows:

                +------------------------+
                |                        |   2    +-------------------------+
                | BlacklistDataResource  +------>-+PersistenceDataDBService |
                |                        | update +-------------------------+
+-------+   1   |                        |
| Admin | +---> | +--------------------+ |
+-------+       | |fireDataChangeNotify| |
                | +--------------------+ |
                +------------------------+
                    |       3
                    |   NotifyProvideDataChange
                    v
+-------------------+-------------------------+
|       DefaultTaskListenerManager            |
|                                             |       +---------------------------------+
| +-----------------------------------------+ | 4     |                                 |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | receiveStatusConfirmNotifyTaskListener  | |       |                                 |
| | dataNodeChangePushTaskListener          | |       +----+----------------------------+
| | sessionNodeChangePushTaskListener       | |            |
| +-----------------------------------------+ |            |
+---------------------------------------------+         5  | notifyProvideDataChange
                                                           |
               +-------------------------------------------+
               |
               v
 +-------------+-------+
 | DataNodeServiceImpl |                         +---------------------------------------+
 +-------------+-------+                         |    Data Server                      7 |
               |                       6         |                                       |
               |     dataNodeExchanger.request   | +-----------------------------------+ |
               +->------------------------------>+ |    metaClientHandlers             | |
                      NotifyProvideDataChange    | | +-------------------------------+ | |
                                                 | | | notifyProvideDataChangeHandler| | |
                                                 | | +-------------------------------+ | |
                                                 | +-----------------------------------+ |
                                                 +---------------------------------------+


We now know that, in the Meta Server, DataNodeServiceImpl.notifyProvideDataChange tells the Data Server that a NotifyProvideDataChange message has arrived.

0x04 Call Path in Data Server

Execution now moves to the DataServer. A few pieces need to be introduced first.

4.1 Bean

The metaClientHandlers Bean holds the response handlers used by MetaNodeExchanger, and notifyProvideDataChangeHandler is one of them.

@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(serverChangeHandler());
    list.add(statusConfirmHandler());
    list.add(notifyProvideDataChangeHandler());
    return list;
}

4.2 Network Interaction

In DefaultMetaServiceImpl, getMetaServerMap calls MetaNodeExchanger.connect, which registers the metaClientHandlers. This is how notifyProvideDataChangeHandler gets attached to the Bolt connection with the MetaServer.

public class DefaultMetaServiceImpl implements IMetaServerService {
  @Override
  public Map<String, Set<String>> getMetaServerMap() {
      Connection connection = null;
      connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
              .next(), dataServerConfig.getMetaServerPort()))).getConnection();
      // remainder of the method omitted in this excerpt
  }
}

MetaNodeExchanger is defined as follows. It handles all interaction with the MetaServer inside the DataServer in a unified way.

public class MetaNodeExchanger implements NodeExchanger {
    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private IMetaServerService                metaServerService;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
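
The connect method uses a check, lock, check-again sequence so that the client and the channel are each created only once. The sketch below isolates that pattern. It is an illustration under assumptions, not the Bolt or SOFARegistry code: the cached reference is a volatile field here (in the original the lookup goes through boltExchange.getClient), and the Client class and the address string are made up.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of double-checked lazy initialization of a shared client.
final class LazyClientSketch {
    static final class Client {
        final String url;

        Client(String url) {
            this.url = url;
        }
    }

    // volatile ensures a fully constructed Client is visible to the unlocked first check.
    private volatile Client client;
    private final AtomicInteger connects = new AtomicInteger();

    Client connect(String url) {
        Client local = client;
        if (local == null) {                 // first check, without locking
            synchronized (this) {
                local = client;
                if (local == null) {         // second check, under the lock
                    connects.incrementAndGet();
                    local = new Client(url);
                    client = local;
                }
            }
        }
        return local;
    }

    int connectCount() {
        return connects.get();
    }

    // Starts several threads that all call connect and reports how many clients were created.
    static int raceConnect(int threads) {
        LazyClientSketch sketch = new LazyClientSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> sketch.connect("meta.example:0"));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sketch.connectCount();
    }

    public static void main(String[] args) {
        System.out.println("clients created: " + raceConnect(8));
    }
}
```

The unlocked first check keeps the common path cheap once the client exists; the second check under the lock prevents two threads that both saw null from each creating a client.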

4.3 Handler Definition

In its interest function, NotifyProvideDataChangeHandler declares that it handles messages of type NotifyProvideDataChange. So when the MetaServer sends a NotifyProvideDataChange notification, the handler calls metaServerService.fetchData(dataInfoId) to obtain the ProvideData for subsequent processing.

public class NotifyProvideDataChangeHandler extends AbstractClientHandler {

    @Autowired
    private IMetaServerService   metaServerService;

    @Autowired
    private ProvideDataProcessor provideDataProcessorManager;


    @Override
    public Object doHandle(Channel channel, Object request) {
        NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
        String dataInfoId = notifyProvideDataChange.getDataInfoId();
        if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
            ProvideData provideData = metaServerService.fetchData(dataInfoId);
            provideDataProcessorManager.changeDataProcess(provideData);
        }
        return null;
    }
  
    @Override
    public Class interest() {
        return NotifyProvideDataChange.class;
    }
}
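
How an interest function can drive message routing is sketched below. This is a simplified model and makes no claim about how Bolt routes messages internally: the Dispatcher class, the NotifyChange message type, and the returned string are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each handler declares the message class it is interested in,
// and the dispatcher routes an incoming message by its runtime class.
final class InterestDispatchSketch {
    interface Handler {
        Class<?> interest();

        Object doHandle(Object request);
    }

    static final class NotifyChange {
        final String dataInfoId;

        NotifyChange(String dataInfoId) {
            this.dataInfoId = dataInfoId;
        }
    }

    static final class Dispatcher {
        private final Map<Class<?>, Handler> handlers = new HashMap<>();

        void register(Handler handler) {
            handlers.put(handler.interest(), handler);
        }

        Object dispatch(Object request) {
            Handler handler = handlers.get(request.getClass());
            if (handler == null) {
                throw new IllegalArgumentException(
                    "no handler for " + request.getClass().getSimpleName());
            }
            return handler.doHandle(request);
        }
    }

    // A handler that reacts to NotifyChange by describing the fetch it would trigger.
    static Handler notifyChangeHandler() {
        return new Handler() {
            @Override
            public Class<?> interest() {
                return NotifyChange.class;
            }

            @Override
            public Object doHandle(Object request) {
                return "fetch:" + ((NotifyChange) request).dataInfoId;
            }
        };
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(notifyChangeHandler());
        System.out.println(dispatcher.dispatch(new NotifyChange("blacklist")));
    }
}
```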

4.4 Calling the Handler

As described above, DataNodeServiceImpl.notifyProvideDataChange in the Meta Server notifies the Data Server with a NotifyProvideDataChange message.

So NotifyProvideDataChangeHandler will respond.

4.5 Getting ProvideData

NotifyProvideDataChangeHandler contains the following call:

ProvideData provideData = metaServerService.fetchData(dataInfoId);

This calls fetchData in DefaultMetaServiceImpl, which requests the ProvideData from the Meta Server.

@Override
public ProvideData fetchData(String dataInfoId) {

    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {

                    @Override
                    public FetchProvideDataRequest getRequestBody() {
                        return new FetchProvideDataRequest(dataInfoId);
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                };

                Response response = metaNodeExchanger.request(request);
                Object result = response.getResult();
                if (result instanceof ProvideData) {
                    return (ProvideData) result;
                }
            } // catch block omitted in this excerpt
        }
    }
    String newip = refreshLeader().getIp();
    return null;
}
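
Taken together, sections 3 and 4 describe a "notify, then fetch" exchange: the notification carries only an identifier and an operator, and the receiver pulls the actual data afterwards. A minimal in-process sketch of that exchange follows. MetaSide, DataSide, and the id "renew.switch" are hypothetical names, and direct method calls stand in for the Bolt requests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-process model of the notify-then-fetch exchange.
final class NotifyThenFetchSketch {
    enum DataOperator { ADD, UPDATE, REMOVE }

    // Stand-in for the Meta Server: stores the payload and notifies the subscriber.
    static final class MetaSide {
        private final Map<String, String> store = new ConcurrentHashMap<>();

        void update(String dataInfoId, String value, DataSide subscriber) {
            store.put(dataInfoId, value);
            // The notification carries no payload, only the id and the operator.
            subscriber.onNotify(dataInfoId, DataOperator.UPDATE, this);
        }

        String fetch(String dataInfoId) {
            return store.get(dataInfoId);
        }
    }

    // Stand-in for the Data Server: pulls the payload when notified, then applies it.
    static final class DataSide {
        private volatile boolean renewEnabled = true;

        void onNotify(String dataInfoId, DataOperator operator, MetaSide meta) {
            if (operator != DataOperator.REMOVE) {
                String payload = meta.fetch(dataInfoId);
                renewEnabled = Boolean.parseBoolean(payload);
            }
        }

        boolean isRenewEnabled() {
            return renewEnabled;
        }
    }

    public static void main(String[] args) {
        MetaSide meta = new MetaSide();
        DataSide data = new DataSide();
        meta.update("renew.switch", "false", data);
        System.out.println("renew enabled: " + data.isRenewEnabled());
    }
}
```

Keeping the payload out of the notification means the receiver always reads the current stored value from the Meta Server rather than whatever a possibly delayed notification carried.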

Now the picture is as follows:

+---------------------------+        +--------------------------------------------------+        +-----------------------------+
|  DefaultMetaServiceImpl   |        |  MetaNodeExchanger                               |        |  Meta Server                |
|                           |   1    |                                                  |        |                             |
|  getMetaServerMap         +------->+ boltExchange.connect(metaClientHandlers.toArray) +------->+  DataNodeServiceImpl        |
|                           |        |                                                  |        |                             |
+---------------------------+        +------------------------+-------------------------+        |  notifyProvideDataChange    |
                                                              ^                                  |                             |
                                                              |                                  +------+---------------^------+
                                                              |                                         |               |
                              +-------------------------------+------+                                  |               |
                              |  metaClientHandlers                  |                                  |               |
                              |                                      |                                  |               |
                              |  serverChangeHandler                 |                                  |               |
                              |                                      |                                  |               |
                              |  statusConfirmHandler                |  2  NotifyProvideDataChange  3   |               |
                              |                                      |                                  |               |
                              | +----------------------------------+ |                                  |               |
                              | | notifyProvideDataChangeHandler   | +<---------------------------------+               |
                              | |                                  | |                                                  |
                              | | ProvideData provideData =        | |  FetchProvideDataRequest                         |
                              | |   metaServerService.fetchData    | +------------------------------------------------->+
                              | |                                  | |  (get ProvideData from Meta Server)
                              | | changeDataProcess(provideData)   | |
                              | +----------------------------------+ |
                              +--------------------------------------+

0x05 Back to the MetaServer

The execution sequence goes back to the MetaServer, which receives a FetchProvideDataRequest.

5.1 Data Server Request Response

FetchProvideDataRequestHandler is the response handler. Its logic is simple: it retrieves the data from the DBService by dataInfoId and returns it to the caller.

public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {

    @RaftReference
    private DBService persistenceDataDBService;

    @Override
    public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {
        // Look up the persisted configuration by its DataInfoId.
        DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());
        if (ret.getOperationStatus() == OperationStatus.SUCCESS) {
            // Found: wrap the stored data and its version in a ProvideData.
            PersistenceData data = (PersistenceData) ret.getEntity();
            ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),
                fetchProvideDataRequest.getDataInfoId(), data.getVersion());
            return provideData;
        } else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {
            // Not found: return an empty ProvideData that carries only the DataInfoId.
            ProvideData provideData = new ProvideData(null,
                fetchProvideDataRequest.getDataInfoId(), null);
            return provideData;
        }
        // Handling of any other status is not shown in this excerpt;
        // this return only keeps the method well-formed.
        return null;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return FetchProvideDataRequest.class;
    }
}
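The two branches of reply can be illustrated in isolation. The following is only a minimal sketch under stated assumptions: FetchByIdSketch, Reply, Stored and the in-memory map are hypothetical stand-ins invented for this example, not SOFARegistry classes, and the real handler reads from a Raft-backed DBService rather than a HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these are SOFARegistry classes.
final class FetchByIdSketch {

    /** Simplified reply: payload and version are null when the id is unknown. */
    static final class Reply {
        final String dataInfoId;
        final String payload;
        final Long version;

        Reply(String dataInfoId, String payload, Long version) {
            this.dataInfoId = dataInfoId;
            this.payload = payload;
            this.version = version;
        }
    }

    /** Simplified stored record: the data plus its version. */
    static final class Stored {
        final String data;
        final long version;

        Stored(String data, long version) {
            this.data = data;
            this.version = version;
        }
    }

    // In-memory map standing in for the persistent store.
    private final Map<String, Stored> store = new HashMap<>();

    void put(String dataInfoId, String data, long version) {
        store.put(dataInfoId, new Stored(data, version));
    }

    /** Found: reply with data and version. Not found: reply carrying only the id. */
    Reply reply(String dataInfoId) {
        Stored stored = store.get(dataInfoId);
        if (stored != null) {
            return new Reply(dataInfoId, stored.data, stored.version);
        }
        return new Reply(dataInfoId, null, null);
    }
}
```

Returning an empty reply instead of failing lets the caller tell "no such configuration" apart from a transport error, which appears to be the intent of the NOTFOUND branch above.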

Thus, the key here is DBService.

From MetaServer’s point of view, the flow is as follows:

 +----------------------------------------------+
 |                 Data Server                  |
 |                                              |
 |  +---------------------------------------+   |
 |  |    NotifyProvideDataChangeHandler     |   |
 |  |                                       |   |
 |  |metaServerService.fetchData(dataInfoId)|   |
 |  +--------------------+---^--------------+   |
 +-----------------------+---+------------------+
                       1 |   |
 FetchProvideDataRequest |   | ProvideData
                         |   |
                         |   |  4
 +-----------------------------------------+
 |    Meta Server        |   |             |
 |                       |   |             |
 |  +--------------------v---+-------+     |
 |  | FetchProvideDataRequestHandler |     |
 |  +--------------+---+-------------+     |
 |              2  |   ^                   |
 |                 |   | DBResponse        |
 | get(DataInfoId) |   |  3                |
 |                 v   |                   |
 |       +---------+---+------------+      |
 |       | PersistenceDataDBService |      |
 |       +--------------------------+      |
 +-----------------------------------------+


5.2 Handling the Session Server

The Session Server also sends FetchProvideDataRequest. SessionServerBootstrap contains the following functions, which request configuration information from the MetaServer.

private void fetchStopPushSwitch(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);
    Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (ret instanceof ProvideData) {
        ProvideData provideData = (ProvideData) ret;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);
    Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (data instanceof ProvideData) {
        ProvideData provideData = (ProvideData) data;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchBlackList() {
    blacklistManager.load();
}
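Both fetch functions above share one shape: send a request, then pass the result on only if it really is a ProvideData. A minimal sketch of that guard follows. StartupFetchSketch, Config and the sendMetaRequest function parameter are hypothetical names chosen for this example, and the request is simplified to a plain id string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; Config stands in for ProvideData.
final class StartupFetchSketch {

    static final class Config {
        final String dataInfoId;
        final String value;

        Config(String dataInfoId, String value) {
            this.dataInfoId = dataInfoId;
            this.value = value;
        }
    }

    /**
     * Sends one request per id and keeps only the results that are Config objects.
     * Any other result (null, an error object, an unexpected type) is skipped.
     */
    static List<Config> fetchAll(List<String> ids, Function<String, Object> sendMetaRequest) {
        List<Config> accepted = new ArrayList<>();
        for (String id : ids) {
            Object ret = sendMetaRequest.apply(id);
            if (ret instanceof Config) {
                accepted.add((Config) ret);
            }
        }
        return accepted;
    }
}
```

Because the transport returns a plain Object, the instanceof check is what keeps an unexpected reply from reaching the processors.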

0x06 DataServer

6.1 Processing ProvideData

NotifyProvideDataChangeHandler obtains the ProvideData by calling fetchData.

Inside fetchData, the response to the request is handled as follows:

Response response = metaNodeExchanger.request(request);
Object result = response.getResult();
if (result instanceof ProvideData) {
    return (ProvideData) result;
}

The flow so far is as follows:

 Data Server                                                                          Meta Server
+----------------------------------------------------+                               +---------------------------+
| DefaultMetaServiceImpl                             |                               |                           |
|   getMetaServerMap                                 |                               |                           |
|         | 1                                        |                               |                           |
|         v                                          |                               |                           |
| MetaNodeExchanger                                  |                               |                           |
|   boltExchange.connect(metaClientHandlers.toArray) +------------------------------>| DataNodeServiceImpl       |
|                                                    |                               |   notifyProvideDataChange |
| metaClientHandlers                                 |                               |                           |
|   serverChangeHandler                              |                               |                           |
|   statusConfirmHandler                             |  2. NotifyProvideDataChange   |                           |
|   notifyProvideDataChangeHandler <-------------------------------------------------+                           |
|     ProvideData provideData =                      |  3. FetchProvideDataRequest   |                           |
|       metaServerService.fetchData +----------------------------------------------->|                           |
|                                                    |  4. ProvideData               |                           |
|                                   <------------------------------------------------+                           |
|     changeDataProcess(provideData)                 |                               +---------------------------+
+----------------------------------------------------+


Processing then continues with:

provideDataProcessorManager.changeDataProcess(provideData);

This is where the processing engine comes in.

6.1.1 Bean

Here a processing engine, ProvideDataProcessorManager, is created, and a processing handler, DatumExpireProvideDataProcessor, is registered with it.

@Configuration
public static class DataProvideDataConfiguration {

    @Bean
    public ProvideDataProcessor provideDataProcessorManager() {
        return new ProvideDataProcessorManager();
    }

    @Bean
    public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
        ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
        ((ProvideDataProcessorManager) provideDataProcessorManager)
            .addProvideDataProcessor(datumExpireProvideDataProcessor);
        return datumExpireProvideDataProcessor;
    }
}

6.1.2 The ProvideDataProcessorManager Processing Engine

This follows the same familiar pattern as before: ProvideDataProcessorManager is the engine, and it also implements ProvideDataProcessor. Its support method always returns false, so the engine never executes itself while iterating over the registered processors.

public class ProvideDataProcessorManager implements ProvideDataProcessor {

    private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();

    public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
        provideDataProcessors.add(provideDataProcessor);
    }

    @Override
    public void changeDataProcess(ProvideData provideData) {
        for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
            if (provideDataProcessor.support(provideData)) {
                provideDataProcessor.changeDataProcess(provideData);
            }
        }
    }

    @Override
    public boolean support(ProvideData provideData) {
        return false;
    }
}
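The effect of support returning false can be seen in a small self-contained sketch. The names here (ProcessorEngineSketch, Processor, Manager, Recording) are hypothetical and the ProvideData argument is simplified to an id and a string value. The example registers the manager with itself on purpose to show that it is skipped rather than recursing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the engine pattern; not the SOFARegistry classes.
final class ProcessorEngineSketch {

    interface Processor {
        boolean support(String dataInfoId);

        void changeDataProcess(String dataInfoId, String value);
    }

    /** The engine: forwards a change to every registered processor that supports it. */
    static final class Manager implements Processor {
        private final List<Processor> processors = new ArrayList<>();

        void add(Processor processor) {
            processors.add(processor);
        }

        @Override
        public void changeDataProcess(String dataInfoId, String value) {
            for (Processor processor : processors) {
                if (processor.support(dataInfoId)) {
                    processor.changeDataProcess(dataInfoId, value);
                }
            }
        }

        @Override
        public boolean support(String dataInfoId) {
            // Always false, so the engine is never selected by its own loop.
            return false;
        }
    }

    /** A processor that handles exactly one id and records the values it receives. */
    static final class Recording implements Processor {
        private final String handledId;
        final List<String> received = new ArrayList<>();

        Recording(String handledId) {
            this.handledId = handledId;
        }

        @Override
        public boolean support(String dataInfoId) {
            return handledId.equals(dataInfoId);
        }

        @Override
        public void changeDataProcess(String dataInfoId, String value) {
            received.add(value);
        }
    }
}
```

With this shape, adding a new kind of configuration only requires registering another processor; the engine's loop does not change.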

6.1.3 Processing Handler

The DatumLeaseManager here corresponds to the AfterWorkingProcess described earlier.

The handler calls DatumLeaseManager to apply the configuration data: it parses the payload as a boolean and passes the result to setRenewEnable.

public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {

    @Autowired
    private DatumLeaseManager datumLeaseManager;

    @Override
    public void changeDataProcess(ProvideData provideData) {
        if (checkInvalid(provideData)) {
            return;
        }
        boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
            .getObject());
        datumLeaseManager.setRenewEnable(enableDataDatumExpire);
    }

    private boolean checkInvalid(ProvideData provideData) {
        boolean invalid = provideData == null || provideData.getProvideData() == null
                          || provideData.getProvideData().getObject() == null;
        return invalid;
    }

    @Override
    public boolean support(ProvideData provideData) {
        return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());
    }
}
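One detail of this handler is worth spelling out: Boolean.parseBoolean returns true only for the string "true" (ignoring case), so any other non-null payload turns the switch off. The sketch below shows this behaviour under simplified assumptions. RenewSwitchSketch and SWITCH_ID are hypothetical names, and a plain boolean field stands in for DatumLeaseManager.

```java
// Hypothetical sketch; the boolean field stands in for DatumLeaseManager's renew switch.
final class RenewSwitchSketch {

    // Hypothetical id, standing in for ValueConstants.ENABLE_DATA_DATUM_EXPIRE.
    static final String SWITCH_ID = "demo.enable.datum.expire";

    private boolean renewEnable = true;

    boolean isRenewEnable() {
        return renewEnable;
    }

    /** Applies the payload if it targets the switch; returns whether it was applied. */
    boolean apply(String dataInfoId, Object payload) {
        // Ignore other ids and missing payloads, like support() and checkInvalid() above.
        if (!SWITCH_ID.equals(dataInfoId) || payload == null) {
            return false;
        }
        // The cast mirrors the original; a non-String payload throws ClassCastException.
        renewEnable = Boolean.parseBoolean((String) payload);
        return true;
    }
}
```

For example, a payload of "TRUE" enables the switch, while "yes" or "1" disables it, so the console presumably has to send the literal string "true" to enable it.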

Finally, the illustration is as follows:

 Data Server                                                                          Meta Server
+----------------------------------------------------+                               +---------------------------+
| DefaultMetaServiceImpl                             |                               |                           |
|   getMetaServerMap                                 |                               |                           |
|         | 1                                        |                               |                           |
|         v                                          |                               |                           |
| MetaNodeExchanger                                  |                               |                           |
|   boltExchange.connect(metaClientHandlers.toArray) +------------------------------>| DataNodeServiceImpl       |
|                                                    |                               |   notifyProvideDataChange |
| metaClientHandlers                                 |                               |                           |
|   serverChangeHandler                              |                               |                           |
|   statusConfirmHandler                             |  2. NotifyProvideDataChange   |                           |
|   notifyProvideDataChangeHandler <-------------------------------------------------+                           |
|     ProvideData provideData =                      |  3. FetchProvideDataRequest   |                           |
|       metaServerService.fetchData +----------------------------------------------->|                           |
|                                                    |  4. ProvideData               |                           |
|                                   <------------------------------------------------+                           |
|     changeDataProcess(provideData)                 |                               +---------------------------+
|         | 5                                        |
|         v                                          |
| ProvideDataProcessor                               |
|   changeDataProcess(ProvideData provideData)       |
+----------------------------------------------------+


0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you would like timely updates on my articles, or want to see the technical material I recommend, please follow the account.
