0 x00 the

SOFARegistry is ant Financial’s open source, production-grade, time-sensitive, and highly available service registry.

This series of articles focuses on analyzing design and architecture, that is, using multiple articles to summarize the implementation mechanism and architecture ideas of DataServer or SOFARegistry from multiple perspectives, so that you can learn how Ali designs.

This article, in its seventeenth installment, introduces the deferred operations of SOFARegistry.

0x01 Service Domain

1.1 Business Reasons

Why AfterWorkingProcess?

The purpose of AfterWorkingProcess is to delay action. The guess is that in some cases, the business cannot be executed and can only be made up at a later time.

A similar argument on the official blog supports our judgment:

Before data synchronization is complete, all data read operations on the new node are forwarded to the data node that owns the data shard.

Do not write data to the new node before data synchronization is complete to prevent data inconsistency during data synchronization.

1.2 Learning Direction

You can see how something like this business delay operation could be implemented.

0 x02 implementation

2.1 define

The interface is defined as follows:

public interface AfterWorkingProcess {
    void afterWorkingProcess(a);
    int getOrder(a);
}
Copy the code

2.2 configuration

This afterWorkProcessors as AfterWorkingProcessHandler member variables for processing. Used to process some business logic after the end of the processing action.

        @Bean(name = "afterWorkProcessors")
        public List<AfterWorkingProcess> afterWorkingProcessors(a) {
            List<AfterWorkingProcess> list = new ArrayList<>();
            list.add(renewDatumHandler());
            list.add(datumLeaseManager());
            list.add(disconnectEventHandler());
            list.add(notifyDataSyncHandler());
            return list;
        }

        @Bean
        public AfterWorkingProcessHandler afterWorkingProcessHandler(a) {
            return new AfterWorkingProcessHandler();
        }
Copy the code

2.3 the engine

It’s a little less common here. AfterWorkingProcessHandler also AfterWorkingProcess implementation class.

In its afterWorkingProcess function, the afterWorkingProcess business function is called one by one on the implementation classes registered in the middle of Bean afterWorkingProcessors.

GetOrder specifies the execution priority, which is a common routine.

public class AfterWorkingProcessHandler implements AfterWorkingProcess {

    @Resource(name = "afterWorkProcessors")
    private List<AfterWorkingProcess> afterWorkingProcessors;

    @Override
    public void afterWorkingProcess(a) {

        if(afterWorkingProcessors ! =null){
            List<AfterWorkingProcess> list = afterWorkingProcessors.stream().sorted(Comparator.comparing(AfterWorkingProcess::getOrder)).collect(Collectors.toList());

            list.forEach(AfterWorkingProcess::afterWorkingProcess);
        }
    }

    @Override
    public int getOrder(a) {
        return 0; }}Copy the code

2.4 call

Only DataServerCache # updateDataServerStatus is called:

afterWorkingProcessHandler.afterWorkingProcess();
Copy the code

In DataServerCache, the following functions are called to updateDataServerStatus:

  • synced
  • notifiedAll
  • checkAndUpdateStatus
  • addNotWorkingServer

The illustration is as follows:

+------------------------------------------+
| DataServerCache                          |                                 +----------------------------------------------+
|                                          |                                 |   AfterWorkingProcess                        |
| synced +----------------------+          |                                 |                                              |
|                               |          | +----------------------------+  | +------------------------------------------+ |
|                               |          | | AfterWorkingProcessHandler |  | |renewDatumHandler.afterWorkingProcess     | |
|                               |          | |                            |  | |                                          | |
|                               v          | |                            |  | |datumLeaseManager.afterWorkingProcess     | |
| notifiedAll +--->updateDataServerStatus +------> afterWorkingProcess +------>+                                          | |
|                                 ^   ^    | |                            |  | |disconnectEventHandler.afterWorkingProcess| |
|                                 |   |    | +----------------------------+  | |                                          | |
|                                 |   |    |                                 | |notifyDataSyncHandler.afterWorkingProcess | |
| checkAndUpdateStatus+-----------+   |    |                                 | +------------------------------------------+ |
|                                     |    |                                 +----------------------------------------------+
| addNotWorkingServer +---------------+    |
|                                          |
+------------------------------------------+

Copy the code

The mobile phone is as follows:

Because it is a business association, there is no need for timing, asynchrony, etc.

2.5 Service Implementation

2.5.1 DisconnectEventHandler

public class DisconnectEventHandler implements InitializingBean.AfterWorkingProcess {
    /** * a DelayQueue that contains client disconnect events */
    private final DelayQueue<DisconnectEvent>           EVENT_QUEUE        = new DelayQueue<>();

    @Autowired
    private SessionServerConnectionFactory              sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter                       dataChangeEventCenter;

    @Autowired
    private DataServerConfig                            dataServerConfig;

    @Autowired
    private DataNodeStatus                              dataNodeStatus;

    private static final int                            BLOCK_FOR_ALL_SYNC = 5000;

    private static final BlockingQueue<DisconnectEvent> noWorkQueue        = new LinkedBlockingQueue<>();
}
Copy the code

In the normal business operation of receive, if you find that the status of the event itself is not WORKING, you place the event in BlockingQueue.

public void receive(DisconnectEvent event) {
        if (event.getType() == DisconnectTypeEnum.SESSION_SERVER) {
            SessionServerDisconnectEvent sessionServerDisconnectEvent = (SessionServerDisconnectEvent) event;
                sessionServerDisconnectEvent.getProcessId());
        } else if (event.getType() == DisconnectTypeEnum.CLIENT) {
            ClientDisconnectEvent clientDisconnectEvent = (ClientDisconnectEvent) event;
        }

        if(dataNodeStatus.getStatus() ! = LocalServerStatusEnum.WORKING) { noWorkQueue.add(event);return;
        }
        EVENT_QUEUE.add(event);
}
Copy the code

When the time comes, afterWorkingProcess is called again. Here the Block will always be on the noWorkQueue, and if not empty, the request will be executed.

public void afterWorkingProcess(a) {
    try {
        /* * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed * to DatumCache. So it need to sleep for a while. */
        TimeUnit.MILLISECONDS.sleep(BLOCK_FOR_ALL_SYNC);

        while(! noWorkQueue.isEmpty()) { DisconnectEvent event = noWorkQueue.poll(1, TimeUnit.SECONDS);
            if(event ! =null) { receive(event); }}}}Copy the code

The illustration is as follows:

+----------------------------------------------------------+ | DisconnectEventHandler | | +-------------------------+ | | | receive | | | | | NOT WORKING | | | dataNodeStatus.getStatus+---------------+ | | | + | | | | | | WORKING | | add | | | | | | | | | v | | | | | EVENT_QUEUE.add(event) | | | | | | +---v---------+ | | +-------------------------+ | | | | |  noWorkQueue | | | | | | | +-----------------------+ +-----+-------+ | | | afterWorkingProcess | | | | | | | poll | | | | NOT isEmpty | | | | receive(event) <----------------------+ | | | | | | | | | | +-----------------------+ | +----------------------------------------------------------+Copy the code

2.5.2 NotifyDataSyncHandler

DisconnectEventHandler and NotifyDataSyncHandler have similar implementations.

Rely on a LinkedBlockingQueue for the cache queue.

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {
  
  private static final BlockingQueue<SyncDataRequestForWorking> noWorkQueue = new LinkedBlockingQueue<>();
  
}
Copy the code

In doHandle normal business operations, if found itself status is not WORKING, with business logic SyncDataRequestForWorking SyncDataRequestForWorking build a news, Put it in LinkedBlockingQueue.

@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
        final Connection connection = ((BoltChannel) channel).getConnection();
        if(dataNodeStatus.getStatus() ! = LocalServerStatusEnum.WORKING) { noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
            return CommonResponse.buildSuccessResponse();
        }
        executorRequest(connection, request);
        return CommonResponse.buildSuccessResponse();
}
Copy the code

When the time comes, afterWorkingProcess is called again. Here the Block will always be on the noWorkQueue, and if not empty, the request will be executed.

@Override
public void afterWorkingProcess(a) {
            while(! noWorkQueue.isEmpty()) { SyncDataRequestForWorking event = noWorkQueue.poll(1, TimeUnit.SECONDS);
                if(event ! =null) { executorRequest(event.getConnection(), event.getRequest()); }}}}Copy the code

The illustration is as follows:

+----------------------------------------------------------+ | NotifyDataSyncHandler | | +-------------------------+ | |  | doHandle | | | | | NOT WORKING | | | dataNodeStatus.getStatus+---------------+ | | | + | | | | | | WORKING | | add | | | | | | | | | v | | | | | executorRequest | | | | | | +---v---------+ | | +-------------------------+ | | | | | noWorkQueue | | | | | | | +-----------------------+ +-----+-------+ | | | afterWorkingProcess | | | | | | | poll | | | |  NOT isEmpty | | | | executorRequest <----------------------+ | | | | | | | | | | +-----------------------+ | +----------------------------------------------------------+Copy the code

2.5.3 RenewDatumHandler

RenewDatumHandler and DatumLeaseManager are similar. No queue is used, just a thread is submitted.

The purpose of this is clearly stated in the comments:

/* * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed * to DatumCache. So it need to sleep for a while. */
Copy the code

However, the details are different, the two classes are the same author, I suspect that you are experimenting with two different implementations.

RenewDatumHandler ThreadPoolExecutorDataServer based.

public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements
                                                                               AfterWorkingProcess {

    @Autowired
    private ThreadPoolExecutor  renewDatumProcessorExecutor;

}
Copy the code

RenewDatumProcessorExecutor is a Bean, specific as follows, ArrayBlockingQueue: is a bounded blocking queue based on the structure of the array, based on the principles of FIFO.

@Bean(name = "renewDatumProcessorExecutor")
public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServerConfig) {
            return new ThreadPoolExecutorDataServer("RenewDatumProcessorExecutor",
                dataServerConfig.getRenewDatumExecutorMinPoolSize(),
                dataServerConfig.getRenewDatumExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(dataServerConfig.getRenewDatumExecutorQueueSize()),
                new NamedThreadFactory("DataServer-RenewDatumProcessor-executor".true));
}
Copy the code

ThreadPoolExecutorDataServer main code is as follows, it is a simple inherited ThreadPoolExecutor, subsequent estimate here there will be new features to add, now only account for the pit:

public class ThreadPoolExecutorDataServer extends ThreadPoolExecutor {
    @Override
    public void execute(Runnable command) {
		super.execute(command); }}Copy the code

For afterWorkingProcess, a thread is submitted that waits for a period of time and then sets renewEnabled.

@Override
public void afterWorkingProcess(a) {
        renewDatumProcessorExecutor.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(dataServerConfig.getRenewEnableDelaySec());
            renewEnabled.set(true);
        });
}
Copy the code

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want to get a timely news feed of personal articles, or want to see the technical information of personal recommendations, please pay attention.

0 XFF reference

How does ant Financial Service registry realize the smooth scaling of DataServer

Ant gold uniform service registry SOFARegistry parsing | service discovery path optimization

The service registry Session storage policy | SOFARegistry parsing

Introduction to the Registry – SOFARegistry architecture for Massive Data

Service registry data fragmentation and synchronization scheme, rounding | SOFARegistry parsing

Ant Financial open source communication framework SOFABolt analysis of connection management analysis

Timeout control mechanism and heartbeat mechanism resolved by SOFABolt, ant Financial’s open source communication framework

SOFABolt protocol framework analysis of Ant Financial open source communication framework

Ant gold uniform service registry data consistency analysis | SOFARegistry parsing

Ant communication framework practice

Sofa – Bolt remote call

Sofa – bolt to study

SOFABolt Design Summary – Elegant and simple design approach

SofaBolt source code analysis – Service startup to message processing

SOFABolt source code analysis

SOFABolt source code analysis 9-userProcessor custom processor design

SOFARegistry introduction

SOFABolt source code analysis of the design of the 13-Connection event processing mechanism