0 x00 the
SOFARegistry is ant Financial’s open source, production-grade, time-sensitive, and highly available service registry.
This series of articles focuses on analyzing design and architecture, that is, using multiple articles to summarize the implementation mechanism and architecture ideas of DataServer or SOFARegistry from multiple perspectives, so that you can learn how Ali designs.
This article, part 15, analyzes how to implement the ChangeNotifier to inform related modules: Hi, new data changes are coming, brothers, let’s go.
0x01 Business Category
1.1 an overview of the
When a publisher goes online, the publishDataProcessor or unPublishDataHandler is triggered, which adds a data change event to the dataChangeEventCenter. Used to asynchronously notify the event Change center of changes to data. After receiving the event, the event Change center adds the event to the queue.
-
In this case, dataChangeEventCenter will asynchronously process the upstream and downstream data according to different event types. That is, change the event change information into ChangeNotifier, then into Operator, and put it in AbstractAcceptorStore.
-
At the same time, the DataChangeHandler will publish the event change information through the ChangeNotifier, notifying other nodes to synchronize data.
Due to space constraints, the previous part of the ChangeNotifier section is only skipped, this article will explain the ChangeNotifier event change notification in detail. Here we will link the process together again, which will refer to some of the previous articles.
I’m going to show you what ChangeNotifier does.
+--------------------+
| PublishDataHandler |
+--------+-----------+
|
|
| publisher
|
v
+---------+------------+
|DataChangeEventCenter |
+---------+------------+
|
|
| ChangeData
v
+---------+------------+
| DataChangeEventQueue |
+---------+------------+
|
|
| ChangeData
v
+-------+----------+
| DataChangeHandler|
+-------+----------+
|
|
| ChangeData
v
+------+--------+ +------------+
| ChangeNotifier| +--------> | datumCache |
+------+--------+ +------------+
|
|
v
+---+------+
| notifier |
+---+------+
|
v
+-----------+---------------+
| |
v v
+----+----------------+ +------+----------+
|SessionServerNotifier| | BackUpNotifier |
+----+----------------+ +------+----------+
| |
| |
| |
| v
+--v------------+ +------+----------------+
| sessionServer | | AbstractAcceptorStore |
+---------------+ +-----------------------+
Copy the code
1.2 Data Changes
The data changes in two directions
-
Data server node changes;
-
The change of data, namely the change of Publisher and Scriber;
ChangeNotifier is responsible for notifying related modules of Publisher and Scriber changes. Change notifications are a form of decoupling.
0x02 Data structure
We first need to look at the data structure of the notification.
2.1 Interface Definition
IDataChangeNotifier is the notification interface definition:
public interface IDataChangeNotifier {
Set<DataSourceTypeEnum> getSuitableSource(a);
/ * * * *@param datum
* @param lastVersion
*/
void notify(Datum datum, Long lastVersion);
}
Copy the code
2.2 derived class
IDataChangeNotifier has four derived classes, which correspond to the four possible changes of specific data. The purpose can be roughly determined from the name.
public class BackUpNotifier implements IDataChangeNotifier
public class SessionServerNotifier implements IDataChangeNotifier
public class SnapshotBackUpNotifier implements IDataChangeNotifier
public class TempPublisherNotifier implements IDataChangeNotifier
Copy the code
2.3 the Bean
The corresponding Bean is as follows:
@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers(a) {
List<IDataChangeNotifier> list = new ArrayList<>();
list.add(sessionServerNotifier());
list.add(tempPublisherNotifier());
list.add(backUpNotifier());
return list;
}
Copy the code
0 x03 process
Let’s go over the process from the beginning.
3.1 Adding Messages
When a publisher goes online, the publishDataProcessor or unPublishDataHandler is triggered, which adds a data change event to the dataChangeEventCenter. Used to asynchronously notify the event Change center of changes to data. After receiving the event, the event Change center adds the event to the queue.
Here’s how it works at DataServer:
3.1.1 PublishDataHandler
PublishDataHandler responds to a PublishDataRequest. When Publisher is available, put a message into the DataChangeEventCenter. Call below to place the message
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
Copy the code
The specific code is as follows:
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
@Autowired
private ForwardService forwardService;
@Autowired
private SessionServerConnectionFactory sessionServerConnectionFactory;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DatumLeaseManager datumLeaseManager;
@Autowired
private ThreadPoolExecutor publishProcessorExecutor;
@Override
public Object doHandle(Channel channel, PublishDataRequest request) {
Publisher publisher = Publisher.internPublisher(request.getPublisher());
if (forwardService.needForward()) {
CommonResponse response = new CommonResponse();
response.setSuccess(false);
response.setMessage("Request refused, Server status is not working");
return response;
}
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
if(publisher.getPublishType() ! = PublishType.TEMPORARY) { String connectId = WordCache.getInstance().getWordCache( publisher.getSourceAddress().getAddressString()); sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(), connectId);// record the renew timestamp
datumLeaseManager.renew(connectId);
}
returnCommonResponse.buildSuccessResponse(); }}Copy the code
The specific logic is as follows:
+--------------------+
| PublishDataHandler |
+--------+-----------+
|
|
| publisher
|
v
+---------+------------+
|DataChangeEventCenter |
+---------+------------+
Copy the code
3.1.2 DataChangeEventCenter
At the heart of the DataChangeEventCenter is an array of DataChangeEventQueue,
The datachangeEventCenter. onChange function first retrieves the Hash value from Publisher’s DataInfoId to determine which queue to put DataChangeEvent messages in. That’s calling the onChange function on this queue.
public class DataChangeEventCenter {
/** * queues of DataChangeEvent */
private DataChangeEventQueue[] dataChangeEventQueues;
@Autowired
private DatumCache datumCache;
@PostConstruct
public void init(a) {
if (isInited.compareAndSet(false.true)) {
queueCount = dataServerConfig.getQueueCount();
dataChangeEventQueues = new DataChangeEventQueue[queueCount];
for (int idx = 0; idx < queueCount; idx++) {
dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this, datumCache); dataChangeEventQueues[idx].start(); }}}/**
* receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
*
* @param publisher
* @param dataCenter
*/
public void onChange(Publisher publisher, String dataCenter) {
int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
if (publisher instanceof UnPublisher) {
datum.setContainsUnPub(true);
}
if(publisher.getPublishType() ! = PublishType.TEMPORARY) { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB, datum));
} else {
dataChangeEventQueues[idx].onChange(newDataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum)); }}}Copy the code
The main data members of DataChangeEventQueue are as follows:
public class DataChangeEventQueue {
/** * a block queue that stores all data change events */
private final BlockingQueue<IDataChangeEvent> eventQueue;
private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();
private final DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue();
private DataChangeEventCenter dataChangeEventCenter;
private DatumCache datumCache;
}
Copy the code
The execution engine is a thread that blocks on top of BlockingQueue eventQueue, and when there is a message, it pulls it out and does different processing for the message type.
public void start(a) {
Executor executor = ExecutorFactory
.newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
executor.execute(() -> {
while (true) {
try {
IDataChangeEvent event = eventQueue.take();
DataChangeScopeEnum scope = event.getScope();
if (scope == DataChangeScopeEnum.DATUM) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
//Temporary push data will be notify as soon as,and not merge to normal pub data;
if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
dataChangeEvent.getSourceType());
} else{ handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(), dataChangeEvent.getDatum()); }}else if (scope == DataChangeScopeEnum.CLIENT) {
handleClientOff((ClientChangeEvent) event);
} else if(scope == DataChangeScopeEnum.SNAPSHOT) { handleSnapshot((DatumSnapshotEvent) event); }}}}); }Copy the code
For Publisher message types, the handleDatum function does different processing depending on whether changeType is COVER or MERGE.
In this step, ChangeData is also put into change_queue.put (ChangeData);
private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {
lock.lock();
try {
//get changed datumChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType); Datum cacheDatum = changeData.getDatum();if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
changeData.setDatum(targetDatum);
} else {
Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
for (Publisher pub : targetPubMap.values()) {
String registerId = pub.getRegisterId();
Publisher cachePub = cachePubMap.get(registerId);
if(cachePub ! =null) {
// if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
// that pub is not the newest data, should be ignored
if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
continue;
}
// if pub and cachePub both are publisher, and sourceAddress of both are equal,
// and version of cachePub is greater than version of pub, should be ignored
if(! (pubinstanceofUnPublisher) && ! (cachePubinstanceof UnPublisher)
&& pub.getSourceAddress().equals(cachePub.getSourceAddress())
&& cachePub.getVersion() > pub.getVersion()) {
continue; } } cachePubMap.put(registerId, pub); cacheDatum.setVersion(targetDatum.getVersion()); }}}finally{ lock.unlock(); }}Copy the code
The specific logic is as follows:
+--------------------+
| PublishDataHandler |
+--------+-----------+
|
|
| publisher
|
v
+---------+------------+
|DataChangeEventCenter |
+---------+------------+
|
|
| ChangeData
v
+---------+------------+
| DataChangeEventQueue |
+---------+------------+
Copy the code
3.2 Consuming messages & sending notifications
The DataChangeHandler will notify each DataChangeEventQueue for consumption.
public class DataChangeHandler {
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DatumCache datumCache;
@Resource
private List<IDataChangeNotifier> dataChangeNotifiers;
@PostConstruct
public void start(a) {
DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
int queueCount = queues.length;
Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
Executor notifyExecutor = ExecutorFactory
.newFixedThreadPool(dataServerConfig.getQueueCount() * 5.this.getClass().getSimpleName());
for (int idx = 0; idx < queueCount; idx++) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
executor.execute(() -> {
while (true) {
try {
final ChangeData changeData = dataChangeEventQueue.take();
notifyExecutor.execute(newChangeNotifier(changeData, name)); }}}); }}}Copy the code
3.2.1 DataChangeHandler
DataChangeHandler periodically extracts messages from the DataChangeEventCenter and processes them.
3.2.2 the class definition
public class DataChangeHandler {
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DatumCache datumCache;
@Resource
private List<IDataChangeNotifier> dataChangeNotifiers;
}
Copy the code
3.2.3 Execution Engine
Here is a two-tier thread model.
-
executor = ExecutorFactory.newFixedThreadPool(queueCount)
-
notifyExecutor= ExecutorFactory.newFixedThreadPool(dataServerConfig.getQueueCount() * 5)
You can think of executors as control threads and NotifierExecutors as worker threads, which are five times as many as control threads.
- The DataChangeHandler will iterate through all datachAngeEventQueues in the DataChangeEventCenter,
- One thread of control for each dataChangeEventQueue call executor,
- In this thread of control, ChangeData can be retrieved from the DataChangeEventQueue, and for each ChangeData, a worker thread of notifyExecutor is called, generating a ChangeNotifier for processing.
@PostConstruct
public void start(a) {
DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
int queueCount = queues.length;
Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
Executor notifyExecutor = ExecutorFactory
.newFixedThreadPool(dataServerConfig.getQueueCount() * 5.this.getClass().getSimpleName());
for (int idx = 0; idx < queueCount; idx++) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
executor.execute(() -> {
while (true) {
final ChangeData changeData = dataChangeEventQueue.take();
notifyExecutor.execute(newChangeNotifier(changeData, name)); }}); }}Copy the code
3.2.4 Service Execution
For ChangeData, a ChangeNotifier is generated for processing. The event change information is published through the ChangeNotifier to inform other nodes to synchronize data.
In ChangeNotifier, the type of changeData is determined and processed differently.
- If it is SnapshotData, then:
- Generate SnapshotData;
- Call datumCache.putSnapshot for storage;
- Call notify to notify;
- If it is any other type, then:
- For pub or unPub merge, datum.updateversion () is required;
- If PUB_TEMP, notifyTempPub(datum, sourceType, changeType);
- If the version is updated, notify(datum, sourceType, lastVersion);
Details are as follows:
private class ChangeNotifier implements Runnable {
private ChangeData changeData;
private String name;
@Override
public void run(a) {
if (changeData instanceof SnapshotData) {
......
} else {
Datum datum = changeData.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
DataSourceTypeEnum sourceType = changeData.getSourceType();
DataChangeTypeEnum changeType = changeData.getChangeType();
if(changeType == DataChangeTypeEnum.MERGE && sourceType ! = DataSourceTypeEnum.BACKUP && sourceType ! = DataSourceTypeEnum.SYNC) {//update version for pub or unPub merge to cache
//if the version product before merge to cache,it may be cause small version override big one
datum.updateVersion();
}
long version = datum.getVersion();
try {
if (sourceType == DataSourceTypeEnum.CLEAN) {
if(datumCache.cleanDatum(dataCenter, dataInfoId)) { ...... }}else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
notifyTempPub(datum, sourceType, changeType);
} else {
MergeResult mergeResult = datumCache.putDatum(changeType, datum);
Long lastVersion = mergeResult.getLastVersion();
if(lastVersion ! =null
&& lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
return;
}
//lastVersion null means first add datum
if (lastVersion == null|| version ! = lastVersion) {if (mergeResult.isChangeFlag()) {
notify(datum, sourceType, lastVersion);
}
}
}
}
}
}
}
Copy the code
The specific logic is as follows:
+--------------------+
| PublishDataHandler |
+--------+-----------+
|
|
| publisher
|
v
+---------+------------+
|DataChangeEventCenter |
+---------+------------+
|
|
| ChangeData
v
+---------+------------+
| DataChangeEventQueue |
+---------+------------+
|
|
| ChangeData
v
+-------+----------+
| DataChangeHandler|
+-------+----------+
|
|
| ChangeData
v
+------+--------+ +------------+
| ChangeNotifier| +--------> | datumCache |
+------+--------+ +------------+
Copy the code
3.2.5 notice
The notify function iterates through the dataChangeNotifiers to find notifiers that support the Datum’s corresponding SourceType.
Exactly how and which functions are supported is set up by getSuitableSource.
private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
for (IDataChangeNotifier notifier : dataChangeNotifiers) {
if(notifier.getSuitableSource().contains(sourceType)) { notifier.notify(datum, lastVersion); }}}Copy the code
The corresponding Bean is:
@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers(a) {
List<IDataChangeNotifier> list = new ArrayList<>();
list.add(sessionServerNotifier());
list.add(tempPublisherNotifier());
list.add(backUpNotifier());
return list;
}
Copy the code
3.2.6 BackUpNotifier synchronization
Is called syncDataService. AppendOperator notify, in fact, the Datum into Operator, endures AbstractAcceptorStore.
public class BackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public Set<DataSourceTypeEnum> getSuitableSource(a) {
Set<DataSourceTypeEnum> set = new HashSet<>();
set.add(DataSourceTypeEnum.PUB);
return set;
}
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(newOperator(datum.getVersion(), lastVersion, datum, DataSourceTypeEnum.BACKUP)); }}Copy the code
3.2.7 SessionServerNotifier Notifying data changes
SessionServerNotifier is much more complicated.
public class SessionServerNotifier implements IDataChangeNotifier {
private AsyncHashedWheelTimer asyncHashedWheelTimer;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private Exchange boltExchange;
@Autowired
private SessionServerConnectionFactory sessionServerConnectionFactory;
@Autowired
private DatumCache datumCache;
@Override
public Set<DataSourceTypeEnum> getSuitableSource(a) {
Set<DataSourceTypeEnum> set = new HashSet<>();
set.add(DataSourceTypeEnum.PUB);
set.add(DataSourceTypeEnum.SYNC);
set.add(DataSourceTypeEnum.SNAPSHOT);
returnset; }}Copy the code
3.2.7.1 time round
Set up a 500 millisecond time wheel.
@PostConstruct
public void init(a) {
ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
threadFactoryBuilder.setDaemon(true);
asyncHashedWheelTimer = new AsyncHashedWheelTimer(threadFactoryBuilder.setNameFormat(
"Registry-SessionServerNotifier-WheelTimer").build(), 500, TimeUnit.MILLISECONDS, 1024,
dataServerConfig.getSessionServerNotifierRetryExecutorThreadSize(),
dataServerConfig.getSessionServerNotifierRetryExecutorQueueSize(), threadFactoryBuilder
.setNameFormat("Registry-SessionServerNotifier-WheelExecutor-%d").build(),
new TaskFailedCallback() {
@Override
public void executionRejected(Throwable e) {
LOGGER.error("executionRejected: " + e.getMessage(), e);
}
@Override
public void executionFailed(Throwable e) {
LOGGER.error("executionFailed: "+ e.getMessage(), e); }}); }Copy the code
From the business perspective, when there is news about Publisher,
The notify function of the DataChangeHandler iterates through the dataChangeNotifiers to find notifiers that support the Datum’s corresponding SourceType.
private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
for (IDataChangeNotifier notifier : dataChangeNotifiers) {
if(notifier.getSuitableSource().contains(sourceType)) { notifier.notify(datum, lastVersion); }}}Copy the code
The notify function in SessionServerNotifier iterates through all connections currently cached and notifies each one.
@Override
public void notify(Datum datum, Long lastVersion) {
DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(),
datum.getDataCenter(), datum.getVersion());
List<Connection> connections = sessionServerConnectionFactory.getSessionConnections();
for (Connection connection : connections) {
doNotify(newNotifyCallback(connection, request)); }}Copy the code
Specific notification function:
private void doNotify(NotifyCallback notifyCallback) {
Connection connection = notifyCallback.connection;
DataChangeRequest request = notifyCallback.request;
try {
//check connection active
if(! connection.isFine()) {return;
}
Server sessionServer = boltExchange.getServer(dataServerConfig.getPort());
sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()),
request, notifyCallback, dataServerConfig.getRpcTimeout());
} catch(Exception e) { onFailed(notifyCallback); }}Copy the code
The time wheel is used in retries of failed calls.
When the maximum number of failed retries is not reached, periodic retries are performed.
private void onFailed(NotifyCallback notifyCallback) {
DataChangeRequest request = notifyCallback.request;
Connection connection = notifyCallback.connection;
notifyCallback.retryTimes++;
//check version, if it's fall behind, stop retry
long _currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
if(request.getVersion() ! = _currentVersion) {return;
}
if (notifyCallback.retryTimes <= dataServerConfig.getNotifySessionRetryTimes()) {
this.asyncHashedWheelTimer.newTimeout(timeout -> {
//check version, if it's fall behind, stop retry
long currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
if(request.getVersion() == currentVersion) { doNotify(notifyCallback); } }, getDelayTimeForRetry(notifyCallback.retryTimes), TimeUnit.MILLISECONDS); }}Copy the code
The specific logic is as follows:
+--------------------+
| PublishDataHandler |
+--------+-----------+
|
|
| publisher
|
v
+---------+------------+
|DataChangeEventCenter |
+---------+------------+
|
|
| ChangeData
v
+---------+------------+
| DataChangeEventQueue |
+---------+------------+
|
|
| ChangeData
v
+-------+----------+
| DataChangeHandler|
+-------+----------+
|
|
| ChangeData
v
+------+--------+ +------------+
| ChangeNotifier| +--------> | datumCache |
+------+--------+ +------------+
|
|
v
+---+------+
| notifier |
+---+------+
|
v
+-----------+---------------+
| |
v v
+----+----------------+ +------+----------+
|SessionServerNotifier| | BackUpNotifier |
+----+----------------+ +------+----------+
| |
| |
| |
| v
+--v------------+ +------+----------------+
| sessionServer | | AbstractAcceptorStore |
+---------------+ +-----------------------+
Copy the code
0 x04 summary
This article is a point in the registration of “event change notification ChangeNotifie” for detailed expansion, with SessionServerNotifier and BackUpNotifier as an example, to explain the principle and use of ChangeNotifier. The functions including dataChangeEventCenter are also sorted out, hoping to be helpful to everyone.
At DataServer, data changes in two directions:
-
Data server node changes;
-
The change of data, namely the change of Publisher and Scriber;
ChangeNotifier is responsible for notifying related modules of Publisher and Scriber changes. Change notifications are a form of decoupling.
0 XFF reference
SOFARegistry of Ant Financial launches
Ant Financial SOFARegistry provides service registration and operation logs