0x00 Summary
SOFARegistry is Ant Financial's open-source, production-grade, highly time-sensitive and highly available service registry.
This series of articles focuses on design and architecture: across several articles, it summarizes the implementation mechanisms and architectural ideas of DataServer and SOFARegistry from multiple perspectives, so that you can see how Alibaba approaches such designs.
This article describes how SOFARegistry handles changes to Data nodes within the local data center (equipment room).
0x02 Business Scope
2.1 DataServer Data Consistency
DataServer provides the core data storage in SOFARegistry. Data is sharded by dataInfoId using consistent hashing, with multi-replica backups to ensure high availability. This layer can scale out as the volume of service data grows.
If a DataServer goes down, MetaServer notifies all DataServers and SessionServers; the affected data shards fail over to other replicas, and the DataServer cluster migrates the shard data among its members.
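To illustrate the sharding idea, here is a minimal consistent-hash ring in Java. It is a sketch under stated assumptions, not SOFARegistry's own ConsistentHash class: the class name HashRingSketch, the number of virtual nodes, and the MD5-based hash are illustrative choices. replicasFor walks the ring clockwise from the key and returns the first n distinct nodes, which plays the role of an owner node plus backup nodes (compare getNUniqueNodesFor, referenced later in BackupTriad's javadoc).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

/** Hypothetical consistent-hash ring: dataInfoId -> owner node followed by backup nodes. */
public class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Places virtualNodes points on the ring for every node IP. */
    public HashRingSketch(int virtualNodes, Collection<String> nodeIps) {
        for (String ip : nodeIps) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(ip + "#" + i), ip);
            }
        }
    }

    /** Walks clockwise from the key's position and returns up to n distinct node IPs. */
    public List<String> replicasFor(String dataInfoId, int n) {
        LinkedHashSet<String> picked = new LinkedHashSet<>();
        long position = hash(dataInfoId);
        List<String> clockwise = new ArrayList<>(ring.tailMap(position, true).values());
        clockwise.addAll(ring.headMap(position, false).values()); // wrap around the ring
        for (String ip : clockwise) {
            if (picked.size() == n) {
                break;
            }
            picked.add(ip); // other virtual nodes of an already-picked IP are ignored
        }
        return new ArrayList<>(picked);
    }

    /** First 8 bytes of MD5 as a long; any well-spread hash would do. */
    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(100,
                List.of("192.168.0.1", "192.168.0.2", "192.168.0.3"));
        // Three distinct nodes: an owner plus two backups for this dataInfoId
        System.out.println(ring.replicasFor("TestDataInfoId", 3));
    }
}
```

Because a key only depends on the ring positions near it, adding or removing one node changes the replica lists of only a fraction of the keys, which is what makes incremental data migration between DataServers feasible.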
2.2 Local Equipment Room Policy
Here, Data Center refers to the local equipment room (computer room):
- Data is backed up only within the local equipment room;
- Each data center has its own consistent hash ring.
Alibaba does back up across equipment rooms; that should belong to the Global part, which is not open source.
So we focus on how the nodes in the local equipment room carry on processing.
0x03 General Logic
DataServers inform each other of data version numbers:
- A newly added node sends NotifyOnlineRequest to tell the nodes already online: I am new; update your configuration accordingly, and tell me which data versions you have.
- Nodes already in service send NotifyFetchDatumRequest to tell the new node: I have the data versions you need; come and fetch them.
In short, after receiving a DataServer change message from MetaServer, all DataServers in the same data center notify each other of the upgraded version number:
- notifyOnline sends NotifyOnlineRequest, which is handled by NotifyOnlineHandler on the other DataServers.
- notifyToFetch sends NotifyFetchDatumRequest, which is handled by NotifyFetchDatumHandler on the other DataServers.
0x04 The Message
4.1 LocalDataServerChangeEvent
As mentioned earlier, when DataServerChangeEventHandler handles a DataServerChangeEvent and the change concerns the local data center, it triggers a LocalDataServerChangeEvent.
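public class LocalDataServerChangeEvent implements Event {
    /** DataServers of the local data center: ip -> DataNode */
    private Map<String, DataNode> localDataServerMap;
    /** version of the local data center, taken from newDataServerChangeItem */
    private long localDataCenterversion;
    /** newly joined nodes */
    private Set<String> newJoined;
    /** change version, used as changeVersion by LocalClusterDataSyncer */
    private long version;
}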
4.2 Source
DataServerChangeEventHandler is the source of LocalDataServerChangeEvent.
MetaServer detects nodes going online or offline through their network connections. Every DataServer runs a ConnectionRefreshTask that periodically polls MetaServer for data node information. Besides this active pull by the DataServer, MetaServer also proactively sends NodeChangeResult requests to each node to inform it of changes. Whether the information is pushed or pulled, the end result is the same.
When the data node information returned by polling changes, a DataServerChangeEvent is posted to EventCenter. If that event's handler determines that the node information of the local equipment room has changed, it posts a new LocalDataServerChangeEvent. That event's handler, LocalDataServerChangeEventHandler, checks whether the current node is a new node; if so, it sends a NotifyOnlineRequest to the other nodes.
The LocalDataServerChangeEvent is produced in the doHandle function of DataServerChangeEventHandler.
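The push/pull-to-event flow described above can be sketched with a tiny in-process event bus. This is a hypothetical simplification, not SOFARegistry's EventCenter: the class EventPipelineSketch, the stand-in event classes, and the "has the local node set changed" check (standing in for DataServerChangeEventHandler's real comparison) are all assumptions made for illustration.

```java
import java.util.*;
import java.util.function.Consumer;

/** Hypothetical in-process event bus: DataServerChangeEvent -> LocalDataServerChangeEvent. */
public class EventPipelineSketch {
    /** Stand-in for DataServerChangeEvent: dataCenter -> IPs of its DataServers. */
    public static final class DataServerChange {
        public final Map<String, Set<String>> serverMap;
        public DataServerChange(Map<String, Set<String>> serverMap) { this.serverMap = serverMap; }
    }

    /** Stand-in for LocalDataServerChangeEvent: IPs of the local data center's DataServers. */
    public static final class LocalDataServerChange {
        public final Set<String> localServers;
        public LocalDataServerChange(Set<String> localServers) { this.localServers = localServers; }
    }

    private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

    /** Registers a handler for one exact event class. */
    public <T> void register(Class<T> type, Consumer<T> handler) {
        handlers.computeIfAbsent(type, k -> new ArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    /** Delivers the event synchronously to every handler registered for its class. */
    public void post(Object event) {
        for (Consumer<Object> handler : handlers.getOrDefault(event.getClass(), List.of())) {
            handler.accept(event);
        }
    }

    /** Wires the two-stage flow; every emitted local event is appended to observed. */
    public static EventPipelineSketch wire(String localDataCenter, List<Set<String>> observed) {
        EventPipelineSketch center = new EventPipelineSketch();
        Set<String> lastKnown = new HashSet<>();
        center.register(DataServerChange.class, change -> {
            Set<String> local = change.serverMap.getOrDefault(localDataCenter, Set.of());
            if (!local.equals(lastKnown)) { // only a real change of the local data center counts
                lastKnown.clear();
                lastKnown.addAll(local);
                center.post(new LocalDataServerChange(new HashSet<>(local)));
            }
        });
        center.register(LocalDataServerChange.class, local -> observed.add(local.localServers));
        return center;
    }

    public static void main(String[] args) {
        List<Set<String>> observed = new ArrayList<>();
        EventPipelineSketch center = wire("DC1", observed);
        center.post(new DataServerChange(Map.of("DC1", Set.of("192.168.0.1"))));                 // pulled
        center.post(new DataServerChange(Map.of("DC1", Set.of("192.168.0.1"))));                 // same data, pushed
        center.post(new DataServerChange(Map.of("DC1", Set.of("192.168.0.1", "192.168.0.4")))); // node joined
        System.out.println(observed.size()); // prints 2
    }
}
```

Because the check compares complete node sets, it does not matter whether a change arrives by push or by pull, or arrives twice: only an actual change in the local data center produces a local event.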
0x05 Message Processing
LocalDataServerChangeEventHandler is the handler for data node changes within the same equipment room; in effect, it is the data synchronizer for the local cluster.
It checks whether the current node is a new node and, if so, sends a NotifyOnlineRequest to the other nodes. This is how a DataServer newly added to the data center gets handled.
5.1 LocalDataServerChangeEventHandler
The key parts of LocalDataServerChangeEventHandler are:
private BlockingQueue<LocalDataServerChangeEvent> events = new LinkedBlockingDeque<>();
private class LocalClusterDataSyncer implements Runnable
The explanation is as follows:
- afterPropertiesSet starts a LocalClusterDataSyncer thread for asynchronous processing.
- doHandle puts each event into the events queue, from which LocalClusterDataSyncer consumes it asynchronously.
In other words, LocalDataServerChangeEventHandler applies uniform deferred, asynchronous processing: after it receives a LocalDataServerChangeEvent from EventCenter, it puts the event into events, and the internal LocalClusterDataSyncer processes it asynchronously later.
Inside LocalClusterDataSyncer:
- If the local server is WORKING, it compares data and notifies the relevant DataServers ("if local server is working, compare sync data");
- If the local server is not WORKING, it is a new server, so it notifies the other servers ("if local server is not working, notify others that i am newer").
LocalDataServerChangeEventHandler is defined as follows:
public class LocalDataServerChangeEventHandler extends
AbstractEventHandler<LocalDataServerChangeEvent> {
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private LocalDataServerCleanHandler localDataServerCleanHandler;
@Autowired
private DataServerCache dataServerCache;
@Autowired
private DataNodeExchanger dataNodeExchanger;
@Autowired
private DataNodeStatus dataNodeStatus;
@Autowired
private DatumCache datumCache;
@Autowired
private DatumLeaseManager datumLeaseManager;
private BlockingQueue<LocalDataServerChangeEvent> events = new LinkedBlockingDeque<>();
}
5.1.1 Sending messages
doHandle puts the latest event into the BlockingQueue; before that, it sets isChanged and resets localDataServerCleanHandler and datumLeaseManager.
public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
isChanged.set(true);
// Better change to Listener pattern
localDataServerCleanHandler.reset();
datumLeaseManager.reset();
events.offer(localDataServerChangeEvent);
}
5.1.2 Starting the Engine
Once the bean has been initialized, afterPropertiesSet starts the consumer engine.
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
start();
}
public void start() {
Executor executor = ExecutorFactory
.newSingleThreadExecutor(LocalDataServerChangeEventHandler.class.getSimpleName());
executor.execute(new LocalClusterDataSyncer());
}
LocalClusterDataSyncer then consumes the messages and performs the business-specific processing.
0x06 Consuming the Notification Messages
Inside the engine, LocalClusterDataSyncer consumes events in an endless loop.
private class LocalClusterDataSyncer implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                LocalDataServerChangeEvent event = events.take();
                //if size of events is greater than 0, not handle and continue, only handle the last one in the queue
                if (events.size() > 0) {
                    continue;
                }
                long changeVersion = event.getVersion();
                isChanged.set(false);
                if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
                    //if local server is working, compare sync data
                    notifyToFetch(event, changeVersion);
                } else {
                    dataServerCache.checkAndUpdateStatus(changeVersion);
                    //if local server is not working, notify others that i am newer
                    notifyOnline(changeVersion);
                    dataServerCache.updateItem(event.getLocalDataServerMap(),
                        event.getLocalDataCenterversion(),
                        dataServerConfig.getLocalDataCenter());
                }
            } catch (Throwable t) {
                // exception handling is elided in this excerpt
            }
        }
    }
}
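The "only handle the last one in the queue" rule is a coalescing consumer. The sketch below isolates it under simplifying assumptions: events are reduced to their change version, the class and method names are hypothetical, and drainOnce uses poll() so that it can return, whereas the real LocalClusterDataSyncer blocks forever on take().

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical sketch of the "only handle the last event in the queue" pattern. */
public class CoalescingSyncerSketch {
    private final BlockingQueue<Long> events = new LinkedBlockingDeque<>();
    private final List<Long> handledVersions = new CopyOnWriteArrayList<>();

    /** Producer side, like doHandle: just enqueue the change version. */
    public void doHandle(long changeVersion) {
        events.offer(changeVersion);
    }

    /** Consumer side, like LocalClusterDataSyncer.run(), but returns once the queue is empty. */
    public void drainOnce() {
        Long version;
        while ((version = events.poll()) != null) {
            if (events.size() > 0) {
                continue; // a newer event is already waiting: skip this stale one
            }
            handledVersions.add(version);
        }
    }

    public List<Long> handledVersions() {
        return handledVersions;
    }

    public static void main(String[] args) {
        CoalescingSyncerSketch syncer = new CoalescingSyncerSketch();
        syncer.doHandle(1L);
        syncer.doHandle(2L);
        syncer.doHandle(3L);
        syncer.drainOnce();
        System.out.println(syncer.handledVersions()); // prints [3]
    }
}
```

Skipping stale events is presumably safe here because each LocalDataServerChangeEvent carries the complete localDataServerMap, so the newest event supersedes the older ones.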
[Key notes]
Every DataServer receives the DataServerChangeEvent from MetaServer; because the change concerns the local data center, it is converted into a LocalDataServerChangeEvent.
Since every DataServer receives it, both the newly online server and the servers that were already online receive it. This is the key point of this article.
6.1 The New Node
On the new node, the LocalDataServerChangeEvent handler, LocalDataServerChangeEventHandler, checks whether the current node is a new node; if it is, it sends a NotifyOnlineRequest to the other nodes.
Figure: logic of the new node when a DataServer node goes online
The figure above shows how a newly added node processes the node-change message. Nodes that are already online process the message in the same way up to this point. The difference is that LocalDataServerChangeEventHandler computes the changed nodes from the hash ring: in a scale-out scenario, the changed node is the new node; in a scale-in scenario, the changed nodes are the offline node's successor in the hash ring, together with the shard range it takes over and the corresponding backup nodes.
A newly added node uses NotifyOnlineRequest to tell the nodes already online: I am new; update your configuration accordingly.
6.1.1 notifyOnline
notifyOnline gets all DataServerNodes of the local data center from DataServerNodeFactory and sends a NotifyOnlineRequest to each of them in turn, saying: I am online.
Once notified, the other online DataServers start interacting with the new node.
/**
 * notify other dataservers that this server is online newly
 *
 * @param changeVersion
 */
private void notifyOnline(long changeVersion) {
    Map<String, DataServerNode> dataServerNodeMap = DataServerNodeFactory
        .getDataServerNodes(dataServerConfig.getLocalDataCenter());
    for (Entry<String, DataServerNode> serverEntry : dataServerNodeMap.entrySet()) {
        while (true) {
            String ip = serverEntry.getKey();
            DataServerNode dataServerNode = DataServerNodeFactory.getDataServerNode(
                dataServerConfig.getLocalDataCenter(), ip);
            try {
                final Connection connection = dataServerNode.getConnection();
                CommonResponse response = (CommonResponse) dataNodeExchanger.request(
                    new Request() {
                        @Override
                        public Object getRequestBody() {
                            return new NotifyOnlineRequest(DataServerConfig.IP,
                                changeVersion);
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(connection.getRemoteIP(), connection.getRemotePort());
                        }
                    }).getResult();
                // ... (checking the response and leaving the while (true) loop are elided in this excerpt)
            } catch (Exception e) {
                // ... (error handling elided in this excerpt)
            }
        }
    }
}
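notifyOnline wraps each peer in while (true), which suggests that the new node keeps retrying a peer until the request goes through (the response handling is not shown above). The sketch below captures that broadcast-with-retry shape under assumptions: Sender is a hypothetical stand-in for dataNodeExchanger.request(...), and maxAttempts is a bound added only for the sketch, whereas the excerpt loops without one.

```java
import java.util.*;

/** Hypothetical sketch: tell every peer "I am online", retrying a peer until it acknowledges. */
public class NotifyOnlineSketch {
    /** Stand-in for sending NotifyOnlineRequest(selfIp, changeVersion); true means success. */
    public interface Sender {
        boolean send(String peerIp, String selfIp, long changeVersion);
    }

    /** Returns, per peer, how many attempts were made before it acknowledged (or we gave up). */
    public static Map<String, Integer> notifyOnline(Collection<String> peers, String selfIp,
                                                    long changeVersion, Sender sender,
                                                    int maxAttempts) {
        Map<String, Integer> attempts = new LinkedHashMap<>();
        for (String peer : peers) {
            int tries = 0;
            while (tries < maxAttempts) { // the excerpt uses an unbounded while (true)
                tries++;
                if (sender.send(peer, selfIp, changeVersion)) {
                    break; // acknowledged: move on to the next peer
                }
            }
            attempts.put(peer, tries);
        }
        return attempts;
    }

    public static void main(String[] args) {
        Map<String, Integer> failuresLeft = new HashMap<>(Map.of("192.168.0.2", 2));
        Sender flaky = (peer, self, version) -> {
            int left = failuresLeft.getOrDefault(peer, 0);
            if (left > 0) {
                failuresLeft.put(peer, left - 1);
                return false; // simulated failure
            }
            return true;
        };
        System.out.println(notifyOnline(List.of("192.168.0.1", "192.168.0.2"),
                "192.168.0.3", 1000L, flaky, 5));
        // prints {192.168.0.1=1, 192.168.0.2=3}
    }
}
```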
6.2 Online Service Nodes
Each node currently in service traverses the data items in its own memory, picks out those that fall within the shard range of the changed node, and sends a NotifyFetchDatumRequest to the changed node and its backup nodes. On receiving the request, the handler on the changed node and its backup nodes pulls the data from the sender (NotifyFetchDatumHandler.fetchDatum).
Note that in this figure the JVMs are placed on the opposite sides (left and right) compared with the previous figure.
Figure: logic of existing nodes when a DataServer node changes
That is, a node already in service uses NotifyFetchDatumRequest to tell the new node: I have the data you need; come and get it.
Here are some important functions:
6.2.1 notifyToFetch
This notifies newly online DataServers to fetch datum ("notify onlined newly dataservers to fetch datum"), so that they can update themselves according to the request message.
The specific steps of notifyToFetch are:
- First, the new server list is taken from the event. It is kept in three forms: a Map (dataServerMapIn), a List (dataServerNodeList), and a ConcurrentHashMap (dataServerMap).
- A consistentHash is built from the new server list.
- toBeSyncMap = getToBeSyncMap(consistentHash); returns the map of data to be synchronized, i.e. which IP needs to sync which data ("get a map of datum to be synced").
- toBeSyncMap is traversed. For each toBeSyncEntry, its IP and its dataInfoMap are obtained; dataInfoMap has type Map<String, Map<String, BackupTriad>>.
  - Each Entry<String, Map<String, BackupTriad>> dataCenterEntry in dataInfoMap is traversed; the key of the entry is a dataCenter.
    - Each Entry<String, BackupTriad> dataTriadEntry is traversed; its key is a dataInfoId.
      - The Datum is fetched from datumCache by dataInfoId;
      - Its version is recorded: versionMap.put(dataInfoId, datum.getVersion());
    - The versionMap of this dataCenter is added to one overall map, allVersionMap: allVersionMap.put(dataCenter, versionMap);
  - If allVersionMap is not empty:
    - The IP is removed from dataServerCache;
    - The DataServer at this IP is notified of what it needs to sync: doNotify(ip, allVersionMap, changeVersion), i.e. it is told which dataInfoIds in which dataCenter it must sync, together with their version numbers.
- If dataServerMap (the ConcurrentHashMap) is not empty, each of its keys, a targetIp, is iterated over and removed from dataServerCache.
- dataServerCache updates its server list based on dataServerMapIn.
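The nested loops of notifyToFetch can be sketched with plain maps. This is a hypothetical simplification: BackupTriad values are reduced to the set of dataInfoIds, datumCache is replaced by a map of versions, and dataInfoIds with no local datum are skipped (an assumption; the article does not say what happens in that case).

```java
import java.util.*;

/** Hypothetical sketch of the allVersionMap construction inside notifyToFetch. */
public class NotifyToFetchSketch {
    /**
     * @param toBeSyncMap   ip -> dataCenter -> dataInfoIds that ip must now hold
     * @param datumVersions dataCenter -> dataInfoId -> local version (stand-in for datumCache)
     * @return ip -> allVersionMap (dataCenter -> dataInfoId -> version) to send in doNotify
     */
    public static Map<String, Map<String, Map<String, Long>>> buildAllVersionMaps(
            Map<String, Map<String, Set<String>>> toBeSyncMap,
            Map<String, Map<String, Long>> datumVersions) {
        Map<String, Map<String, Map<String, Long>>> perIp = new TreeMap<>();
        for (Map.Entry<String, Map<String, Set<String>>> toBeSyncEntry : toBeSyncMap.entrySet()) {
            Map<String, Map<String, Long>> allVersionMap = new TreeMap<>();
            for (Map.Entry<String, Set<String>> dataCenterEntry : toBeSyncEntry.getValue().entrySet()) {
                String dataCenter = dataCenterEntry.getKey();
                Map<String, Long> cached = datumVersions.getOrDefault(dataCenter, Map.of());
                Map<String, Long> versionMap = new TreeMap<>();
                for (String dataInfoId : dataCenterEntry.getValue()) {
                    Long version = cached.get(dataInfoId); // like datumCache.get(...).getVersion()
                    if (version != null) { // assumption: ids without a local datum are skipped
                        versionMap.put(dataInfoId, version);
                    }
                }
                allVersionMap.put(dataCenter, versionMap);
            }
            perIp.put(toBeSyncEntry.getKey(), allVersionMap);
        }
        return perIp;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Set<String>>> toBeSyncMap =
                Map.of("192.168.0.4", Map.of("DC1", Set.of("svcA", "svcB")));
        Map<String, Map<String, Long>> datumVersions =
                Map.of("DC1", Map.of("svcA", 100L, "svcB", 200L));
        System.out.println(buildAllVersionMaps(toBeSyncMap, datumVersions));
        // prints {192.168.0.4={DC1={svcA=100, svcB=200}}}
    }
}
```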
6.2.2 getToBeSyncMap
getToBeSyncMap works out which IP addresses need to be notified and, for each IP, which dataInfoIds it needs to sync:
- The argument is the consistentHash newly computed from the new servers.
- consistentHashOld is the old hash, computed from the previous configuration via dataServerCache.calculateOldConsistentHash.
- For each Datum in datumCache, a new triad is computed, as follows:
  - All data in datumCache is collected into allMap, and each dataCenterEntry in allMap is traversed:
    - For that data center, its datumMap is traversed by dataInfoId:
      - The new backupNodes are computed with the new consistentHash;
      - The old backupTriad is computed with consistentHashOld;
      - newJoinedNodes is obtained from backupTriad: the nodes of the new backupNodes that are not in the old backupTriad, or that are in the NotWorking set;
      - For each node in newJoinedNodes, an entry is added to toBeSyncMap, whose type is Map<String/*ip*/, Map<String/*datacenter*/, Map<String/*datainfoId*/, BackupTriad>>>.
- toBeSyncMap is returned, i.e. "which IP needs to sync what".
private Map<String/*ip*/, Map<String/*datacenter*/, Map<String/*datainfoId*/, BackupTriad>>> getToBeSyncMap(ConsistentHash<DataNode> consistentHash) {
    Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = new HashMap<>();
    Map<String, List<DataNode>> triadCache = new HashMap<>();
    ConsistentHash<DataNode> consistentHashOld = dataServerCache
        .calculateOldConsistentHash(dataServerConfig.getLocalDataCenter());
    // ... (traversal of datumCache elided in this excerpt)
    return toBeSyncMap;
}
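getToBeSyncMap is essentially a diff between the old and the new replica placement. The sketch below reproduces that diff under assumptions: oldReplicas and newReplicas are hypothetical functions standing in for consistentHashOld and consistentHash, the map values are sets of dataInfoIds instead of BackupTriad, and newJoined applies the same rule as getNewJoined.

```java
import java.util.*;
import java.util.function.Function;

/** Hypothetical sketch of getToBeSyncMap: ip -> dataCenter -> dataInfoIds to sync. */
public class ToBeSyncMapSketch {
    /** Same rule as getNewJoined: in the new triad but not the old one, or not yet WORKING. */
    static List<String> newJoined(List<String> newTriad, Set<String> oldTriad, Set<String> notWorking) {
        List<String> list = new ArrayList<>();
        for (String ip : newTriad) {
            if (!oldTriad.contains(ip) || notWorking.contains(ip)) {
                list.add(ip);
            }
        }
        return list;
    }

    public static Map<String, Map<String, Set<String>>> toBeSyncMap(
            Map<String, Set<String>> cachedIdsByDataCenter,
            Function<String, List<String>> oldReplicas,
            Function<String, List<String>> newReplicas,
            Set<String> notWorking) {
        Map<String, Map<String, Set<String>>> toBeSync = new TreeMap<>();
        for (Map.Entry<String, Set<String>> dc : cachedIdsByDataCenter.entrySet()) {
            for (String dataInfoId : dc.getValue()) {
                Set<String> oldTriad = new HashSet<>(oldReplicas.apply(dataInfoId));
                for (String ip : newJoined(newReplicas.apply(dataInfoId), oldTriad, notWorking)) {
                    toBeSync.computeIfAbsent(ip, k -> new TreeMap<>())
                            .computeIfAbsent(dc.getKey(), k -> new TreeSet<>())
                            .add(dataInfoId);
                }
            }
        }
        return toBeSync;
    }

    public static void main(String[] args) {
        // Toy placement: before scale-out every id lives on {a, b}; afterwards "svcB" moves to {b, c}
        Function<String, List<String>> oldReplicas = id -> List.of("a", "b");
        Function<String, List<String>> newReplicas =
                id -> id.equals("svcB") ? List.of("b", "c") : List.of("a", "b");
        Map<String, Set<String>> cached = Map.of("DC1", Set.of("svcA", "svcB"));
        System.out.println(toBeSyncMap(cached, oldReplicas, newReplicas, Set.of()));
        // prints {c={DC1=[svcB]}}
    }
}
```

Only the node that newly appears in a replica set (or one that is still not WORKING) receives work, which keeps the amount of data moved after a membership change small.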
6.2.3 getNewJoined
getNewJoined finds the nodes of the new triad that are either not in the stored triad, or are in it but not in the WORKING state.
public List<DataNode> getNewJoined(List<DataNode> newTriad, Set<String> notWorking) {
List<DataNode> list = new ArrayList<>();
for (DataNode node : newTriad) {
String ip = node.getIp();
if (!ipSetOfNode.contains(ip) || notWorking.contains(ip)) {
list.add(node);
}
}
return list;
}
6.2.4 BackupTriad
BackupTriad holds, for a dataInfoId, the list of DataNodes responsible for it (the owner node and its backup nodes).
public class BackupTriad {
/** dataInfoId */
private String dataInfoId;
/**
* calculate current dataServer list Consistent hash to get dataInfoId belong node and backup node list
* @see ConsistentHash#ConsistentHash(int, java.util.Collection)
* @see com.alipay.sofa.registry.consistency.hash.ConsistentHash#getNUniqueNodesFor(java.lang.Object, int)
*/
private List<DataNode> triad;
private Set<String> ipSetOfNode = new HashSet<>();
/**
* constructor
* @param dataInfoId
* @param triad
*/
public BackupTriad(String dataInfoId, List<DataNode> triad) {
this.dataInfoId = dataInfoId;
this.triad = triad;
for (DataNode node : triad) {
ipSetOfNode.add(node.getIp());
}
}
}
At runtime, it looks like this:
backupTriad = {BackupTriad@1400} "BackupTriad {dataInfoId = 'TestDataInfoId', ipSetOfNode = [192.168.0.2, 192.168.0.1, 192.168.0.3]}"
  dataInfoId = "TestDataInfoId"
  triad = {ArrayList@1399} size = 3
    0 = {DataNode@1409} "DataNode = {IP 192.168.0.1}"
    1 = {DataNode@1410} "DataNode = {IP 192.168.0.2}"
    2 = {DataNode@1411} "DataNode = {IP 192.168.0.3}"
  ipSetOfNode = {HashSet@1403} size = 3
    0 = "192.168.0.2"
    1 = "192.168.0.1"
    2 = "192.168.0.3"
0x07 Where Does changeVersion Come From
In the code above, a version is taken from LocalDataServerChangeEvent; it is used for all subsequent processing and is also used to set the version number in dataServerCache.
LocalDataServerChangeEvent event = events.take();
long changeVersion = event.getVersion();
if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
//if local server is working, compare sync data
notifyToFetch(event, changeVersion);
} else {
dataServerCache.checkAndUpdateStatus(changeVersion);
//if local server is not working, notify others that i am newer
notifyOnline(changeVersion);
}
So where does this version come from when the DataServer list changes? Let's trace it back to its source, working backwards.
7.1 Version number and changes
7.1.1 DataServerCache
Since this is where dataServerCache's version number is set, let's start with DataServerCache. It has two related variables: curVersion and newDataServerChangeItem.
getDataCenterNewVersion reads the version number of the given data center from newDataServerChangeItem, which is stored in DataServerCache.
DataServerCache is defined as follows:
public class DataServerCache {
/** new input dataServer list and version */
private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem();
private AtomicLong curVersion = new AtomicLong(-1L);
public Long getDataCenterNewVersion(String dataCenter) {
synchronized (DataServerCache.class) {
Map<String, Long> versionMap = newDataServerChangeItem.getVersionMap();
if (versionMap.containsKey(dataCenter)) {
return versionMap.get(dataCenter);
} else {
return null;
}
}
}
}
7.1.2 Settings and Usage
Within DataServerCache, curVersion is assigned only in addStatus; among the public methods, only synced and addNotWorkingServer call addStatus.
newDataServerChangeItem is set in compareAndSet:
public Map<String, Set<String>> compareAndSet(DataServerChangeItem newItem, FromType fromType) {
if (!changedMap.isEmpty()) {
newDataServerChangeItem = newItem;
}
}
The logic is as follows:
+-----------------------------+
|[DataServerCache] |
| |
compareAndSet +-------------> DataServerChangeItem |
| |
| curVersion |
| ^ ^ |
| | | |
+-----------------------------+
| |
synced +----------------------+ |
|
addNotWorkingServer+-------------------+
7.1.3 Two Design Points
DataServerCache raises two design questions:
- What is curVersion used for?
- What is newDataServerChangeItem used for?
We can infer that each data center has its own version number, which governs all state control within that data center. The definition of DataServerChangeItem supports this: it carries a versionMap, so changes are controlled by version number.
DataServerChangeItem is defined as follows:
public class DataServerChangeItem {
/** datacenter -> Map<ip, DataNode> */
private Map<String, Map<String, DataNode>> serverMap;
/** datacenter -> version */
private Map<String, Long> versionMap;
}
Thus:
- curVersion is the latest version number of the data center;
- newDataServerChangeItem holds the change data for that latest version.
The questions now become:
- Where does DataServerChangeItem come from?
- Where does curVersion come from?
Reading the source shows that both originate from MetaServer; let's follow that process.
7.2 Data Server
7.2.1 Actively Fetching Changes
First, let's review the process.
MetaServer broadcasts DataServer updates to all DataServers, or the DataServer periodically polls MetaServer for updates on its own initiative.
In practice, the DataServer actively sends GetNodesRequest to obtain the updated content.
Taking the active pull as an example: the DataServer calls metaServerService.getDateServers to obtain a DataServerChangeItem from MetaServer, and then builds a DataServerChangeEvent from it.
public class ConnectionRefreshTask extends AbstractTask {
@Autowired
private IMetaServerService metaServerService;
@Autowired
private EventCenter eventCenter;
@Override
public void handle() {
DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers();
if (dataServerChangeItem != null) {
eventCenter
.post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK));
}
}
}
In DefaultMetaServiceImpl, the DataServerChangeItem is extracted from the NodeChangeResult returned by MetaServer.
public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public DataServerChangeItem getDateServers() {
        Map<String, Connection> connectionMap = metaServerConnectionFactory
            .getConnections(dataServerConfig.getLocalDataCenter());
        String leader = getLeader().getIp();
        if (connectionMap.containsKey(leader)) {
            Connection connection = connectionMap.get(leader);
            if (connection.isFine()) {
                try {
                    GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                    Object obj = metaNodeExchanger.request(new Request() {
                        @Override
                        public Object getRequestBody() {
                            return request;
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(connection.getRemoteIP(), connection.getRemotePort());
                        }
                    }).getResult();
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();
                        versionMap.put(result.getLocalDataCenter(), result.getVersion());
                        return new DataServerChangeItem(result.getNodes(), versionMap);
                    }
                } catch (Exception e) {
                    // ... (error handling elided in this excerpt)
                }
            }
        }
        String newip = refreshLeader().getIp();
        return null;
    }
}
The logic is as follows:
+ Data Server
|
|
| +------------------+
| | NodeChangeResult |
| +-------+----------+
| | +--------------------------+
| | |[DataServerCache] |
| | | |
| +---------------->compareAndSet------> DataServerChangeItem |
| DataServerChangeItem | |
| | curVersion |
| | ^ ^ |
| | | | |
| +--------------------------+
| | |
| synced +------------- |
| |
| addNotWorkingServer----------+
|
|
+
7.3 Meta Server
7.3.1 Setting the Version Number
Now let's turn to MetaServer. When the DataServer list changes, DataStoreService's put, remove, and similar methods call dataNodeRepository to set the version number to the current timestamp:
dataNodeRepository.setVersion(currentTimeMillis);
7.3.2 Extracting the Version Number
When MetaServer receives a GetNodesRequest, it generates a NodeChangeResult.
DataStoreService calls dataNodeRepository to get the version number and sets it in the NodeChangeResult.
public class DataStoreService implements StoreService<DataNode> {
    @Override
    public NodeChangeResult getNodeChangeResult() {
        NodeChangeResult nodeChangeResult = new NodeChangeResult(NodeType.DATA);
        try {
            String localDataCenter = nodeConfig.getLocalDataCenter();
            Map<String/*dataCenter*/, NodeRepository> dataNodeRepositoryMap = dataRepositoryService
                .getNodeRepositories();
            ConcurrentHashMap<String/*dataCenter*/, Map<String/*ipAddress*/, DataNode>> pushNodes = new ConcurrentHashMap<>();
            Map<String/*dataCenter*/, Long> versionMap = new ConcurrentHashMap<>();
            dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> {
                // Get the version number here
                if (localDataCenter.equalsIgnoreCase(dataCenter)) {
                    nodeChangeResult.setVersion(dataNodeRepository.getVersion());
                }
                versionMap.put(dataCenter, dataNodeRepository.getVersion());
                Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap();
                Map<String, DataNode> newMap = new ConcurrentHashMap<>();
                dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
                pushNodes.put(dataCenter, newMap);
            });
            nodeChangeResult.setNodes(pushNodes);
            nodeChangeResult.setDataCenterListVersions(versionMap);
            nodeChangeResult.setLocalDataCenter(localDataCenter);
        } catch (Exception e) {
            // ... (error handling elided in this excerpt)
        }
        // return
        return nodeChangeResult;
    }
}
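To summarize how a version travels from MetaServer to DataServer, here is a stripped-down sketch. The classes and method names are hypothetical; only the version-related fields of NodeChangeResult are modelled, and the version values are millisecond timestamps, as set by setVersion(currentTimeMillis). Unlike getDateServers, which writes into result.getDataCenterListVersions() directly, the sketch copies the map first.

```java
import java.util.*;

/** Hypothetical sketch of how data-center versions travel from MetaServer to DataServer. */
public class VersionFlowSketch {
    /** Stand-in for NodeChangeResult, reduced to its version-related fields. */
    public static final class NodeChangeResult {
        public final String localDataCenter;
        public final long version;                             // version of the local data center
        public final Map<String, Long> dataCenterListVersions; // dataCenter -> version

        public NodeChangeResult(String localDataCenter, long version,
                                Map<String, Long> dataCenterListVersions) {
            this.localDataCenter = localDataCenter;
            this.version = version;
            this.dataCenterListVersions = dataCenterListVersions;
        }
    }

    /** MetaServer side, like getNodeChangeResult: one version per data-center repository. */
    public static NodeChangeResult build(String localDataCenter, Map<String, Long> repositoryVersions) {
        Map<String, Long> versionMap = new TreeMap<>();
        long localVersion = 0L; // assumption: stays 0 if no repository matches the local data center
        for (Map.Entry<String, Long> entry : repositoryVersions.entrySet()) {
            if (localDataCenter.equalsIgnoreCase(entry.getKey())) {
                localVersion = entry.getValue();
            }
            versionMap.put(entry.getKey(), entry.getValue());
        }
        return new NodeChangeResult(localDataCenter, localVersion, versionMap);
    }

    /** DataServer side, like getDateServers: overlay the local version onto the version map. */
    public static Map<String, Long> toVersionMap(NodeChangeResult result) {
        Map<String, Long> versionMap = new TreeMap<>(result.dataCenterListVersions);
        versionMap.put(result.localDataCenter, result.version);
        return versionMap;
    }

    public static void main(String[] args) {
        // Versions are millisecond timestamps, as set by setVersion(currentTimeMillis)
        NodeChangeResult result = build("DC1", Map.of("DC1", 1700000000000L, "DC2", 1690000000000L));
        System.out.println(toVersionMap(result)); // prints {DC1=1700000000000, DC2=1690000000000}
    }
}
```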
Details are as follows:
Meta Server                                   |  Data Server
                                              |
+-----------------------+                     |
| DataRepositoryService |                     |
+-----------+-----------+                     |
            |                                 |
            v                                 |
+-------------------------------+             |
| dataNodeRepository            |             |
| setVersion(currentTimeMillis) |             |
+-------------------------------+             |
            ^                                 |
            | getVersion                      |
+-----------+------+                          |
| DataStoreService |                          |
+-----------+------+                          |
            | getNodeChangeResult             |
            v                                 |
+------------------+                          |     +------------------+
| NodeChangeResult +--------------------------+---->| NodeChangeResult |
+------------------+                          |     +--------+---------+
                                              |              | compareAndSet
                                              |              v
                                              |     +--------------------------+
                                              |     |[DataServerCache]         |
                                              |     |   DataServerChangeItem   |
                                              |     |   curVersion             |
                                              |     |     ^          ^         |
                                              |     +-----+----------+---------+
                                              |           |          |
                                              |        synced   addNotWorkingServer
7.4 Data Server
7.4.1 Getting the Changes
Let's go back to the Data Server.
When the DataServer receives the NodeChangeResult, it extracts the DataServerChangeItem from it.
public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public DataServerChangeItem getDateServers() {
        ...
        GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
        Object obj = metaNodeExchanger.request(new Request() {
            ......
        }).getResult();
        if (obj instanceof NodeChangeResult) {
            NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
            Map<String, Long> versionMap = result.getDataCenterListVersions();
            versionMap.put(result.getLocalDataCenter(), result.getVersion());
            return new DataServerChangeItem(result.getNodes(), versionMap);
        }
        ...
    }
}
From here the flow returns to section 7.2.1: a DataServerChangeEvent is posted and then converted into a LocalDataServerChangeEvent, which ties back to the code we started with.
0x08 Data Server Follow-up Procedure
Next, let's look at how dataServerCache.curVersion and newDataServerChangeItem are used further on.
8.1 newDataServerChangeItem
newDataServerChangeItem is used in the doHandle function of DataServerChangeEventHandler:
for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
String dataCenter = changeEntry.getKey();
Set<String> ips = changeEntry.getValue();
Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
}
This calls dataServerCache.getDataCenterNewVersion, which reads the version from newDataServerChangeItem:
public Long getDataCenterNewVersion(String dataCenter) {
synchronized (DataServerCache.class) {
Map<String, Long> versionMap = newDataServerChangeItem.getVersionMap();
if (versionMap.containsKey(dataCenter)) {
return versionMap.get(dataCenter);
} else {
return null;
}
}
}
A LocalDataServerChangeEvent is then built, with its localDataCenterversion set to the version from newDataServerChangeItem:
public LocalDataServerChangeEvent(Map<String, DataNode> localDataServerMap,
Set<String> newJoined, long version,
long localDataCenterversion) {
this.localDataServerMap = localDataServerMap;
this.newJoined = newJoined;
this.version = version;
this.localDataCenterversion = localDataCenterversion;
}
DataServerCache updates the data accordingly.
dataServerCache.updateItem(dataServerMapIn, event.getLocalDataCenterversion(),
dataServerConfig.getLocalDataCenter());
8.2 curVersion
For curVersion, we turn to notifyToFetch and notifyOnline.
8.2.1 Sending the version number
We already explained how the version number is sent:
- A node already in service uses NotifyFetchDatumRequest to tell the new node: I have the data you need; come and get it.
- A newly added node uses NotifyOnlineRequest to tell the nodes already online: I am new; update your configuration accordingly.
Therefore, after receiving a DataServer change message from MetaServer, all DataServers in the same data center notify each other of the upgraded version number:
- notifyOnline sends NotifyOnlineRequest, which is handled by NotifyOnlineHandler on the other DataServers.
- notifyToFetch sends NotifyFetchDatumRequest, which is handled by NotifyFetchDatumHandler on the other DataServers.
8.2.2 Receiving the version number
Let's look at what new and existing DataServer nodes do after receiving the version number.
- NotifyFetchDatumHandler: processing on the new node
This is a data pull request. When triggered, the handler has the current DataServer node compare versions; if the version in the request is higher than the one in the current node's cache, the data is synchronized so that it is up to date.
- NotifyOnlineHandler: processing on the nodes already online
This handler is triggered when another DataServer comes online, and the current node stores the new node's information in its cache. It is used to manage node status, i.e. whether a node is INITIAL or WORKING.
Both NotifyOnlineHandler and NotifyFetchDatumHandler decide whether to continue processing based on the curVersion stored in the local dataServerCache.
public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> {
@Autowired
private DataServerCache dataServerCache;
@Override
public Object doHandle(Channel channel, NotifyOnlineRequest request) {
long version = request.getVersion();
if (version >= dataServerCache.getCurVersion()) {
dataServerCache.addNotWorkingServer(version, request.getIp());
}
return CommonResponse.buildSuccessResponse();
}
}
NotifyFetchDatumHandler, for its part, calls synced:
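Both handlers gate on version >= dataServerCache.getCurVersion(), which drops notifications that belong to an older server list; since versions are MetaServer timestamps, a newer change normally carries a larger number. The sketch below isolates that gate. VersionGateSketch and advanceTo are hypothetical: how addStatus actually updates curVersion is not shown in this article, so the sketch exposes a separate method for it, and the boolean return only makes the outcome visible (the real handler always answers with a success response).

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the version gate used by NotifyOnlineHandler and NotifyFetchDatumHandler. */
public class VersionGateSketch {
    private final AtomicLong curVersion = new AtomicLong(-1L); // same initial value as DataServerCache
    private final Set<String> notWorking = new TreeSet<>();

    /** Assumption: curVersion only moves forward when a newer server list is applied. */
    public void advanceTo(long version) {
        curVersion.accumulateAndGet(version, Math::max);
    }

    /** Mirrors NotifyOnlineHandler.doHandle: ignore requests older than curVersion. */
    public boolean onNotifyOnline(String ip, long version) {
        if (version >= curVersion.get()) {
            notWorking.add(ip); // simplified stand-in for dataServerCache.addNotWorkingServer(version, ip)
            return true;
        }
        return false;
    }

    public Set<String> notWorking() {
        return notWorking;
    }

    public static void main(String[] args) {
        VersionGateSketch cache = new VersionGateSketch();
        cache.advanceTo(2000L);
        System.out.println(cache.onNotifyOnline("192.168.0.4", 1000L)); // prints false: stale version
        System.out.println(cache.onNotifyOnline("192.168.0.4", 2000L)); // prints true
        System.out.println(cache.notWorking()); // prints [192.168.0.4]
    }
}
```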
public class NotifyFetchDatumHandler extends AbstractServerHandler<NotifyFetchDatumRequest> {
private static final Logger LOGGER = LoggerFactory
.getLogger(NotifyFetchDatumHandler.class);
@Autowired
private DataServerCache dataServerCache;
@Autowired
private DataServerConnectionFactory dataServerConnectionFactory;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private Exchange boltExchange;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DatumCache datumCache;
@Autowired
private LocalDataServerCleanHandler localDataServerCleanHandler;
@Override
public Object doHandle(Channel channel, NotifyFetchDatumRequest request) {
ParaCheckUtil.checkNotBlank(request.getIp(), "ip");
//receive other data NotifyFetchDatumRequest,must delay clean datum task until fetch all datum
localDataServerCleanHandler.reset();
Map<String, Map<String, Long>> versionMap = request.getDataVersionMap();
long version = request.getChangeVersion();
String ip = request.getIp();
if (version >= dataServerCache.getCurVersion()) {
if (versionMap.isEmpty()) {
dataServerCache.synced(version, ip);
} else {
ExecutorFactory.getCommonExecutor().execute(() -> {
for (Entry<String, Map<String, Long>> dataCenterEntry : versionMap.entrySet()) {
String dataCenter = dataCenterEntry.getKey();
Map<String, Long> map = dataCenterEntry.getValue();
for (Entry<String, Long> dataInfoEntry : map.entrySet()) {
String dataInfoId = dataInfoEntry.getKey();
Datum datum = datumCache.get(dataCenter, dataInfoId);
if (datum != null) {
long inVersion = dataInfoEntry.getValue();
long currentVersion = datum.getVersion();
if (currentVersion > inVersion) {
continue;
} else if (datum.getVersion() == dataInfoEntry.getValue()) {
//if version same,maybe remove publisher all by LocalDataServerCleanHandler,so must fetch from other node
if (!datum.getPubMap().isEmpty()) {
continue;
}
}
}
fetchDatum(ip, dataCenter, dataInfoId);
}
}
dataServerCache.synced(version, ip);
});
}
}
return CommonResponse.buildSuccessResponse();
}
}
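The inner loop of NotifyFetchDatumHandler boils down to a per-datum decision. The sketch below extracts it into a pure function; FetchDecisionSketch and its parameters are hypothetical names, and the datum is reduced to its version and to whether its pubMap is empty.

```java
/** Hypothetical sketch of the per-datum decision inside NotifyFetchDatumHandler. */
public class FetchDecisionSketch {
    /**
     * @param localVersion       version of the local datum, or null if there is no local datum
     * @param localHasPublishers whether the local datum's pubMap is non-empty
     * @param incomingVersion    version announced by the notifying node
     * @return true if the datum should be fetched from the notifying node
     */
    public static boolean shouldFetch(Long localVersion, boolean localHasPublishers, long incomingVersion) {
        if (localVersion == null) {
            return true; // nothing local: fetch
        }
        if (localVersion > incomingVersion) {
            return false; // local copy is newer
        }
        if (localVersion == incomingVersion) {
            // same version, but publishers may have been removed by LocalDataServerCleanHandler
            return !localHasPublishers;
        }
        return true; // local copy is older
    }

    public static void main(String[] args) {
        System.out.println(shouldFetch(null, false, 5L)); // prints true: no local datum
        System.out.println(shouldFetch(7L, true, 5L));    // prints false: local copy is newer
        System.out.println(shouldFetch(5L, true, 5L));    // prints false: same version, publishers present
        System.out.println(shouldFetch(5L, false, 5L));   // prints true: same version, publishers cleaned
        System.out.println(shouldFetch(3L, true, 5L));    // prints true: local copy is older
    }
}
```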
The complete picture now looks like this:
Meta Server                                   |  Data Server
                                              |
+-----------------------+                     |
| DataRepositoryService |                     |
+-----------+-----------+                     |
            |                                 |
            v                                 |
+-------------------------------+             |
| dataNodeRepository            |             |
| setVersion(currentTimeMillis) |             |
+-------------------------------+             |
            ^                                 |
            | getVersion                      |
+-----------+------+                          |
| DataStoreService |                          |
+-----------+------+                          |
            | getNodeChangeResult             |
            v                                 |
+------------------+                          |     +------------------+
| NodeChangeResult +--------------------------+---->| NodeChangeResult |
+------------------+                          |     +--------+---------+
                                              |              | compareAndSet
                                              |              v
                                              |     +--------------------------+
                                              |     |[DataServerCache]         |
                                              |     |   DataServerChangeItem   |
                                              |     |   curVersion             |
                                              |     |     ^          ^         |
                                              |     +-----+----------+---------+
                                              |           |          |
                                              |    synced,|          |addNotWorkingServer,
                                              | getCurVersion        |getCurVersion
                                              |           |          |
                                              | +---------+-------------+  +--+------------------+
                                              | |NotifyFetchDatumHandler|  | NotifyOnlineHandler |
                                              | |(in the new server)    |  | (in existing server)|
                                              | +---------^-------------+  +--^------------------+
                                              |           |                   |
                                              |           | NotifyFetch-      | NotifyOnline-
                                              |           | DatumRequest      | Request
                                              |           |                   |
                                              | +---------+-----------+  +----+-------------+
                                              | | Exist Data Server   |  | New Data Server  |
                                              | +---------------------+  +------------------+
At this point, the version-number flow has been fully traced.
0xEE Personal Information
★★★★★ Thoughts on life and technology ★★★★★
WeChat official account: Rosie's Thoughts
If you want timely notice of new articles, or want to see the technical material I recommend, please follow the account.