0x00 Summary
SOFARegistry is Ant Financial’s open-source, production-grade, highly timely, and highly available service registry.
This series of articles analyzes the design and architecture of SOFARegistry. Across several articles, it summarizes the implementation mechanisms and architectural ideas of DataServer and SOFARegistry from multiple angles, so that readers can learn how Alibaba designs such systems.
This is the fifteenth article. It covers renewal and eviction.
0x01 Business Scope
Renewal and eviction are important features of service registration and discovery:
1.1 Failure Eviction
Sometimes a service instance does not go offline cleanly. It may stop working because of a memory overflow, a network failure, and so on, and the registry never receives a “service offline” request.
Such instances can no longer serve requests and must be removed from the service list. A common approach is for the Server to create a scheduled task at startup. By default, this task runs every 60 seconds and deletes from the list every service whose lease has not been renewed within the timeout (90 seconds by default).
1.2 Service Renewal
After registering, a service provider keeps sending heartbeats to tell the Server “I am alive”. This prevents the Server’s eviction task from removing the instance from the service list. This operation is called renewing the service (Renew).
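To make the two ideas concrete, here is a minimal Java sketch of a lease table. It is not SOFARegistry code: the class `LeaseTable` and its methods are hypothetical. Timestamps are passed in explicitly rather than read from the clock, which keeps the behavior deterministic. A real server would call `evictExpired` from a scheduled task (for example every 60 seconds) with a lease duration such as 90 seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical minimal lease table illustrating renewal and failure eviction.
 * Not taken from SOFARegistry; timestamps are supplied by the caller.
 */
class LeaseTable {
    private final Map<String, Long> lastRenewMillis = new ConcurrentHashMap<>();
    private final long leaseDurationMillis;

    LeaseTable(long leaseDurationMillis) {
        this.leaseDurationMillis = leaseDurationMillis;
    }

    /** Heartbeat: record "I am alive" for this instance. */
    void renew(String instanceId, long nowMillis) {
        lastRenewMillis.put(instanceId, nowMillis);
    }

    /** Remove every instance whose last renewal is older than the lease duration. */
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastRenewMillis.entrySet()) {
            if (nowMillis - e.getValue() > leaseDurationMillis) {
                // remove(key, value) fails if a renewal replaced the timestamp in the meantime
                if (lastRenewMillis.remove(e.getKey(), e.getValue())) {
                    evicted.add(e.getKey());
                }
            }
        }
        return evicted;
    }

    boolean contains(String instanceId) {
        return lastRenewMillis.containsKey(instanceId);
    }
}
```

Suppose the lease is 90 seconds. An instance last renewed at t = 0 is evicted by a scan at t = 100 s, while one renewed at t = 50 s survives. `DatumLeaseManager` below uses the same conditional `remove(key, value)` idea when it clears an expired timestamp.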
0x02 DatumLeaseManager
On the Data Server side, DatumLeaseManager implements both the “failure eviction” and the “service renewal” functions.
2.1 Definition
The main variables of DatumLeaseManager are as follows:
- connectIdRenewTimestampMap records the time of the most recent heartbeat from each connectId (each client connection). Other service registries maintain a similar data structure;
- locksForConnectId ensures that only one thread handles a given connectId at a time: each connectId can have at most one pending eviction task.
The definitions are as follows:
public class DatumLeaseManager implements AfterWorkingProcess {
    /** record the latest heartbeat time for each connectId, format: connectId -> lastRenewTimestamp */
    private final Map<String, Long> connectIdRenewTimestampMap = new ConcurrentHashMap<>();

    /** lock for connectId, format: connectId -> true */
    private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();

    private volatile boolean serverWorking = false;
    private volatile boolean renewEnable = true;

    private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;

    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private DisconnectEventHandler disconnectEventHandler;
    @Autowired
    private DatumCache datumCache;
    @Autowired
    private DataNodeStatus dataNodeStatus;

    private ScheduledThreadPoolExecutor executorForHeartbeatLess;
    private ScheduledFuture<?> futureForHeartbeatLess;

    // ... methods omitted
}
2.2 Renewal
2.2.1 Data Structures
DatumLeaseManager mainly uses the following data structures for renewal:
private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();
private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;
2.2.2 Callers
renew is called from the following handlers, all of which extend AbstractServerHandler:
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest>
public class DatumSnapshotHandler extends AbstractServerHandler<DatumSnapshotRequest>
public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements AfterWorkingProcess
public class UnPublishDataHandler extends AbstractServerHandler<UnPublishDataRequest>
2.2.3 Renewing a Lease
DatumLeaseManager.renew records the latest heartbeat timestamp for the connectId and then calls scheduleEvictTask.
public void renew(String connectId) {
// record the renew timestamp
connectIdRenewTimestampMap.put(connectId, System.currentTimeMillis());
// try to trigger evict task
scheduleEvictTask(connectId, 0);
}
scheduleEvictTask works as follows:
- If the current connectId is already locked, i.e. an eviction task is already pending for it, return immediately;
- Otherwise, add a timeout to the hashed wheel timer. If no positive delay is given, the timeout is datumTimeToLiveSec seconds. When the timeout fires:
  - Release the lock for the current connectId.
  - Get the last renewal time of the current connectId. If there is none, the connectId has already been removed, so stop.
  - If renewal is currently disabled, schedule the next check a full TTL later. In a non-working state the server cannot receive renew requests, so it must not clean anything up;
  - If the last renewal has expired, call evict (only if the connectId still owns publishers) and remove its timestamp. No further task is scheduled;
  - Otherwise, call scheduleEvictTask(connectId, nextDelaySec) to schedule the next check for the remaining lease time.
The specific code is as follows:
/**
 * trigger evict task: if connectId expired, create ClientDisconnectEvent to cleanup datums bind to the connectId
 * PS: every connectId allows only one task to be created
 */
private void scheduleEvictTask(String connectId, long delaySec) {
    delaySec = (delaySec <= 0) ? dataServerConfig.getDatumTimeToLiveSec() : delaySec;

    // lock for connectId: every connectId allows only one task to be created
    Boolean ifAbsent = locksForConnectId.putIfAbsent(connectId, true);
    if (ifAbsent != null) {
        return;
    }

    datumAsyncHashedWheelTimer.newTimeout(_timeout -> {
        boolean continued = true;
        long nextDelaySec = 0;
        try {
            // release lock
            locksForConnectId.remove(connectId);

            // get lastRenewTime of this connectId
            Long lastRenewTime = connectIdRenewTimestampMap.get(connectId);
            if (lastRenewTime == null) {
                // connectId is already clientOff
                return;
            }

            /*
             * 1. lastRenewTime expires, then:
             *   - build ClientOffEvent and hand it to DataChangeEventCenter.
             *   - It will not be scheduled next time, so terminated.
             * 2. lastRenewTime not expires, then:
             *   - trigger the next schedule
             */
            boolean isExpired =
                    System.currentTimeMillis() - lastRenewTime > dataServerConfig.getDatumTimeToLiveSec() * 1000L;
            if (!isRenewEnable()) {
                // renew requests cannot arrive now, so never evict; check again after a full TTL
                nextDelaySec = dataServerConfig.getDatumTimeToLiveSec();
            } else if (isExpired) {
                int ownPubSize = getOwnPubSize(connectId);
                if (ownPubSize > 0) {
                    evict(connectId);
                }
                connectIdRenewTimestampMap.remove(connectId, lastRenewTime);
                continued = false;
            } else {
                // wait only for the remaining lease time, but at least 1 second
                nextDelaySec = dataServerConfig.getDatumTimeToLiveSec()
                        - (System.currentTimeMillis() - lastRenewTime) / 1000L;
                nextDelaySec = nextDelaySec <= 0 ? 1 : nextDelaySec;
            }
        } catch (Throwable e) {
            // error logging omitted in this excerpt; continued stays true, so the check is rescheduled
        }
        if (continued) {
            scheduleEvictTask(connectId, nextDelaySec);
        }
    }, delaySec, TimeUnit.SECONDS);
}
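The branching inside the timer callback can be isolated as a pure function. The sketch below is a simplification under stated assumptions, not the SOFARegistry implementation. `EvictDecision`, `onTimeout`, `GONE` and `EVICT` are hypothetical names, and the hashed wheel timer is left out. The caller is assumed to:

- evict and stop on `EVICT`;
- stop on `GONE`;
- otherwise reschedule after the returned number of seconds.

```java
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical, timer-free sketch of the decision scheduleEvictTask makes when its timeout fires.
 * Assumes ttlSec > 0, so a returned delay can never collide with GONE or EVICT.
 */
class EvictDecision {
    /** The connectId has no renewal record any more: stop without evicting. */
    static final long GONE = -1L;
    /** The lease has expired: evict (if the connectId owns publishers) and stop. */
    static final long EVICT = 0L;

    private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

    /** Like locksForConnectId.putIfAbsent: only the first caller may create a task. */
    boolean tryLock(String connectId) {
        return locks.putIfAbsent(connectId, Boolean.TRUE) == null;
    }

    /** Released when the timeout fires, before the decision is made. */
    void unlock(String connectId) {
        locks.remove(connectId);
    }

    /**
     * @param lastRenewMillis last renewal time, or null if none is recorded
     * @return GONE, EVICT, or the delay in seconds before the next check (always >= 1)
     */
    static long onTimeout(Long lastRenewMillis, long nowMillis, long ttlSec, boolean renewEnabled) {
        if (lastRenewMillis == null) {
            return GONE;
        }
        if (!renewEnabled) {
            // renewals cannot arrive now, so never evict; look again after a full TTL
            return ttlSec;
        }
        long elapsedMillis = nowMillis - lastRenewMillis;
        if (elapsedMillis > ttlSec * 1000L) {
            return EVICT;
        }
        // wait only for the remaining lease time, but at least one second
        long next = ttlSec - elapsedMillis / 1000L;
        return next <= 0 ? 1 : next;
    }
}
```

Consider a 30-second TTL and a connection last renewed 10 seconds ago. It is checked again in 20 seconds rather than a full TTL later, which is why the original code computes `nextDelaySec` instead of always waiting `datumTimeToLiveSec`.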
2.2.4 Diagram
The flow is shown in the following figure:
+------------------+    +-------------------+    +-----------------------+
|PublishDataHandler|    | DatumLeaseManager |    | AsyncHashedWheelTimer |
+--------+---------+    +---------+---------+    +-----------+-----------+
         |                        |                          |
         |                        |           new            |
         |                        +------------------------> |
         |                        |                          |
 doHandle|                        |                          |
         |          renew         |                          |
         +----------------------> |                          |
         |                        |                          |
         |                        +--+                       |
         |                        |  | scheduleEvictTask     |
         |                        |<-+                       |
         |                        |                          |
         |                        |        newTimeout        |
         |                        +------------------------> |
         |                        |                          + expired?
         |                        |            No            |
         |                        | <------------------------+
         |                        | scheduleEvictTask        | Yes
         |                        |                          v
         |                        |                        evict (if ownPubSize > 0)
         v                        v
2.3 Eviction
2.3.1 Data Structures
DatumLeaseManager mainly uses the following data structures for eviction:
private ScheduledThreadPoolExecutor executorForHeartbeatLess;
private ScheduledFuture<?> futureForHeartbeatLess;
Two paths start the eviction task, so that whenever the data changes, the server checks whether any connection should be evicted:
- at startup;
- through an explicit call.
2.3.2 Explicit Call
In the LocalDataServerChangeEventHandler class, doHandle calls datumLeaseManager.reset(), which eventually leads to evict.
@Override
public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
isChanged.set(true);
// Better change to Listener pattern
localDataServerCleanHandler.reset();
datumLeaseManager.reset();
events.offer(localDataServerChangeEvent);
}
DatumLeaseManager.reset cancels the pending task, if there is one, and calls scheduleEvictTaskForHeartbeatLess to restart the eviction thread.
public synchronized void reset() {
    if (futureForHeartbeatLess != null) {
        futureForHeartbeatLess.cancel(false);
    }
    scheduleEvictTaskForHeartbeatLess();
}
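reset follows a cancel-and-reschedule pattern. Below is a minimal sketch of that pattern with the JDK's `ScheduledThreadPoolExecutor`. `RestartablePeriodicTask` is a hypothetical name, and the named thread factory used by DatumLeaseManager is omitted. `setRemoveOnCancelPolicy(true)` is an extra choice of this sketch, not something the original code is shown to do.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the cancel-and-reschedule pattern used by reset(). */
class RestartablePeriodicTask {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Runnable task;
    private final long periodSec;
    private ScheduledFuture<?> future;

    RestartablePeriodicTask(Runnable task, long periodSec) {
        this.task = task;
        this.periodSec = periodSec;
        // drop cancelled tasks from the work queue immediately instead of at their trigger time
        executor.setRemoveOnCancelPolicy(true);
    }

    /** Cancel the pending run (without interrupting one in progress) and start a fresh schedule. */
    synchronized ScheduledFuture<?> reset() {
        if (future != null) {
            future.cancel(false);
        }
        future = executor.scheduleWithFixedDelay(task, periodSec, periodSec, TimeUnit.SECONDS);
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling `cancel(false)`, as the original does, lets a run that is already in progress finish. As a result, reset never interrupts an eviction pass halfway through.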
2.3.3 Invocation at Startup
At startup, init creates the single-threaded executor and starts the eviction task.
@PostConstruct
public void init() {
    ...
    executorForHeartbeatLess = new ScheduledThreadPoolExecutor(1, threadFactoryBuilder
        .setNameFormat("Registry-DatumLeaseManager-ExecutorForHeartbeatLess").build());
    scheduleEvictTaskForHeartbeatLess();
}
2.3.4 Eviction
Eviction itself is performed by EvictTaskForHeartbeatLess. This task runs periodically on executorForHeartbeatLess through scheduleWithFixedDelay, and both the initial delay and the delay between runs are datumTimeToLiveSec seconds.
private void scheduleEvictTaskForHeartbeatLess() {
    futureForHeartbeatLess = executorForHeartbeatLess.scheduleWithFixedDelay(
        new EvictTaskForHeartbeatLess(), dataServerConfig.getDatumTimeToLiveSec(),
        dataServerConfig.getDatumTimeToLiveSec(), TimeUnit.SECONDS);
}
Each time the task runs, it fetches all current connectIds from datumCache and iterates over them. It evicts a connectId if the connectId has no renewal timestamp at all (no heartbeat) and still owns publishers.
/**
 * evict own connectIds with heartbeat less
 */
private class EvictTaskForHeartbeatLess implements Runnable {
    @Override
    public void run() {
        // If in a non-working state, cannot clean up because the renew request cannot be received at this time.
        if (!isRenewEnable()) {
            return;
        }
        Set<String> allConnectIds = datumCache.getAllConnectIds();
        for (String connectId : allConnectIds) {
            Long timestamp = connectIdRenewTimestampMap.get(connectId);
            // no heartbeat
            if (timestamp == null) {
                int ownPubSize = getOwnPubSize(connectId);
                if (ownPubSize > 0) {
                    evict(connectId);
                }
            }
        }
    }
}
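The selection rule of EvictTaskForHeartbeatLess can be expressed as a side-effect-free function. In this hypothetical sketch, plain maps stand in for `connectIdRenewTimestampMap` and `getOwnPubSize`, and the returned list stands in for the `evict` calls. `HeartbeatLessScan` is not a SOFARegistry class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, side-effect-free version of the EvictTaskForHeartbeatLess selection logic. */
class HeartbeatLessScan {
    /**
     * @param renewEnabled  false while the node cannot receive renew requests
     * @param allConnectIds connectIds that currently own data (stands in for datumCache.getAllConnectIds())
     * @param lastRenew     connectId -> last renew timestamp (stands in for connectIdRenewTimestampMap)
     * @param ownPubSize    connectId -> number of owned publishers (stands in for getOwnPubSize)
     * @return the connectIds that would be evicted
     */
    static List<String> select(boolean renewEnabled, Set<String> allConnectIds,
                               Map<String, Long> lastRenew, Map<String, Integer> ownPubSize) {
        List<String> toEvict = new ArrayList<>();
        if (!renewEnabled) {
            return toEvict; // never clean up while renewals cannot arrive
        }
        for (String connectId : allConnectIds) {
            boolean noHeartbeat = lastRenew.get(connectId) == null;
            if (noHeartbeat && ownPubSize.getOrDefault(connectId, 0) > 0) {
                toEvict.add(connectId);
            }
        }
        return toEvict;
    }
}
```

scheduleEvictTask is only started from renew. Without this periodic scan, a connectId that owns data but has no renewal timestamp would never be checked, and the scan covers that case.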
evict itself just wraps the connectId in a ClientDisconnectEvent and passes it to DisconnectEventHandler:
private void evict(String connectId) {
disconnectEventHandler.receive(new ClientDisconnectEvent(connectId, System
.currentTimeMillis(), 0));
}
The details are as follows:
+--------------------------------------------------+
| DatumLeaseManager                                |
|                                                  |
|   EvictTaskForHeartbeatLess.run                  |
|                                                  |
| +----------------------------------------------+ |
| |                      |                       | |
| |                      v                       | |
| |                                              | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |                      |                       | |
| |                      |  for (allConnectIds)  | |
| |                      v                       | |
| |                                              | |
| |          connectIdRenewTimestampMap          | |
| |                      |                       | |
| |                      |  no heartbeat         | |
| |                      v                       | |
| |                                              | |
| |                    evict                     | |
| |                                              | |
| +----------------------------------------------+ |
+--------------------------------------------------+
2.3.5 Handling the Eviction
2.3.5.1 Forwarding the Eviction Message
The eviction message is then forwarded to DisconnectEventHandler.receive, which calls EVENT_QUEUE.add(event). If the data node is not yet WORKING, the event is parked in noWorkQueue instead.
public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {
    /**
     * a DelayQueue that contains client disconnect events
     */
    private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>();

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    @Autowired
    private DataNodeStatus dataNodeStatus;

    private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();

    public void receive(DisconnectEvent event) {
        // buffer events until this data node is WORKING
        if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
            noWorkQueue.add(event);
            return;
        }
        EVENT_QUEUE.add(event);
    }

    // ... other members omitted
}
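afterPropertiesSet starts a single thread that loops, taking events from EVENT_QUEUE and processing them:
- for a SESSION_SERVER disconnect event, remove the session server's process and its connectIds from sessionServerConnectionFactory, then unPub each of those connectIds;
- for a client disconnect event (such as the ClientDisconnectEvent created by evict), unPub its connectId directly;
- unPub sends a ClientChangeEvent to dataChangeEventCenter.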
Details are as follows:
@Override
public void afterPropertiesSet() {
    Executor executor = ExecutorFactory
        .newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName());
    executor.execute(() -> {
        while (true) {
            try {
                DisconnectEvent disconnectEvent = EVENT_QUEUE.take();
                if (disconnectEvent.getType() == DisconnectTypeEnum.SESSION_SERVER) {
                    SessionServerDisconnectEvent event = (SessionServerDisconnectEvent) disconnectEvent;
                    String processId = event.getProcessId();
                    // check processId confirm remove, and not be registered again when delay time
                    String sessionServerHost = event.getSessionServerHost();
                    if (sessionServerConnectionFactory
                        .removeProcessIfMatch(processId, sessionServerHost)) {
                        Set<String> connectIds = sessionServerConnectionFactory
                            .removeConnectIds(processId);
                        if (connectIds != null && !connectIds.isEmpty()) {
                            for (String connectId : connectIds) {
                                unPub(connectId, event.getRegisterTimestamp());
                            }
                        }
                    }
                } else {
                    ClientDisconnectEvent event = (ClientDisconnectEvent) disconnectEvent;
                    unPub(event.getConnectId(), event.getRegisterTimestamp());
                }
            } catch (Throwable e) {
                // error logging omitted in this excerpt; the loop keeps running
            }
        }
    });
}

/**
 * @param connectId
 * @param registerTimestamp
 */
private void unPub(String connectId, long registerTimestamp) {
    dataChangeEventCenter.onChange(new ClientChangeEvent(connectId, dataServerConfig
        .getLocalDataCenter(), registerTimestamp));
}
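DisconnectEventHandler combines two JDK queues:

- a `DelayQueue`, whose `take` only returns an element once its delay has expired;
- a `LinkedBlockingQueue` that buffers events while the node is not WORKING.

The sketch below shows that combination in isolation. `SimpleDisconnectEvent` and `DisconnectRouter` are hypothetical. In particular, `afterWorking` assumes that buffered events are moved into the delay queue once the node starts working. The `AfterWorkingProcess` interface suggests this, but the excerpt above does not show it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical disconnect event that becomes available after a delay. */
class SimpleDisconnectEvent implements Delayed {
    final String connectId;
    private final long readyAtNanos;

    SimpleDisconnectEvent(String connectId, long delayMillis) {
        this.connectId = connectId;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

/** Hypothetical sketch of receive(): buffer while not WORKING, otherwise enqueue with a delay. */
class DisconnectRouter {
    final DelayQueue<SimpleDisconnectEvent> eventQueue = new DelayQueue<>();
    final BlockingQueue<SimpleDisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();
    volatile boolean working;

    void receive(SimpleDisconnectEvent event) {
        if (!working) {
            noWorkQueue.add(event);
            return;
        }
        eventQueue.add(event);
    }

    /** Assumed behavior: once WORKING, move buffered events into the delay queue. */
    void afterWorking() {
        working = true;
        noWorkQueue.drainTo(eventQueue);
    }
}
```

The delay works as a grace period. The comment in afterPropertiesSet suggests the reason: when the event is finally taken, the handler checks that the session server has not registered again in the meantime.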
As shown in the figure below
+--------------------------------------------------+
| DatumLeaseManager                                |
|                                                  |
|   EvictTaskForHeartbeatLess.run                  |
|                                                  |
| +----------------------------------------------+ |
| |                      |                       | |
| |                      v                       | |
| |                                              | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |                      |                       | |
| |                      |  for (allConnectIds)  | |           +------------------------+
| |                      v                       | |           |                        |
| |                                              | |           | DisconnectEventHandler |
| |          connectIdRenewTimestampMap          | |           |                        |
| |                      |                       | |           |    +-------------+     |
| |                      |                       | |           |    | noWorkQueue |     |
| |                      |  no heartbeat         | |           |    +-------------+     |
| |                      v                       | |           |                        |
| |                                              | |  receive  |    +--------------+    |
| |                    evict +-------------------------------> |    | EVENT_QUEUE  |    |
| |                                              | |           |    +--------------+    |
| +----------------------------------------------+ |           +------------------------+
+--------------------------------------------------+
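2.3.5.2 DataChangeEventCenter Forwarding
The logic then moves to DataChangeEventCenter, which also acts mainly as a forwarder. The excerpt below shows the onChange overload that takes a Publisher. It hashes the publisher's dataInfoId to pick one of the dataChangeEventQueues and passes a DataChangeEvent to that queue. unPub above calls the overload that takes a ClientChangeEvent, which is not shown here.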
public class DataChangeEventCenter {
    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    /**
     * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
     *
     * @param publisher
     * @param dataCenter
     */
    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
}
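The `hash(...)` method used by onChange is not shown in the excerpt. This sketch assumes a common implementation of this kind of routing, not one taken from SOFARegistry: take the key's `hashCode` modulo the number of queues. `HashedQueueRouter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of routing events to one of N queues by dataInfoId. */
class HashedQueueRouter<E> {
    private final List<BlockingQueue<E>> queues = new ArrayList<>();

    HashedQueueRouter(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    /** Assumed hash: floorMod keeps the index non-negative even for negative hashCodes. */
    int indexFor(String dataInfoId) {
        return Math.floorMod(dataInfoId.hashCode(), queues.size());
    }

    /** Every event for the same dataInfoId lands in the same queue. */
    void onChange(String dataInfoId, E event) {
        queues.get(indexFor(dataInfoId)).add(event);
    }

    BlockingQueue<E> queue(int idx) {
        return queues.get(idx);
    }
}
```

All events for one dataInfoId go to the same queue. If each queue is consumed by a single thread, changes to a given datum are applied in arrival order, while different datums can still be processed in parallel.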
2.3.5.3 DataChangeEventQueue Processing
DataChangeEventQueue then processes the events. Depending on the event, it calls addTempChangeData, handleDatum, or another handler.
When an event is taken from the queue, it is handled according to its DataChangeScopeEnum scope:
- If the scope is DataChangeScopeEnum.DATUM, check dataChangeEvent.getSourceType():
  - if it is DataSourceTypeEnum.PUB_TEMP, call addTempChangeData to add the ChangeData to CHANGE_QUEUE;
  - otherwise, call handleDatum;
- If the scope is DataChangeScopeEnum.CLIENT, call handleClientOff((ClientChangeEvent) event);
- If the scope is DataChangeScopeEnum.SNAPSHOT, call handleSnapshot((DatumSnapshotEvent) event).
The ClientChangeEvent produced by unPub is therefore handled by the CLIENT branch, handleClientOff.
For details, see the earlier article “Ant Financial SOFARegistry’s Message Bus asynchronous processing”.
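The dispatch rules listed above amount to a switch on the event's scope. The enums below are simplified, hypothetical stand-ins for DataChangeScopeEnum and DataSourceTypeEnum. `dispatch` only returns the name of the handler that would be called; it is a sketch, not the queue's actual code.

```java
/** Hypothetical stand-in for DataChangeScopeEnum. */
enum Scope { DATUM, CLIENT, SNAPSHOT }

/** Hypothetical stand-in for DataSourceTypeEnum (only the values needed here). */
enum SourceType { PUB, PUB_TEMP }

class ScopeDispatcher {
    /** Returns the name of the handler the queue would call, mirroring the rules above. */
    static String dispatch(Scope scope, SourceType sourceType) {
        switch (scope) {
            case DATUM:
                // temporary publishes are staged, everything else is merged into the datum
                return sourceType == SourceType.PUB_TEMP ? "addTempChangeData" : "handleDatum";
            case CLIENT:
                return "handleClientOff";
            case SNAPSHOT:
                return "handleSnapshot";
            default:
                throw new IllegalArgumentException("unknown scope: " + scope);
        }
    }
}
```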
+--------------------------------------------------+
| DatumLeaseManager                                |
|                                                  |
|   EvictTaskForHeartbeatLess.run                  |
|                                                  |
| +----------------------------------------------+ |
| |                      |                       | |
| |                      v                       | |
| |                                              | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |                      |                       | |
| |                      |  for (allConnectIds)  | |           +------------------------+
| |                      v                       | |           |                        |
| |                                              | |           | DisconnectEventHandler |
| |          connectIdRenewTimestampMap          | |           |                        |
| |                      |                       | |           |    +-------------+     |
| |                      |                       | |           |    | noWorkQueue |     |
| |                      |  no heartbeat         | |           |    +-------------+     |
| |                      v                       | |           |                        |
| |                                              | |  receive  |    +--------------+    |
| |                    evict +-------------------------------> |    | EVENT_QUEUE  |    |
| |                                              | |           |    +--------------+    |
| +----------------------------------------------+ |           +-----------+------------+
+--------------------------------------------------+                       |
+----------------------+                                                   | onChange
| DataChangeEventQueue |                                                   v
|                      |                                       +-----------+---------------+
|  +------------+      |                                       | DataChangeEventCenter     |
|  | eventQueue |      |                                       |                           |
|  +------------+      |          add DataChangeEvent          | +-----------------------+ |
|                      | <-------------------------------------+ | dataChangeEventQueues | |
|  addTempChangeData   |                                       | +-----------------------+ |
|                      |                                       +---------------------------+
|  handleDatum         |
|                      |
+----------------------+
0xEE Personal information
WeChat official account: Rosie’s Thoughts