0 x00 the
SOFARegistry is ant Financial’s open source, production-grade, time-sensitive, and highly available service registry.
This series of articles focuses on analyzing design and architecture, that is, using multiple articles to summarize the implementation mechanism and architecture ideas of DataServer or SOFARegistry from multiple perspectives, so that you can learn how Ali designs.
This article is the seventeenth article that describes how to handle configuration information for SOFARegistry network operations.
0x01 Business Category
1.1 Configuration Functions
Some system-specific services, for example, need to be set by the console. So the Meta Server provides interfaces to the console. When the Meta Server receives a request from the console, it interacts with the Data Server and Session Server. For example, the Meta Server provides the following interfaces:
@Bean
@ConditionalOnMissingBean
public StopPushDataResource stopPushDataResource(a) {
return new StopPushDataResource();
}
@Bean
public BlacklistDataResource blacklistDataResource(a) {
return new BlacklistDataResource();
}
@Bean
public RenewSwitchResource renewSwitchResource(a) {
return new RenewSwitchResource();
}
Copy the code
The HTTP interface is provided externally because it is normal and basic. However, the Bolt protocol still operates between servers.
1.2 Learning Direction
The derivation is as follows: On the DataServer, how to extract configuration information separately?
0x02 Data structure
2.1 Directory Structure
The DataServer directory contains Handler, service, task, and provideData.
│ ├ ─ ─ metaserver │ │ ├ ─ ─ DefaultMetaServiceImpl. Java │ │ ├ ─ ─ IMetaServerService. Java │ │ ├ ─ ─ MetaServerConnectionFactory. Java │ │ ├ ─ ─ handler │ │ │ ├ ─ ─ NotifyProvideDataChangeHandler. Java │ │ │ ├ ─ ─ ServerChangeHandler. Java │ │ │ └ ─ ─ StatusConfirmHandler. Java │ │ ├ ─ ─ provideData │ │ │ ├ ─ ─ ProvideDataProcessor. Java │ │ │ ├ ─ ─ ProvideDataProcessorManager. Java │ │ │ └ ─ ─ processor │ │ │ └ ─ ─ DatumExpireProvideDataProcessor. Java │ │ └ ─ ─ a taskCopy the code
2.2 Data structure definition
The configuration data structure is as follows:
ProvideData is the external interactive interface, which contains the version number and service identifier dataInfoId.
public class ProvideData implements Serializable {
private ServerDataBox provideData;
private String dataInfoId;
private Long version;
}
Copy the code
ServerDataBox is a concrete service, as defined below
public class ServerDataBox implements Serializable {
/** Null for locally instantiated, otherwise for internalized */
private byte[] bytes;
/** Only available if bytes ! = null */
private int serialization;
/** Actual object, lazy deserialized */
private Object object;
}
Copy the code
Regarding ServerDataBox, there is currently only one use of Data Server. Boolean type is used, that is, control switch configuration.
public void changeDataProcess(ProvideData provideData) {
boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
.getObject());
datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
Copy the code
0x03 Meta Server Internal Flow
Here in order to get through the process, the need to mention meta server inside and metaServerService. FetchData (dataInfoId) related to the process.
For decoupling purposes, Meta Server splits certain business functions into four layers, with the basic logic:
Http Resource ———> TaskListener ———> Task ————> Service
Copy the code
Firstly, the flow chart is given as follows, and the process will be introduced step by step below:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+ 7
+-------+ 1 | | +---------------------------------------+
| Admin | +---> | +--------------------- | | Data Server |
+-------+ | |fireDataChangeNotify| | | |
| +--------------------+ | 6 | +-----------------------------------+ |
+------------------------+ | | metaClientHandlers | |
| +---------------------+ dataNodeExchanger.request | | +-------------------------------+ | |
| 3 | DataNodeServiceImpl | +----------------------------->+ | | notifyProvideDataChangeHandler| | |
| +----------+----------+ NotifyProvideDataChange | | +-------------------------------+ | |
| NotifyProvideDataChange ^ | | | |
| | | +-----------------------------------+ |
| 5| notifyProvideDataChange +---------------------------------------+ v | +---------+-----------------------------------+ | | DefaultTaskListenerManager | | | | +----+----------------------------+ | +-----------------------------------------+ |4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | | | | |
| | receiveStatusConfirmNotifyTaskListener | | +---------------------------------+
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
Copy the code
The mobile phone icon is as follows:
3.1 Admin Request Response
As mentioned above, Meta Server provides some control interfaces to Admin over Http. Let’s use BlacklistDataResource as an example.
As you can see, the blacklistPush function is stored in the persistenceDataDBService, and fireDataChangeNotify sends NotifyProvideDataChange indirectly.
@Path("blacklist")
public class BlacklistDataResource {
@RaftReference
private DBService persistenceDataDBService;
@Autowired
private TaskListenerManager taskListenerManager;
/** * update blacklist * e.g. curl -d '{" FORBIDDEN_PUB ": {" IP_FULL" : [" 1.1.1.1 ", "10.15.233.150"]}, "FORBIDDEN_SUB_BY_PREFIX" : {" IP_FULL: "[]" 1.1.1.1 "}}' - H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update */
@POST
@Path("update")
@Produces(MediaType.APPLICATION_JSON)
public Result blacklistPush(String config) {
PersistenceData persistenceData = createDataInfo();
persistenceData.setData(config);
boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,
persistenceData);
fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,
DataOperator.UPDATE);
Result result = new Result();
result.setSuccess(true);
return result;
}
private PersistenceData createDataInfo(a) {
DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);
PersistenceData persistenceData = new PersistenceData();
persistenceData.setDataId(dataInfo.getDataId());
persistenceData.setGroup(dataInfo.getDataType());
persistenceData.setInstanceId(dataInfo.getInstanceId());
persistenceData.setVersion(System.currentTimeMillis());
return persistenceData;
}
private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {
NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,
version, dataOperator);
TaskEvent taskEvent = newTaskEvent(notifyProvideDataChange, TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK); taskListenerManager.sendTaskEvent(taskEvent); }}Copy the code
This corresponds to the figure above:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
Copy the code
3.2 DBService
As you can see, DBService is also Raft based, indicating that consistency is maintained within the MetaServer cluster itself.
@RaftReference
private DBService persistenceDataDBService;
Copy the code
The PersistenceDataDBService class is defined as follows:
@RaftService
public class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {
private ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();
@Override
public boolean put(String key, Object value) {
Object ret = serviceMap.put(key, value);
return true;
}
@Override
public DBResponse get(String key) {
Object ret = serviceMap.get(key);
returnret ! =null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();
}
@Override
public boolean update(String key, Object value) {
Object ret = serviceMap.put(key, value);
return true;
}
@Override
public Set<String> getSnapshotFileNames(a) {
if(! snapShotFileNames.isEmpty()) {return snapShotFileNames;
}
snapShotFileNames.add(this.getClass().getSimpleName());
returnsnapShotFileNames; }}Copy the code
As you can see, ConcurrentHashMap is used for storage and Raft uses a file system for snapshot backup.
3.3 the Bean
As mentioned earlier, for decoupling, Meta Server encapsulates functions such as message processing and forwarding as TaskListeners, which are logically executed by the TaskListenerManager. Here, take the related functions of ProvideData as an example, and the corresponding Bean is.
@Configuration
public static class MetaServerTaskConfiguration {...@Bean
public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {
TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(
sessionNodeSingleTaskProcessor());
taskListenerManager.addTaskListener(taskListener);
return taskListener;
}
@Bean
public TaskListenerManager taskListenerManager(a) {
return newDefaultTaskListenerManager(); }}Copy the code
3.4 the Listener
The Listener execution engine iterates through the Listener list to process the Listener. If a Listener can process the Listener, the Listener is executed.
public class DefaultTaskListenerManager implements TaskListenerManager {
private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();
@Override
public Multimap<TaskType, TaskListener> getTaskListeners(a) {
return taskListeners;
}
@Override
public void addTaskListener(TaskListener taskListener) {
taskListeners.put(taskListener.support(), taskListener);
}
@Override
public void sendTaskEvent(TaskEvent taskEvent) {
Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
for(TaskListener taskListener : taskListeners) { taskListener.handleEvent(taskEvent); }}}Copy the code
The corresponding service Listener is as follows:
public class PersistenceDataChangeNotifyTaskListener implements TaskListener {
@Autowired
private MetaServerConfig metaServerConfig;
private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;
public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {
singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(
TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);
}
@Override
public TaskType support(a) {
return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;
}
@Override
public void handleEvent(TaskEvent event) {
MetaServerTask persistenceDataChangeNotifyTask = newPersistenceDataChangeNotifyTask( metaServerConfig); persistenceDataChangeNotifyTask.setTaskEvent(event); singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(), persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime()); }}Copy the code
This corresponds to the following:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
|
| 3
|
| NotifyProvideDataChange
|
|
v
+---------+-----------------------------------+
| DefaultTaskListenerManager |
| |
| +-----------------------------------------+ |
| | persistenceDataChangeNotifyTaskListener | |
| | | |
| | receiveStatusConfirmNotifyTaskListener | |
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
Copy the code
3.5 the Task
The Listener calls the Task.
Different services are called according to different notetypes:
public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {
private final SessionNodeService sessionNodeService;
private final DataNodeService dataNodeService;
final private MetaServerConfig metaServerConfig;
private NotifyProvideDataChange notifyProvideDataChange;
@Override
public void execute(a) {
Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();
if (nodeTypes.contains(NodeType.DATA)) {
dataNodeService.notifyProvideDataChange(notifyProvideDataChange);
}
if(nodeTypes.contains(NodeType.SESSION)) { sessionNodeService.notifyProvideDataChange(notifyProvideDataChange); }}@Override
public void setTaskEvent(TaskEvent taskEvent) {
Object obj = taskEvent.getEventObj();
if (obj instanceof NotifyProvideDataChange) {
this.notifyProvideDataChange = (NotifyProvideDataChange) obj; }}}Copy the code
Here is the corresponding
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
|
| 3
|
| NotifyProvideDataChange
|
|
v
+-------------------+-------------------------+
| DefaultTaskListenerManager |
| | +---------------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | | | | |
| | receiveStatusConfirmNotifyTaskListener | | +---------------------------------+
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
Copy the code
3.6 services
A task invokes a service to perform a specific service, such as the following, which sends a push to the DataServer or SessionServer.
public class DataNodeServiceImpl implements DataNodeService {
@Autowired
private NodeExchanger dataNodeExchanger;
@Autowired
private StoreService dataStoreService;
@Autowired
private AbstractServerHandler dataConnectionHandler;
@Override
public NodeType getNodeType(a) {
return NodeType.DATA;
}
@Override
public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {
NodeConnectManager nodeConnectManager = getNodeConnectManager();
Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);
// add register confirm
StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
Map<String, DataNode> dataNodes = storeService.getNodes();
for (InetSocketAddress connection : connections) {
if(! dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {continue;
}
try {
Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {
@Override
public NotifyProvideDataChange getRequestBody(a) {
return notifyProvideDataChange;
}
@Override
public URL getRequestUrl(a) {
return newURL(connection); }}; dataNodeExchanger.request(request); }}}}Copy the code
Here is the corresponding
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
| +---------------------+
| 3 | DataNodeServiceImpl |
| +----------+----------+
| NotifyProvideDataChange ^
| |
| 5 | notifyProvideDataChange
v |
+---------+-----------------------------------+ |
| DefaultTaskListenerManager | |
| | +----+----------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | | | | |
| | receiveStatusConfirmNotifyTaskListener | | +---------------------------------+
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
Copy the code
After it’s sent, it’s
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
| 3
| NotifyProvideDataChange
v
+-------------------+-------------------------+
| DefaultTaskListenerManager |
| | +---------------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | receiveStatusConfirmNotifyTaskListener | | | |
| | dataNodeChangePushTaskListener | | +----+----------------------------+
| | sessionNodeChangePushTaskListener | | |
| +-----------------------------------------+ | |
+---------------------------------------------+ 5 | notifyProvideDataChange
|
+-------------------------------------------+
|
v
+---v---------+-------+
| DataNodeServiceImpl | +---------------------------------------+
+-------------+-------+ | Data Server 7 |
| 6 | |
| dataNodeExchanger.request | +-----------------------------------+ |
+->------------------------------>+ | metaClientHandlers | |
NotifyProvideDataChange | | +-------------------------------+ | |
| | | notifyPro|ideDataChangeHandler| | |
| | +-------------------------------+ | |
| +-----------------------------------+ |
+---------------------------------------+
Copy the code
Now we know, in Meta Server. DataNodeServiceImpl notifyProvideDataChange function will inform the Data Server, there is now a notifyProvideDataChange news.
0x04 Call Path in Data Server
The execution sequence comes to DataServer. We need to do a few things first.
4.1 the Bean
The Bean metaClientHandlers is the metanodesaner response function. While notifyProvideDataChangeHandler is part of the metaClientHandlers.
@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers(a) {
Collection<AbstractClientHandler> list = new ArrayList<>();
list.add(serverChangeHandler());
list.add(statusConfirmHandler());
list.add(notifyProvideDataChangeHandler());
return list;
}
Copy the code
4.2 Network Interaction
MetaNodeExchanger in DefaultMetaServiceImpl. Call getMetaServerMap MetaNodeExchanger. Connect, will set the metaClientHandlers. Thus notifyProvideDataChangeHandler with MetaServer by Bolt.
public class DefaultMetaServiceImpl implements IMetaServerService {
@Override
public Map<String, Set<String>> getMetaServerMap() {
Connection connection = null;
connection = ((BoltChannel) metaNodeExchanger.connect(newURL(list.iterator() .next(), dataServerConfig.getMetaServerPort()))).getConnection(); }}Copy the code
The METANodesaner is defined as follows. It processes MetaServer interactions within the DataServer in a unified manner.
public class MetaNodeExchanger implements NodeExchanger {
@Autowired
private Exchange boltExchange;
@Autowired
private IMetaServerService metaServerService;
@Resource(name = "metaClientHandlers")
private Collection<AbstractClientHandler> metaClientHandlers;
public Channel connect(URL url) {
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
if (client == null) {
synchronized (this) {
client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
if (client == null) {
client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
metaClientHandlers.toArray(newChannelHandler[metaClientHandlers.size()])); }}}//try to connect data
Channel channel = client.getChannel(url);
if (channel == null) {
synchronized (this) {
channel = client.getChannel(url);
if (channel == null) { channel = client.connect(url); }}}returnchannel; }}Copy the code
4.3 Handler definition
NotifyProvideDataChangeHandler in interest function, can set his process NotifyProvideDataChange type messages. So when the MetaServer notice have NotifyProvideDataChange is called metaServerService. FetchData (dataInfoId); Obtain ProvideData for subsequent processing.
public class NotifyProvideDataChangeHandler extends AbstractClientHandler {
@Autowired
private IMetaServerService metaServerService;
@Autowired
private ProvideDataProcessor provideDataProcessorManager;
@Override
public Object doHandle(Channel channel, Object request) {
NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
String dataInfoId = notifyProvideDataChange.getDataInfoId();
if(notifyProvideDataChange.getDataOperator() ! = DataOperator.REMOVE) { ProvideData provideData = metaServerService.fetchData(dataInfoId); provideDataProcessorManager.changeDataProcess(provideData); }return null;
}
@Override
public Class interest(a) {
returnNotifyProvideDataChange.class; }}Copy the code
4.4 call Handler
In the Meta Server, DataNodeServiceImpl notifyProvideDataChange function will inform the Data Server, there is now a notifyProvideDataChange news.
So NotifyProvideDataChangeHandler will respond.
4.5 get ProvideData
Among the NotifyProvideDataChangeHandler, there are as follows
ProvideData provideData = metaServerService.fetchData(dataInfoId);
Copy the code
Then call fetchData in DefaultMetaServiceImpl to Meta Server to get ProvideData.
@Override
public ProvideData fetchData(String dataInfoId) {
Map<String, Connection> connectionMap = metaServerConnectionFactory
.getConnections(dataServerConfig.getLocalDataCenter());
String leader = getLeader().getIp();
if (connectionMap.containsKey(leader)) {
Connection connection = connectionMap.get(leader);
if (connection.isFine()) {
try {
Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {
@Override
public FetchProvideDataRequest getRequestBody(a) {
return new FetchProvideDataRequest(dataInfoId);
}
@Override
public URL getRequestUrl(a) {
return newURL(connection.getRemoteIP(), connection.getRemotePort()); }}; Response response = metaNodeExchanger.request(request); Object result = response.getResult();if (result instanceof ProvideData) {
return (ProvideData) result;
}
}
}
}
String newip = refreshLeader().getIp();
return null;
}
Copy the code
Now the picture is as follows:
+---------------------------+ +--------------------------------------------------+ +---------------------------------+ | DefaultMetaServiceImpl | | MetaNodeExchanger | | Meta Server | | | | |1| +-----------------------------+ | | getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | | DataNodeServiceImpl | | | | +-----------------------------------+--------------+ | | | | +---------------------------+ ^ | | | | | | | notifyProvideDataChange | | | | | + | | +------------------------------------------+ | | | | | | | metaClientHandlers +--------------------------+ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | ^ | | serverChangeHandler | | | | | | | | | | | | statusConfirmHandler | | NotifyProvideDataChange | | | | | | | | | | +--------------------------------+ | |2 | 3| | | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+ | | | | | | | | | | | | | | | | | | | | | | | | | ProvideData provideData = +--------------------------------------------------------------------------------------+ | | | | | | get ProvideData from Meta Server FetchProvideDataRequest | | | metaServerService.fetchData | | | | | | | | | | | | | | | | | | | | | | | | changeDataProcess(provideData) | | | | | | | | | | | +--------------------------------+ | | | +------------------------------------+ | +------------------------------------------+Copy the code
Here’s what’s on the phone:
0x05 goes back to MetaServer
The execution sequence goes back to the MetaServer, which receives a FetchProvideDataRequest.
5.1 Data Server Request Response
FetchProvideDataRequestHandler is response function. The logic of the function is relatively simple: it retrieves data from the DBService according to the DataInfoId and returns it to the caller.
public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {
@RaftReference
private DBService persistenceDataDBService;
@Override
public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {
DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());
if (ret.getOperationStatus() == OperationStatus.SUCCESS) {
PersistenceData data = (PersistenceData) ret.getEntity();
ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),
fetchProvideDataRequest.getDataInfoId(), data.getVersion());
return provideData;
} else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {
ProvideData provideData = new ProvideData(null,
fetchProvideDataRequest.getDataInfoId(), null);
returnprovideData; }}}@Override
public HandlerType getType(a) {
return HandlerType.PROCESSER;
}
@Override
public Class interest(a) {
returnFetchProvideDataRequest.class; }}Copy the code
Thus, the key here is DBService.
From MetaServer’s point of view, the flow is as follows:
+----------------------------------------------+ | Data Server | | | | +---------------------------------------+ | | | NotifyProvideDataChangeHandler | | | | | | | | | | | |metaSer^erSer^ice.fetchData(dataInfoId)| | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | ^ | |1 | |
FetchProvideDataRequest | | ProvideData
| |
| | 4
+-----------------------------------------+
| Meta Server | | |
| | | |
| +--------------------v---+-------+ |
| | FetchProvideDataRequestHandler | |
| +--------------+---+-------------+ |
| 2 | ^ |
| | | DBResponse |
| get(DataInfoId) | | 3 |
| v | |
| +---------+---+------------+ |
| | PersistenceDataDBService | |
| +--------------------------+ |
+-----------------------------------------+
Copy the code
5.2 Handling Session Servers
The Session Server will also initiate a FetchProvideDataRequest. There are the following functions in SessionServerBootstrap that initiate a request to obtain configuration information.
private void fetchStopPushSwitch(URL leaderUrl) {
FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);
Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
if (ret instanceofProvideData) { ProvideData provideData = (ProvideData) ret; provideDataProcessorManager.fetchDataProcess(provideData); }}private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);
Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
if (data instanceofProvideData) { ProvideData provideData = (ProvideData) data; provideDataProcessorManager.fetchDataProcess(provideData); }}private void fetchBlackList(a) {
blacklistManager.load();
}
Copy the code
0x06 DataServer
6.1 processing ProvideData
In NotifyProvideDataChangeHandler, the following statement is used to handle ProvideData. That’s in fetchData.
In request response processing
Response response = metaNodeExchanger.request(request);
Object result = response.getResult();
if (result instanceof ProvideData) {
return (ProvideData) result;
}
Copy the code
Here it is:
+---------------------------+ +--------------------------------------------------+ +---------------------------------+ | DefaultMetaServiceImpl | | MetaNodeExchanger | | Meta Server | | | | |1| +-----------------------------+ | | getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | | DataNodeServiceImpl | | | | +-----------------------------------+--------------+ | | | | +---------------------------+ ^ | | | | | | | notifyProvideDataChange | | | | | + | | +------------------------------------------+ | | | | | | | metaClientHandlers +--------------------------+ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | ^ | | | serverChangeHandler | | | | | | | | | | | | | | statusConfirmHandler | | NotifyProvideDataChange | | | | | | | | | | | | +--------------------------------+ | |2 | 3 | | 4
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+ | |
| | | | | | | |
| | | | | | get ProvideData from Meta Server | |
| | | | | | | |
| | | ProvideData provideData = +--------------------------------------------------------------------------------------+ |
| | | | | | |
| | | metaServerService.fetchData <-----------------------------------------------------------------------------------------+
| | | | | | ProvideData
| | | | | |
| | | | | |
| | | changeDataProcess(provideData) | | |
| | | | | |
| | +--------------------------------+ | |
| +------------------------------------+ |
+------------------------------------------+
Copy the code
The mobile phone is as follows:
The continued processing is as follows:
provideDataProcessorManager.changeDataProcess(provideData);
Copy the code
That’s where the engine comes in.
6.1.1 Bean
Here to generate a processing engine ProvideDataProcessorManager, added a processing handler DatumExpireProvideDataProcessor.
@Configuration
public static class DataProvideDataConfiguration {
@Bean
public ProvideDataProcessor provideDataProcessorManager(a) {
return new ProvideDataProcessorManager();
}
@Bean
public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
((ProvideDataProcessorManager) provideDataProcessorManager)
.addProvideDataProcessor(datumExpireProvideDataProcessor);
returndatumExpireProvideDataProcessor; }}Copy the code
6.1.2 ProvideDataProcessorManager processing engine
Here still familiar routines, namely ProvideDataProcessor engine, namely ProvideDataProcessorManager also inherited ProvideDataProcessor, but among the support set the return false, This way the engine will not execute itself while iterating through the execution.
public class ProvideDataProcessorManager implements ProvideDataProcessor {
private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();
public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
provideDataProcessors.add(provideDataProcessor);
}
@Override
public void changeDataProcess(ProvideData provideData) {
for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
if(provideDataProcessor.support(provideData)) { provideDataProcessor.changeDataProcess(provideData); }}}@Override
public boolean support(ProvideData provideData) {
return false; }}Copy the code
Also 6.1.3 processing Handler
The DatumLeaseManager here corresponds to the AfterWorkingProcess described earlier.
The Handler calls DatumLeaseManager to deploy the configuration data.
public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {
@Autowired
private DatumLeaseManager datumLeaseManager;
@Override
public void changeDataProcess(ProvideData provideData) {
if (checkInvalid(provideData)) {
return;
}
boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
.getObject());
datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
private boolean checkInvalid(ProvideData provideData) {
boolean invalid = provideData == null || provideData.getProvideData() == null
|| provideData.getProvideData().getObject() == null;
return invalid;
}
@Override
public boolean support(ProvideData provideData) {
returnValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId()); }}Copy the code
Finally, the illustration is as follows:
+---------------------------+ +--------------------------------------------------+ +---------------------------------+ | DefaultMetaServiceImpl | | MetaNodeExchanger | | Meta Server | | | | |1| +-----------------------------+ | | getMetaServerMap +---------->--boltExchange.connect(metaClientHandlers.toArray) +-------> | | DataNodeServiceImpl | | | | +-----------------------------------+--------------+ | | | | +---------------------------+ ^ | | | | | | | notifyProvideDataChange | | | | | + | | +------------------------------------------+ | | | | | | | metaClientHandlers +--------------------------+ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | ^ | | | serverChangeHandler | | | | | | | | | | | | | | statusConfirmHandler | | NotifyProvideDataChange | | | | | | | | | | | | +--------------------------------+ | |2 | 3 | | 4
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+ | |
| | | | | | | |
| | | | | | get ProvideData from Meta Server | |
| | | | | | | |
| | | ProvideData provideData = +--------------------------------------------------------------------------------------+ |
| | | | | | |
| | | metaServerService.fetchData <-----------------------------------------------------------------------------------------+
| | | | | | ProvideData
| | | | | |
| | | | | | 5 +---------------------------------------------+
| | | changeDataProcess(provideData)+--------------+ | ProvideDataProcessor |
| | | | | | | | |
| | +--------------------------------+ | | +-------> | changeDataProcess(ProvideData provideData) |
| +------------------------------------+ | | |
+------------------------------------------+ +---------------------------------------------+
Copy the code
The mobile legend is as follows:
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
If you want to get a timely news feed of personal articles, or want to see the technical information of personal recommendations, please pay attention.
0 XFF reference
How does ant Financial Service registry realize the smooth scaling of DataServer
Ant gold uniform service registry SOFARegistry parsing | service discovery path optimization
The service registry Session storage policy | SOFARegistry parsing
Introduction to the Registry – SOFARegistry architecture for Massive Data
Service registry data fragmentation and synchronization scheme, rounding | SOFARegistry parsing
Ant Financial open source communication framework SOFABolt analysis of connection management analysis
Timeout control mechanism and heartbeat mechanism resolved by SOFABolt, ant Financial’s open source communication framework
SOFABolt protocol framework analysis of Ant Financial open source communication framework
Ant gold uniform service registry data consistency analysis | SOFARegistry parsing
Ant communication framework practice
Sofa – Bolt remote call
Sofa – bolt to study
SOFABolt Design Summary – Elegant and simple design approach
SofaBolt source code analysis – Service startup to message processing
SOFABolt source code analysis
SOFABolt source code analysis 9-userProcessor custom processor design
SOFARegistry introduction
SOFABolt source code analysis of the design of the 13-Connection event processing mechanism