0x00 Abstract

SOFARegistry is Ant Financial’s open-source, production-grade, time-sensitive, and highly available service registry.

This series of articles focuses on design and architecture. Across several articles, it summarizes the implementation mechanisms and architectural ideas of DataServer, and of SOFARegistry more broadly, from multiple perspectives, so that you can learn how Alibaba designs such systems.

This article, the fifth in the series, introduces the asynchronous processing in SOFARegistry's message bus.

0x01 Why Separation

Earlier we discussed SOFARegistry’s message bus, EventCenter. In this article we look at a variant of it, DataChangeEventCenter.

DataChangeEventCenter is split out on its own and handles only messages related to data changes.

Why separation? Because:

  • Architecturally, DataChangeEventCenter is dedicated to data change messages, which decouples them from other messages;
  • Technically, DataChangeEventCenter is implemented differently from EventCenter, so it needs to be handled separately;
  • The deeper reason, however, is that the business scenarios differ. As the following analysis shows, DataChangeEventCenter is tightly coupled with the business.

0x02 Business Domain

2.1 Application Scenarios

DataChangeEventCenter’s unique business scenarios are as follows:

  • Merging: when many notifications arrive within a short period, they need not be processed one by one; only the last one needs to be processed;
  • Asynchronous message processing;
  • Guaranteed message order;
  • Delayed operation;
  • Higher processing capacity through parallel processing;

As a result, the DataChangeEventCenter code is so tightly tied to the business that the EventCenter mentioned above is no longer appropriate.

2.2 Delay and merge

We’ll talk about delay and merge separately.

2.2.1 Business characteristics

One characteristic of Ant Financial’s business is that service outages can be detected within seconds, thanks to a feature called connection sensitivity.

This is why SOFARegistry designs its health check to “bind service data to the service publisher’s physical connection and clear the data immediately after disconnection”; this is called connection sensitivity for short. Connection sensitivity means that in SOFARegistry every client maintains a long connection to a SessionServer, and each long connection carries SOFABolt heartbeats. If the long connection breaks, the client immediately initiates a new one, so a reliable connection between client and SessionServer is maintained at all times.

2.2.2 Problem

However, network problems can cause a large number of reconnections in a short period. For example, if a connection breaks because of a network glitch while the service process itself is still running, the client immediately reconnects to the SessionServer and registers all of its service data again.

If the interruption is short enough (for example, disconnection and reconnection within 500 ms), service subscribers should not perceive the service as going offline. SOFARegistry should handle this internally.

2.2.3 Solution

SOFARegistry performs merge and delay operations internally so that users are not affected. For example, inside DataServer, changed Publisher information is merged after a delay through mergeDatum, and the Version after the merge is the latest version number.

In DataChangeEventCenter, this is achieved through message delaying and merging.
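The merge half of this idea can be illustrated with a small last-write-wins buffer. This is a hypothetical sketch, not SOFARegistry code: the class ChangeCoalescer and its methods are invented names, and the delay half (deciding when to drain the buffer) is left out. It shows how a quick OFFLINE/ONLINE flap for one service collapses into a single final state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical illustration of "merge": while changes are buffered, a later
 * change for the same key replaces the earlier one.
 */
class ChangeCoalescer {
    // LinkedHashMap keeps a key at its first-insertion position even when its value is replaced.
    private final Map<String, String> pending = new LinkedHashMap<>();

    /** Record a change; an earlier pending change for the same key is replaced. */
    synchronized void record(String key, String state) {
        pending.put(key, state);
    }

    /** Hand back everything buffered so far and start a new, empty window. */
    synchronized Map<String, String> drain() {
        Map<String, String> snapshot = new LinkedHashMap<>(pending);
        pending.clear();
        return snapshot;
    }
}
```

A subscriber that only sees drained results never observes the intermediate OFFLINE state, which is the behavior the 500 ms example above asks for.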

2.3 Ant Financial's implementation

The following is a description of the overall functionality of DataChangeEventCenter:

  • When a publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered, respectively.
  • The handler first checks the state of the current node:
    • If the node is not working, the request fails.
    • If the node is working, the handler adds a data change event to dataChangeEventCenter by calling its onChange method, which asynchronously notifies the event change center of the data change;
  • After receiving the event, the event change center adds it to a queue, and dataChangeEventCenter asynchronously processes the online and offline data according to the event type.
  • At the same time, DataChangeHandler publishes the change information through ChangeNotifier to inform other nodes to synchronize data.
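The handler's gatekeeping step can be sketched as follows. This is a hypothetical illustration, not the actual publishDataProcessor code: NodeState, PublishHandlerSketch, and the Consumer standing in for dataChangeEventCenter.onChange are all invented names, assumed only to show the "reject unless working, otherwise hand off" control flow.

```java
import java.util.function.Consumer;

/** Hypothetical node states; SOFARegistry's real status type is not shown in this article. */
enum NodeState { INITIALIZING, WORKING }

/** Sketch of the handler flow: fail the request unless the node is working. */
class PublishHandlerSketch {
    private final Consumer<String> eventCenter; // stands in for dataChangeEventCenter.onChange
    private volatile NodeState state = NodeState.INITIALIZING;

    PublishHandlerSketch(Consumer<String> eventCenter) {
        this.eventCenter = eventCenter;
    }

    void setState(NodeState state) {
        this.state = state;
    }

    /** Returns false (request failed) when the node is not in the working state. */
    boolean handle(String dataInfoId) {
        if (state != NodeState.WORKING) {
            return false;
        }
        // In the real flow, onChange only enqueues the event; processing happens asynchronously.
        eventCenter.accept(dataInfoId);
        return true;
    }
}
```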

0x03 DataChangeEventCenter

3.1 Overview

DataChangeEventCenter is divided into four parts:

  • Event Center: the message center that organizes the queues.
  • Event Queue: multiple queues enable multi-channel processing, increasing processing capacity.
  • Event Task: each Queue starts its own thread for asynchronous processing, further increasing processing capacity.
  • Event Handler: processes the resulting ChangeData.

Next, we introduce them one by one. Because DataChangeEventCenter is closely tied to the business, each part is explained in terms of the business scenario.

3.2 DataChangeEventCenter

3.2.1 Definition

DataChangeEventCenter maintains an array of DataChangeEventQueue, which is its core; each element of the array is an event queue. The definition is as follows:

public class DataChangeEventCenter {

    /** * count of DataChangeEventQueue */
    private int                    queueCount;

    /** * queues of DataChangeEvent */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DataServerConfig       dataServerConfig;

    @Autowired
    private DatumCache             datumCache;
}

3.2.2 Message Types

DataChangeEventCenter handles only messages of type IDataChangeEvent, which has three implementations:

  • public class ClientChangeEvent implements IDataChangeEvent
  • public class DataChangeEvent implements IDataChangeEvent
  • public class DatumSnapshotEvent implements IDataChangeEvent

Messages of all three types share the same set of queues. Which queue a message goes to depends on the message: a DataChangeEvent is routed by hashing the Publisher’s DataInfoId, while ClientChangeEvent and DatumSnapshotEvent are sent to every queue (see the onChange overloads in 3.2.5).

That is, when the onChange method is triggered, the hash of the changed service's dataInfoId is computed to determine the index of the queue that holds this service's registration data; the changed data is then wrapped in a data change object and passed to that queue.
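The routing rule can be sketched in Java. The article does not show the body of hash(), so this is an assumption: the index is taken from String.hashCode() with Math.floorMod, which keeps it in [0, queueCount) even for negative hash codes. QueueRouter is a hypothetical name.

```java
/**
 * Sketch of queue routing. Assumption: hash() reduces dataInfoId.hashCode()
 * modulo the queue count; the real DataChangeEventCenter may differ in detail.
 */
class QueueRouter {
    private final int queueCount;

    QueueRouter(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    /** The same dataInfoId always maps to the same queue, so its events stay ordered. */
    int indexFor(String dataInfoId) {
        return Math.floorMod(dataInfoId.hashCode(), queueCount);
    }

    /** ClientChangeEvent and DatumSnapshotEvent are delivered to every queue instead. */
    int[] broadcastTargets() {
        int[] all = new int[queueCount];
        for (int i = 0; i < queueCount; i++) {
            all[i] = i;
        }
        return all;
    }
}
```

Routing by key rather than round-robin is what lets parallel queues coexist with per-service ordering: two changes to one service can never land in different queues.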

3.2.3 Initialization

During initialization, the event queues are created, and each queue starts a thread to process its messages.

@PostConstruct
public void init() {
    // isInited: an AtomicBoolean guard, not shown in the field list above
    if (isInited.compareAndSet(false, true)) {
        queueCount = dataServerConfig.getQueueCount();
        dataChangeEventQueues = new DataChangeEventQueue[queueCount];
        for (int idx = 0; idx < queueCount; idx++) {
            dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this, datumCache);
            // each queue starts its own consumer thread
            dataChangeEventQueues[idx].start();
        }
    }
}

3.2.4 Putting messages

Putting a message is relatively simple. The target queue is chosen by the calling method; for example, the Publisher’s DataInfoId is hashed to pick the queue:

int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));

3.2.5 How messages are processed

Dispatching is done by the onChange overloads of DataChangeEventCenter, one per message type. Each finds the target queue (or all queues) and calls its onChange:

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

public void onChange(ClientChangeEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void onChange(DatumSnapshotEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}

3.3 DataChangeEvent

Since DataChangeEvent is the most commonly used event type, we look at it separately.

DataChangeEvent is distinguished by DataChangeTypeEnum and DataSourceTypeEnum, that is, the type of processing and the source of the message.

DataChangeTypeEnum has two values:

  • MERGE: the new Datum is merged into the cached one and the version number is updated;
  • COVER: the cached data is overwritten;

DataSourceTypeEnum has the following values:

  • PUB: published by a client;
  • PUB_TEMP: temporary data published by a client;
  • SYNC: synced from DataServers in other data centers;
  • BACKUP: backed up from DataServers in the same data center;
  • CLEAN: from the local dataInfo check; data that does not belong to this node is scheduled for removal;
  • SNAPSHOT: snapshot data, used when renewal finds the data inconsistent;
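The two enums can be re-declared as plain Java enums for illustration; the real ones may carry extra fields. The helper VersionRule is a hypothetical name. Its condition is copied from ChangeNotifier.run in 3.5.3, where a Datum's version is bumped only for MERGE changes that do not come from BACKUP or SYNC.

```java
/** Simplified re-declaration of the change types listed above. */
enum DataChangeTypeEnum {
    MERGE, // merge into the cached Datum and update its version
    COVER  // overwrite the cached Datum
}

/** Simplified re-declaration of the change sources listed above. */
enum DataSourceTypeEnum {
    PUB, PUB_TEMP, SYNC, BACKUP, CLEAN, SNAPSHOT
}

final class VersionRule {
    private VersionRule() {
    }

    /** Same condition as in ChangeNotifier.run: MERGE, and not from BACKUP or SYNC. */
    static boolean shouldUpdateVersion(DataChangeTypeEnum change, DataSourceTypeEnum source) {
        return change == DataChangeTypeEnum.MERGE
                && source != DataSourceTypeEnum.BACKUP
                && source != DataSourceTypeEnum.SYNC;
    }
}
```

Data arriving via BACKUP or SYNC already carries a version assigned elsewhere, which is presumably why it is left untouched.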

Specific definitions are as follows:

public class DataChangeEvent implements IDataChangeEvent {

    /** * type of changed data, MERGE or COVER */
    private DataChangeTypeEnum changeType;

    private DataSourceTypeEnum sourceType;

    /** * data changed */
    private Datum              datum;
}

3.4 DataChangeEventQueue

DataChangeEventQueue is the core of this submodule. Having several queues enables multi-channel processing, and each queue starts its own thread for asynchronous processing; both increase processing capacity.

3.4.1 Core variables

The core fields are:

  • BlockingQueue<IDataChangeEvent> eventQueue;

  • Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

  • DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue();

The explanation is as follows:

  • The stored element type is ChangeData. Converting Datum to ChangeData lets messages of different change types and sources be handled uniformly;
  • eventQueue stores incoming messages. Every event goes through this blocking queue and is consumed by a single thread, so messages within a queue are processed in order;
  • CHANGE_DATA_MAP_FOR_MERGE, as the name suggests, is mainly used for merging messages. It uses putIfAbsent to add a key-value pair: if the key is absent, the pair is added and null is returned; if a value already exists, it is left unchanged. So when several messages for the same key arrive within a short period, the later ones are merged into the existing entry.
  • CHANGE_QUEUE holds all ChangeData awaiting processing, regardless of data center. Note that it is a DelayQueue, which provides the delayed operation required by the business, as discussed earlier;
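The putIfAbsent-based merge over a two-level map (dataCenter, then dataInfoId) can be sketched as below. MergeMapSketch and PendingChange are hypothetical stand-ins for DataChangeEventQueue and ChangeData, and the synchronized keyword plays the role of the ReentrantLock held by handleDatum.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the two-level merge map: dataCenter -> dataInfoId -> pending change. */
class MergeMapSketch {
    static final class PendingChange {
        final String dataInfoId;
        int mergedCount = 1; // how many incoming changes this entry has absorbed

        PendingChange(String dataInfoId) {
            this.dataInfoId = dataInfoId;
        }
    }

    private final Map<String, Map<String, PendingChange>> changeDataMapForMerge = new ConcurrentHashMap<>();

    /**
     * Returns the pending change for the key, creating it on first use.
     * putIfAbsent returns null when our value was inserted, otherwise the
     * value already present, which it leaves untouched.
     */
    synchronized PendingChange getOrCreate(String dataCenter, String dataInfoId) {
        Map<String, PendingChange> byId =
                changeDataMapForMerge.computeIfAbsent(dataCenter, dc -> new ConcurrentHashMap<>());
        PendingChange fresh = new PendingChange(dataInfoId);
        PendingChange existing = byId.putIfAbsent(dataInfoId, fresh);
        if (existing == null) {
            return fresh; // first change for this key
        }
        existing.mergedCount++; // a later change merges into the existing entry
        return existing;
    }
}
```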

Specific definitions are as follows:

public class DataChangeEventQueue {

    private final String                               name;

    /** * a block queue that stores all data change events */
    private final BlockingQueue<IDataChangeEvent>      eventQueue;

    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueue<ChangeData>               CHANGE_QUEUE              = new DelayQueue();

    private final int                                  notifyIntervalMs;

    private final int                                  notifyTempDataIntervalMs;

    private final ReentrantLock                        lock                      = new ReentrantLock();

    private final int                                  queueIdx;

    private DataServerConfig                           dataServerConfig;

    private DataChangeEventCenter                      dataChangeEventCenter;

    private DatumCache                                 datumCache;
}

3.4.2 Startup and engine

DataChangeEventQueue#start is called when DataChangeEventCenter is initialized. It starts a new thread that continuously takes new events from the queue and dispatches them; new data is then added to this node and sharded. Because eventQueue is a BlockingQueue, a while (true) loop is enough: take() blocks until an event arrives.

Each event taken from the queue is handled according to its DataChangeScopeEnum:

  • If the scope is DataChangeScopeEnum.DATUM, check dataChangeEvent.getSourceType():
    • If it is DataSourceTypeEnum.PUB_TEMP, addTempChangeData adds the ChangeData to CHANGE_QUEUE;
    • Otherwise, call handleDatum;
  • If the scope is DataChangeScopeEnum.CLIENT, call handleClientOff((ClientChangeEvent) event);
  • If the scope is DataChangeScopeEnum.SNAPSHOT, call handleSnapshot((DatumSnapshotEvent) event);

The specific code is as follows:

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    //Temporary push data will be notify as soon as,and not merge to normal pub data;
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } catch (Throwable e) {
                // error handling omitted in this excerpt; the loop keeps consuming events
            }
        }
    });
}
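The ordering guarantee comes from having exactly one consumer thread per blocking queue. The sketch below isolates that pattern with plain JDK classes. OrderedConsumer is a hypothetical name, and the poison-pill shutdown is an addition for the demo; the original loop never exits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch of the start() loop: one thread takes events from one queue in FIFO order. */
class OrderedConsumer {
    private static final String STOP = "__STOP__"; // hypothetical poison pill for the demo

    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void start() {
        executor.execute(() -> {
            while (true) {
                try {
                    String event = eventQueue.take(); // blocks until an event arrives
                    if (STOP.equals(event)) {
                        return;
                    }
                    handled.add(event); // stands in for handleDatum / handleClientOff / handleSnapshot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    void onChange(String event) {
        eventQueue.add(event);
    }

    /** Stops the consumer after all earlier events and returns them in handling order. */
    List<String> stopAndGet() {
        eventQueue.add(STOP);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }
}
```

A second consumer thread on the same queue would break this guarantee, which is why throughput is scaled by adding queues rather than threads per queue.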

The details are as follows:

+----------------------------+
|   DataChangeEventCenter    |
|  +----------------------+  |
|  |DataChangeEventQueue[]|  |
|  +----------------------+  |
+-------------+--------------+
              |
              v
+---------------------------------------------------------+
|                  DataChangeEventQueue                   |
|                                                         |
|  BlockingQueue<IDataChangeEvent> ----take----+          |
|                                              v          |
|  Map<String, Map<String, ChangeData>> <--> Executor     |
|                                            (start)      |
|                                              |          |
|  DelayQueue<ChangeData> <--------------------+          |
+---------------------------------------------------------+

3.4.3 ChangeData

handleDatum converts Datum to ChangeData for processing.

Why convert to ChangeData for storage?

Because messages differ in processing type and source. For example, notifyFetchDatumHandler.fetchDatum fetches a Datum from other DataServers and then puts a message built from that Datum into dataChangeEventCenter, telling this DataServer to perform a BACKUP; the change type is COVER.

Converting to ChangeData lets messages of different types and sources be processed uniformly.

The caller puts in a message containing the datum:

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);

The DataChangeEventQueue gets the Datum from the DataChangeEvent, converts the Datum to ChangeData, and stores it.

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {
    //get changed datum
    ChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType);
    Datum cacheDatum = changeData.getDatum();
    if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
        changeData.setDatum(targetDatum);
    }
    // ... (the MERGE branch is covered in 3.4.4.1)
}

ChangeData is defined as follows:

public class ChangeData implements Delayed {

    /** data changed */
    private Datum              datum;

    /** change time */
    private Long               gmtCreate;

    /** timeout */
    private long               timeout;

    private DataSourceTypeEnum sourceType;

    private DataChangeTypeEnum changeType;
}
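ChangeData implements java.util.concurrent.Delayed, which is what lets CHANGE_QUEUE hold it back until its delay has passed. The article shows only the fields, so the method bodies below are a sketch under the assumption that an item becomes available `timeout` milliseconds after `gmtCreate`. DelayedChange is a hypothetical, simplified stand-in.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Simplified stand-in for ChangeData: available timeoutMs after creation (assumed semantics). */
class DelayedChange implements Delayed {
    final String dataInfoId;
    private final long gmtCreate = System.currentTimeMillis();
    private final long timeout; // milliseconds

    DelayedChange(String dataInfoId, long timeoutMs) {
        this.dataInfoId = dataInfoId;
        this.timeout = timeoutMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMs = gmtCreate + timeout - System.currentTimeMillis();
        return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // DelayQueue keeps the element with the smallest remaining delay at its head
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    /** Demo helper: returns ids in the order the DelayQueue releases them. */
    static String[] drainInOrder(DelayQueue<DelayedChange> queue, int count) {
        String[] ids = new String[count];
        try {
            for (int i = 0; i < count; i++) {
                ids[i] = queue.take().dataInfoId; // blocks until the head's delay expires
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ids;
    }
}
```

An item inserted later but with a shorter delay comes out first, and nothing comes out before its delay has elapsed.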

3.4.4 Processing Datum

3.4.4.1 Adding a Datum

This step handles the cached ChangeData together with the newly arrived Datum.

  • handleDatum first looks up the previously stored ChangeData in CHANGE_DATA_MAP_FOR_MERGE; if there is none, a new one is created and stored, ready for later merges.
  • Once the ChangeData is obtained:
    • If the change type is COVER (or nothing is cached yet), the cache is overwritten: changeData.setDatum(targetDatum);
    • Otherwise (MERGE), the publishers of the new Datum are merged into the cached Datum and the version number is updated.

Details are as follows:

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}
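The two skip rules in the MERGE branch can be exercised in isolation. Pub and PubMerge are hypothetical, simplified types; the boolean `unPublisher` stands in for `instanceof UnPublisher`, and the rules themselves are copied from handleDatum above.

```java
import java.util.Map;

/** Minimal publisher record; fields mirror the getters used in handleDatum. */
class Pub {
    final String registerId;
    final long registerTimestamp;
    final long version;
    final String sourceAddress;
    final boolean unPublisher; // plays the role of "instanceof UnPublisher"

    Pub(String registerId, long registerTimestamp, long version, String sourceAddress, boolean unPublisher) {
        this.registerId = registerId;
        this.registerTimestamp = registerTimestamp;
        this.version = version;
        this.sourceAddress = sourceAddress;
        this.unPublisher = unPublisher;
    }
}

final class PubMerge {
    private PubMerge() {
    }

    /** Applies the same skip rules as the MERGE branch of handleDatum. */
    static void mergeInto(Map<String, Pub> cachePubMap, Iterable<Pub> incoming) {
        for (Pub pub : incoming) {
            Pub cachePub = cachePubMap.get(pub.registerId);
            if (cachePub != null) {
                // an older registration must not replace a newer one
                if (pub.registerTimestamp < cachePub.registerTimestamp) {
                    continue;
                }
                // same publisher from the same address with a lower version is stale
                if (!pub.unPublisher && !cachePub.unPublisher
                        && pub.sourceAddress.equals(cachePub.sourceAddress)
                        && cachePub.version > pub.version) {
                    continue;
                }
            }
            cachePubMap.put(pub.registerId, pub);
        }
    }
}
```

Note that an UnPublisher is exempt from the version check, so an offline notice with an equal or newer timestamp always wins.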
3.4.4.2 Extracting a Datum

On the consuming side, take removes a ChangeData from CHANGE_QUEUE (blocking until its delay expires) and then, under the lock, removes the corresponding entry from CHANGE_DATA_MAP_FOR_MERGE.

public ChangeData take() throws InterruptedException {
    ChangeData changeData = CHANGE_QUEUE.take();
    lock.lock();
    try {
        removeMapForMerge(changeData);
        return changeData;
    } finally {
        lock.unlock();
    }
}
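Putting the merge map and the DelayQueue together gives the full delay-and-merge cycle. This is a simplified sketch with hypothetical names (MergingDelayQueue, Entry), not the real DataChangeEventQueue: the first change for a key creates an entry and puts it into the DelayQueue, later changes before it expires only update that entry, and take() drops the entry from the merge map so the next change opens a new window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of the delay-and-merge cycle; names are hypothetical. */
class MergingDelayQueue {
    static final class Entry implements Delayed {
        final String key;
        volatile String latest;
        private final long deadlineNanos;

        Entry(String key, String value, long delayMs) {
            this.key = key;
            this.latest = value;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final Map<String, Entry> mapForMerge = new HashMap<>(); // guarded by lock
    private final DelayQueue<Entry> changeQueue = new DelayQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final long delayMs;

    MergingDelayQueue(long delayMs) {
        this.delayMs = delayMs;
    }

    void onChange(String key, String value) {
        lock.lock();
        try {
            Entry entry = mapForMerge.get(key);
            if (entry == null) {
                entry = new Entry(key, value, delayMs);
                mapForMerge.put(key, entry);
                changeQueue.put(entry); // only the first change in a window is queued
            } else {
                entry.latest = value; // merge: keep only the newest value
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until an entry's delay expires, then removes it from the merge map. */
    Entry take() throws InterruptedException {
        Entry entry = changeQueue.take();
        lock.lock();
        try {
            mapForMerge.remove(entry.key);
            return entry;
        } finally {
            lock.unlock();
        }
    }

    int pendingCount() {
        return changeQueue.size();
    }
}
```

Two changes within the delay window produce a single queued entry carrying the newer value.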

take is called by DataChangeHandler, which does the actual processing of the extracted data.

3.5 DataChangeHandler

DataChangeHandler continuously takes messages from the queues of DataChangeEventCenter and processes them. Its main job is to run ChangeNotifiers that tell the relevant modules: hi, new data changes have arrived.

3.5.1 Class definition

public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;
}

3.5.2 ChangeNotifier Execution Engine

DataChangeHandler iterates over all DataChangeEventQueues in DataChangeEventCenter and takes ChangeData from each of them; for every ChangeData, a ChangeNotifier is created.

Each ChangeNotifier is a Runnable task executed on the notify thread pool.

The notify thread pool has dataServerConfig.getQueueCount() * 5 threads, that is, five threads per DataChangeEventQueue.

@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());

    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                try {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
                } catch (Throwable e) {
                    // error handling omitted in this excerpt; the loop keeps taking
                }
            }
        });
    }
}
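The threading layout of DataChangeHandler.start(), one taker thread per queue feeding a shared notify pool of queueCount * 5 threads, can be reproduced with plain JDK executors. HandlerSketch is a hypothetical name, and LinkedBlockingQueues stand in for DataChangeEventQueue; ExecutorFactory from the article is replaced by java.util.concurrent.Executors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: one taker per queue, each change handed to a shared notify pool. */
class HandlerSketch {
    static final int THREADS_PER_QUEUE = 5; // mirrors dataServerConfig.getQueueCount() * 5

    final Queue<String> notified = new ConcurrentLinkedQueue<>();
    private final ExecutorService takers;
    private final ExecutorService notifyPool;

    HandlerSketch(int queueCount) {
        this.takers = Executors.newFixedThreadPool(queueCount);
        this.notifyPool = Executors.newFixedThreadPool(queueCount * THREADS_PER_QUEUE);
    }

    void start(List<BlockingQueue<String>> queues, CountDownLatch done) {
        for (BlockingQueue<String> queue : queues) {
            takers.execute(() -> {
                while (true) {
                    try {
                        String change = queue.take(); // one taker thread per queue
                        // each change becomes its own task, like new ChangeNotifier(changeData, name)
                        notifyPool.execute(() -> {
                            notified.add(change);
                            done.countDown();
                        });
                    } catch (InterruptedException e) {
                        return; // shutdownNow() interrupts the takers
                    }
                }
            });
        }
    }

    void shutdown() {
        takers.shutdownNow();
        notifyPool.shutdown();
    }
}
```

One consequence of this design: ordering is preserved up to the take, but once changes are handed to the multi-threaded notify pool, their notifications may complete in any order.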

3.5.3 Notify

Let’s review the business:

When a publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered, which adds a data change event to dataChangeEventCenter to asynchronously notify it of the data change. After receiving the event, the event change center adds it to a queue, and dataChangeEventCenter asynchronously processes the online and offline data according to the event type.

Each ChangeData is handled by a ChangeNotifier, which publishes the change information to inform other nodes to synchronize data.

private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                //update version for pub or unPub merge to cache
                //if the version product before merge to cache,it may be cause small version override big one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                        ......
                    }
                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } catch (Exception e) {
                // error handling omitted in this excerpt
            }
        }
    }
}

The notify method iterates over dataChangeNotifiers and calls every notifier whose suitable sources include the change's source type:

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}

The corresponding Bean is:

@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}
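The source-based filter in notify can be demonstrated with simplified types. Notifier, RecordingNotifier, NotifyDispatch, and Source are hypothetical; the method is named notifyChange rather than notify to avoid confusion with Object.notify(). The sets of suitable sources in the usage below are invented for the demo and are not the real configuration of sessionServerNotifier, tempPublisherNotifier, or backUpNotifier.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

enum Source { PUB, PUB_TEMP, SYNC, BACKUP, CLEAN, SNAPSHOT }

/** Simplified version of IDataChangeNotifier for the sketch. */
interface Notifier {
    Set<Source> getSuitableSource();

    void notifyChange(String dataInfoId, Long lastVersion);
}

/** Records what it receives; used only for the demo. */
class RecordingNotifier implements Notifier {
    final List<String> received = new ArrayList<>();
    private final Set<Source> suitable;

    RecordingNotifier(Set<Source> suitable) {
        this.suitable = suitable;
    }

    @Override
    public Set<Source> getSuitableSource() {
        return suitable;
    }

    @Override
    public void notifyChange(String dataInfoId, Long lastVersion) {
        received.add(dataInfoId);
    }
}

final class NotifyDispatch {
    private NotifyDispatch() {
    }

    /** Same filter as notify(...): only notifiers registered for the source are called. */
    static void dispatch(List<? extends Notifier> notifiers, Source source, String dataInfoId, Long lastVersion) {
        for (Notifier n : notifiers) {
            if (n.getSuitableSource().contains(source)) {
                n.notifyChange(dataInfoId, lastVersion);
            }
        }
    }
}
```

Keeping the routing decision inside each notifier (via its suitable sources) means adding a new notifier only requires registering another bean in the list.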

How the notifiers handle these notifications will be covered in a later article.

So far, the overall logic of DataChangeEventCenter is shown in the following figure:

+----------------------------+
|   DataChangeEventCenter    |
|  +----------------------+  |
|  |DataChangeEventQueue[]|  |
|  +----------------------+  |
+-------------+--------------+
              |
              v
+---------------------------------------------------------+
|                  DataChangeEventQueue                   |
|                                                         |
|  BlockingQueue<IDataChangeEvent> ----take----+          |
|                                              v          |
|  Map<String, Map<String, ChangeData>> <--> Executor     |
|                                            (start)      |
|                                              |          |
|  DelayQueue<ChangeData> <--------------------+          |
+------+--------------------------------------------------+
       |
       | take
       v
+-------------------+  notify  +---------------------+
| DataChangeHandler +--------->| dataChangeNotifiers |
+-------------------+          +---------------------+


0x04 Summary

Because of its unique business scenario, Alibaba split DataChangeEventCenter out to meet the following requirements. If you have similar needs in practice, you can use it as a reference. The techniques used are:

  • Higher processing capacity through parallel processing:
    • An array of queues, each processing messages independently, increases processing capacity;
  • Asynchronous message processing:
    • Each Queue starts its own thread for asynchronous processing;
  • Guaranteed message order:
    • eventQueue stores incoming messages; every event goes through this blocking queue and is consumed by a single thread, so messages within a queue are processed in order;
  • Delayed operation:
    • DelayQueue is used for delayed operations;
  • Merging, that is, when many notifications arrive within a short period, they need not be processed one by one; only the last one needs to be processed:
    • putIfAbsent is used to add a key-value pair: if the key is absent, the pair is added and null is returned; if the key already exists, the existing value is kept. Multiple messages added within a short period are therefore merged into one entry.
