Introduction:

We are all familiar with scheduled tasks. From Crontab to Spring Task, and from Quartz to XXL-Job, scheduled task frameworks have kept upgrading and evolving as business scenarios grow more diverse and complex.

Today, I would like to discuss distributed task scheduling from three angles: the evolution from single-machine scheduled tasks to distributed task scheduling platforms, how Tencent Cloud's distributed task scheduling platform TCT came into being, and concrete cases of TCT in practice along with the core problems they solve.

About the author

Cui Kai

Product architect at the Tencent Cloud CSIG Microservices Product Center, with experience in developing distributed, high-concurrency e-commerce systems and in system architecture design. He specializes in landing and implementing microservice architecture technology platforms, and currently focuses on distilling and promoting best practices for microservice-related middleware to help enterprises complete their digital transformation.

Scenario types

There are thousands of scenarios for scheduled tasks, but what is their essential model, and how should we understand them? Here is a helpful analogy: a scheduled task is like a teacher assigning homework to a student, for example:

Start your homework at 7 o'clock every evening; when you finish, have your parents check it and sign.

“At 7 o'clock every evening, on time” is a requirement for time accuracy and periodicity: it is 7 o'clock, not 8 o'clock, and every day, not every week. “Do your homework” makes the execution content explicit: it is doing homework, not watching Ultraman. “Have your parents check and sign when you finish” decouples “doing homework” and “parents checking and signing” into two logical actions, allowing the parents to read a book while the child does the homework.

Back to the point: typical landing scenarios for scheduled tasks are common across industries. In e-commerce, the entrance to a promotion opens at a scheduled time, receipt of goods is confirmed automatically if the buyer has not confirmed within 15 days, and unpaid orders are scanned at fixed times for SMS reminders. In finance and insurance, there are scenarios such as marketing commission calculation, terminal marketing reports, scheduled synchronization of organizational relationships, and daily and monthly clearing and settlement. In summary, the author divides scheduled task scenarios along three dimensions: time-driven, batch processing, and asynchronous decoupling.

Time-driven

Take the scheduled opening of an event entrance in an e-commerce mall as an example. In general, the parameters required by the event are configured in the background, and the event state is initially set to closed. When the execution time arrives, the scheduled task triggers automatically and the promotion opens.

In time-driven scenarios, businesses care more about time factors than about execution content: whether tasks run at the scheduled time or period, the accuracy of the specific execution time, and the length of the cycle.

Batch processing

Batch processing tasks are characterized by the need to process a large number of accumulated business objects at once. At this point some readers might ask: why not use a message queue? The reason is that a message queue cannot easily replace batch tasks in certain scenarios, because message queues are event-driven on a per-message basis and lean toward real-time processing.

Take the settlement of marketing commissions in insurance as an example. Marketing personnel receive a certain proportion of the premium paid by the policyholder; this proportion varies with the insurance period and type of insurance, and the company's commission incentive policies may be superimposed on it. Such scenarios need to accumulate a certain amount of data and perform batch calculations on a regular schedule, rather than calculating on every event.
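The batch nature of this settlement can be sketched in a few lines. The class, field names, and rates below are hypothetical, purely to illustrate accumulating records during the day and settling them in one pass:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: a nightly batch job that settles accumulated
// commission records in one pass, instead of reacting to each policy event.
public class CommissionBatchJob {

    // One accumulated premium record (simplified, illustrative fields).
    public record PolicyPremium(String agentId, double premium, String productType) {}

    // Commission rates per product type -- made-up example values.
    private static final Map<String, Double> RATES =
            Map.of("LIFE", 0.30, "HEALTH", 0.20, "PROPERTY", 0.10);

    // Settle the whole accumulated batch at once: total commission per agent.
    public static Map<String, Double> settle(List<PolicyPremium> batch) {
        return batch.stream().collect(Collectors.groupingBy(
                PolicyPremium::agentId,
                Collectors.summingDouble(
                        p -> p.premium() * RATES.getOrDefault(p.productType(), 0.0))));
    }
}
```

The point of the design is that the whole day's records are processed in a single scheduled run; adding incentive-policy multipliers would only change the per-record calculation, not the batch shape.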

Asynchronous decoupling

Asynchronous decoupling also brings message queues to mind, but message queues do not suit some external data acquisition scenarios. For example, stock software companies in the securities industry fetch exchange stock prices; since the price data is external to the company, it is difficult to use a message queue for asynchronous communication between internal and external systems. Therefore, data is typically fetched and stored periodically by a batch task, and the back-end system then analyzes and collates it, decoupling the logic of external data acquisition from internal data analysis and processing.

Past and present

Single-machine scheduled tasks

Single-machine scheduled tasks are the most common and traditional way to run tasks, for example the Crontab built into Linux. It schedules single-machine tasks through the five time fields of a cron expression: minute, hour, day of month, month, and day of week.

30 21 * * * /etc/init.d/smb restart

In addition, Java has built-in timer facilities, such as the java.util.Timer class and its upgrade, ScheduledThreadPoolExecutor. The Spring framework also provides Spring Task, which supports cron expressions through annotations and quickly implements a single-machine scheduled task framework.
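A minimal sketch of the JDK-level approach mentioned above, using ScheduledThreadPoolExecutor; the task body and interval are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Single-machine scheduling with ScheduledThreadPoolExecutor: unlike Timer,
// it is backed by a thread pool, and one failing task does not kill the
// whole scheduler.
public class SingleNodeScheduler {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        // Fire every second with no initial delay.
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("tick " + System.currentTimeMillis()),
                0, 1, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(3); // let a few ticks run, then stop
        scheduler.shutdown();
    }
}
```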

@EnableScheduling
@Service
public class ScheduledConsumerDemo {

    @Value("${consumer.request.echoUrl}")
    private String echoUrl;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Request the Provider's echo interface every second.
     */
    @Scheduled(fixedDelayString = "${consumer.auto.test.interval:1000}")
    public void callProviderPer1Sec() {
        String response = restTemplate.getForObject(echoUrl, String.class);
    }
}

Clearly, single-machine scheduled tasks are convenient for simple business scenarios. However, as distributed architecture has become the trend, they can no longer meet the demands of enterprise production scenarios, mainly in areas such as unified management of cluster task configuration, single points of failure and single-point performance limits, communication and coordination among task nodes, and aggregation of task execution data. To meet enterprise-grade production demands, various task scheduling platforms gradually emerged.

Centralized scheduling

Quartz, a typical centralized scheduling framework, is the forerunner and long-time leader in task scheduling. With its excellent scheduling capability, rich API, and good Spring integration, it once became a synonym for task scheduling.

The Quartz architecture uses database locks to ensure the uniqueness of tasks across multiple nodes and to avoid a single point of failure. However, the centralization of the database lock also causes serious performance problems: with a large number of tasks, the database becomes the performance bottleneck of overall scheduling while resources sit idle on the application side, and tasks cannot be sharded for parallel execution.
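The database-lock mechanism can be illustrated with a toy stand-in. This is not Quartz's actual code: a ConcurrentHashMap plays the role of the lock table (Quartz uses a QRTZ_LOCKS table with row locks), and an atomic insert plays the role of `SELECT ... FOR UPDATE`, so only one node "wins" each fire time:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch of a Quartz-style cluster lock: every node tries to
// grab a lock row before firing a job, so exactly one node in the cluster
// executes each scheduled run. The map stands in for the database lock table.
public class DbLockSketch {
    private final ConcurrentMap<String, String> lockTable = new ConcurrentHashMap<>();

    // Returns true if this node acquired the lock for the given fire time.
    public boolean tryAcquire(String jobKey, long fireTime, String nodeId) {
        // putIfAbsent is atomic, like a unique-key insert in the lock table.
        return lockTable.putIfAbsent(jobKey + "@" + fireTime, nodeId) == null;
    }
}
```

The sketch also shows why this becomes a bottleneck: every fire of every job in the whole cluster contends on one shared store.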

XXL-Job, another framework, comes from Dianping and is characterized by simplicity, easy integration, and a visual console. Compared with Quartz, the main differences are as follows:

  • Self-developed scheduling module:

XXL-Job decouples the scheduling module from the task module asynchronously, which solves the sharp drop in scheduling performance when task logic is too heavy. The scheduling module is responsible for parsing task parameters and initiating calls, while the task module executes the task content. Meanwhile, asynchronous scheduling and execution queues allow limited thread resources to support a substantial amount of job concurrency.

  • Scheduling optimization:

Scheduling thread pools and parallel scheduling greatly reduce the probability of scheduling blockage and increase the load the scheduling system can carry.

  • High availability guarantee:

The scheduling center's database stores task information, scheduling history, scheduling logs, and node registration information, with MySQL ensuring data persistence and high availability. Failover and heartbeat detection of task nodes also provide dynamic awareness of each execution node's status.

But because XXL-Job uses a database locking mechanism similar to Quartz's, it is not immune to the performance bottlenecks and other problems of centralization.
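The asynchronous decoupling of scheduling and execution described above can be sketched generically. This is an illustrative model, not XXL-Job's source: the scheduling side only enqueues triggers (cheap and non-blocking), and a worker pool drains the queue and runs the heavy task bodies:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch of decoupling the scheduling module from the task
// module with an asynchronous queue: heavy task logic cannot block the
// scheduling loop, because scheduling only enqueues.
public class AsyncDispatchSketch {
    private final BlockingQueue<Runnable> triggerQueue = new LinkedBlockingQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Called by the scheduling loop: cheap, returns immediately.
    public void schedule(Runnable task) {
        triggerQueue.offer(task);
    }

    // Drains queued triggers onto the worker pool and waits for completion.
    public void drainAndRun() throws InterruptedException {
        List<Runnable> batch = new ArrayList<>();
        triggerQueue.drainTo(batch);
        CountDownLatch done = new CountDownLatch(batch.size());
        for (Runnable r : batch) {
            workers.execute(() -> { try { r.run(); } finally { done.countDown(); } });
        }
        done.await();
        workers.shutdown();
    }
}
```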

Decentralized scheduling

To solve the problems of centralized scheduling, domestic open source frameworks such as PowerJob, Dangdang's Elastic-Job, and Vipshop's Saturn emerged. Since Saturn as a whole is an improvement and optimization of the open source Elastic-Job, this article briefly introduces only PowerJob and Elastic-Job.

PowerJob was born in April 2020 and includes some relatively new ideas and elements, such as support for MapReduce-style distributed computing and dynamic hot loading of Spring containers. Functionally, multi-task workflow orchestration, the MapReduce execution mode, and delayed execution are highlights. It also claims that all components support horizontal scaling. Its core components are:

  • powerjob-server: the scheduling center, responsible for scheduling and managing tasks.
  • powerjob-worker: the executor, providing single-machine execution, broadcast execution, and distributed computing.
  • powerjob-client: an optional OpenAPI client.

PowerJob's lock-free scheduling design is worth referencing. The core logic is to use appName, the grouping key of the business application, to logically bind powerjob-server and powerjob-worker, ensuring that each powerjob-worker cluster connects to only one powerjob-server at runtime. This removes the need for a locking mechanism to prevent a task from being picked up by multiple servers and executed repeatedly.
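A simplified sketch of that lock-free grouping idea. The real PowerJob binds worker groups to servers through its own discovery and heartbeat protocol; the hash-based choice below is only an illustrative stand-in showing why no lock is needed once each group maps to exactly one server:

```java
import java.util.List;

// Illustrative sketch (not PowerJob's actual election logic): each worker
// group, keyed by appName, deterministically resolves to exactly one server,
// so no two servers can schedule the same group and no distributed lock is
// required to avoid duplicate execution.
public class GroupBindingSketch {
    // Consistent choice: hash the group key onto the current server list.
    public static String serverFor(String appName, List<String> servers) {
        int idx = Math.floorMod(appName.hashCode(), servers.size());
        return servers.get(idx);
    }
}
```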

Although PowerJob comes out relatively well in every aspect of this analysis, its product iteration cycle is still short, and it needs large-scale market adoption to polish the details and verify its performance, usability, and stability.

ElasticJob comprises ElasticJob-Lite and ElasticJob-Cloud. This section uses ElasticJob-Lite as an example.

ElasticJob-Lite is positioned as a lightweight, decentralized solution; it builds on Quartz and uses ZooKeeper as its registry. In terms of product design, the author's understanding is that ElasticJob emphasizes data processing and computation more than other distributed task scheduling frameworks, mainly in two respects:

Decentralization of ElasticJob-Lite:

  • After the ElasticJob jar package is imported into the service program, the jar itself handles task scheduling, status communication, and log output.

  • Every task node is a peer. Task-related information (such as the task name, the list of peer instances, and the execution policy) is registered in ZooKeeper, and execution instances are elected through ZooKeeper's election mechanism.

ElasticJob-Lite elastic sharding:

  • Based on ZooKeeper, task execution instances sense each other going online and offline in near real time, so shard allocation adjusts as the number of instances changes and the load stays relatively even.

  • When a task instance goes online or offline, the currently running task is not affected; resharding happens at the next task run, avoiding repeated execution.

ElasticJob is designed for distributed task computing scenarios. It is well suited to sharded computation or processing of large amounts of data, especially where resource utilization matters.
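The elastic sharding behavior can be sketched as a pure assignment function. The instance names and the round-robin strategy below are illustrative; in ElasticJob the online instance list comes from ZooKeeper and several built-in sharding strategies exist:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of elastic sharding: shard indexes are spread evenly over
// the currently online instances. Rebalancing after an instance joins or
// leaves is simply calling assign() again with the new instance list.
public class ElasticShardingSketch {
    public static Map<String, List<Integer>> assign(int shardCount, List<String> instances) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String i : instances) result.put(i, new ArrayList<>());
        for (int shard = 0; shard < shardCount; shard++) {
            // Round-robin keeps the load relatively even across instances.
            result.get(instances.get(shard % instances.size())).add(shard);
        }
        return result;
    }
}
```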

The evolution process

After this rough introduction to the major distributed task scheduling frameworks, a question arises: what are the main factors driving the frameworks' step-by-step evolution? The author briefly summarizes four:

  • Business complexity: originally business complexity was low, and two or three lines of code would do. As complexity increased, the organizational form and execution content of tasks changed greatly, gradually deriving demands such as task orchestration, framework ecosystem integration, and multi-language, multi-client support.

  • Scenario diversity: beyond simple scheduled task execution, distributed task scheduling frameworks gradually took on scenarios such as batch computing and service decoupling, demanding richer task execution strategies, support for dynamic sharded computation, and rich task governance capabilities.

  • Distributed architecture: the arrival of the distributed architecture trend is the most important driving factor. The framework's overall design must assume a distributed architecture; communication between task nodes and the scheduling center, high availability of the scheduling platform, fault handling and recovery of task nodes, and visual operation and maintenance of task scheduling are all new challenges.

  • Massive data and concurrency: when massive business data and concurrent calls become the norm, the distributed task scheduling platform must optimize executor performance, execution time precision, parallel and asynchronous task processing, and elastic control of node resources to improve overall platform throughput.

The evolution of distributed task scheduling frameworks is one branch of the evolution of business systems from monolithic to distributed architecture. The continuous improvement of distributed task scheduling platforms is inseparable from the microservice evolution of business architecture.

Similarly, business systems across industries are gradually migrating to the cloud, and the trend of enterprise digital transformation is clear. The future evolution of distributed task scheduling platforms is likewise inseparable from the cloud native environment: the platform's overall architecture needs deep integration with the cloud native ecosystem to meet the changing demands of various industries.

TCT on the Cloud

Tencent Cloud Task (TCT) is a lightweight, highly reliable distributed task scheduling platform independently developed by Tencent Cloud. By specifying time rules, it triggers scheduled tasks strictly on time and ensures their reliable, ordered execution. The service supports standard time expressions and execution lifecycle management, and solves the single point of failure and poor visibility of traditional scheduled tasks. It also supports complex scheduling capabilities such as task sharding and workflow orchestration, covering a wide range of task scheduling scenarios, such as data backup, log splitting, operation and maintenance monitoring, and financial end-of-day processing.

Feature overview

Functionally, TCT is divided into three parts: the scheduling management platform, the task scheduling service, and the development integration SDK. The scheduling management platform provides an elegant visual interface, the task scheduling service implements task scheduling in distributed scenarios, and the SDK integrates deeply with open source frameworks. The features are described below.

Rich task configuration

  • Multiple execution modes: supports random-node, broadcast, and shard execution modes to cover different application scenarios.
  • Multiple triggering policies: supports scheduled triggering, periodic triggering, workflow triggering, and manual triggering.
  • Complete fault tolerance: supports protection mechanisms such as exception retry, timeout interruption, and manual stop.

Visual task management

  • Task management view: displays the execution status of tasks, and provides functions such as creating, editing, and deleting tasks, executing tasks manually, and enabling or disabling tasks.

  • Execution record view: displays the detailed list of execution batches of all routine tasks and workflow tasks. You can query and filter by task and deployment group.

  • Execution list view: Displays the detailed list of execution batches of a selected task. You can stop or re-execute a task batch.

  • Execution details view: displays a list of execution instances of a task batch. You can stop, re-execute, and query logs of execution instances.

  • Workflow management view: displays the execution status of workflow tasks, and provides the ability to create workflow tasks, orchestrate processes visually, and start or stop workflow tasks.

Comprehensive task monitoring and alarms

  • Multi-dimensional monitoring: monitors task running status, task batch status, and execution instance status, and supports viewing execution instance logs online.

  • Flexible alarm policies: integrates cloud monitoring to provide alarms on exceptions in task execution batches, task execution instances, and workflow task execution batches and instances, and supports flexible metric and event alarm configuration.

Architecture and principles

TCT components are described as follows:

  • Trigger: parses the trigger rules of tasks.
  • Scheduler: distributes tasks to be executed and manages task status.
  • Monitor: reports monitoring data related to task execution.
  • Console: the management interface for administrators.
  • Access layer: the channel manager for task delivery and status reporting.
  • Access gateway: the gateway connecting the access layer and the SDK.
  • SDK: runs within the business process and executes the specific code logic defined in the task.

First, the trigger parses the task information that the user configured on the console and stored in the DB, and puts the parsed execution information onto the MQ. Next, the scheduler consumes the execution information and delivers it through the access layer to the specific executor node (the access layer holds node registration information, including IP addresses). Finally, when the node where the SDK resides finishes executing the task (success, failure, no response, and so on), it sends the result back to the scheduler over a TCP long connection; the scheduler then interacts with the DB to update the task status and reports the execution to the monitoring module.
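The data flow above can be modeled as a toy pipeline. All names here are hypothetical: an in-memory queue stands in for the MQ, a map stands in for the DB's task status table, and a callback stands in for the SDK executing and reporting back:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Toy end-to-end sketch of the flow described above, not TCT's actual code:
// trigger parses a task into an execution message, the scheduler consumes it,
// "delivers" it to an executor, and records the returned status where the
// real platform would update the DB and report to monitoring.
public class TctFlowSketch {
    public enum Status { PENDING, SUCCESS, FAILED }

    private final BlockingQueue<String> mq = new LinkedBlockingQueue<>();
    private final ConcurrentMap<String, Status> taskStatus = new ConcurrentHashMap<>();

    public void trigger(String taskId) {            // trigger: parse + enqueue
        taskStatus.put(taskId, Status.PENDING);
        mq.offer(taskId);
    }

    // scheduler: consume one message, deliver it, persist the reported result.
    public Status scheduleOnce(Function<String, Boolean> executor) throws InterruptedException {
        String taskId = mq.take();
        boolean ok = executor.apply(taskId);        // SDK executes, reports back
        Status s = ok ? Status.SUCCESS : Status.FAILED;
        taskStatus.put(taskId, s);                  // status change in the "DB"
        return s;
    }

    public Status statusOf(String taskId) { return taskStatus.get(taskId); }
}
```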

From the feature overview, TCT basically covers the functions required in common task scheduling scenarios, and it has done a great deal of work on visual views in particular. Relying on Tencent Cloud's complete infrastructure, TCT also offers strong guarantees on high availability and lower operation and maintenance costs. In addition, TCT originates from the TSF technology platform and integrates naturally with TSF applications: supporting components can easily obtain TSF application information such as the deployment group ID, node IP, and application ID, achieving higher task execution efficiency.

However, the overall architecture diagram shows that TCT adopts a centralized scheduling scheme. The scheduler, trigger, and console components are stateless and scale horizontally, and components communicate with the SDK over TCP long connections; but the data flow depends on the DB and MQ. In large-scale scenarios with many tasks and high execution frequency, DB and MQ throughput becomes the performance bottleneck, with a clear upper limit even after optimization. Given its current product form, TCT is therefore better suited to lightweight task scheduling scenarios.

Sharded execution case

Background

The sharded execution mode is often used for processing large amounts of data. This case takes the insurance industry as an example: each subsidiary's marketing data for the day is aggregated to the parent company.

As the diagram above shows, the summaryData service calls the marketing data query API provided by 34 subsidiaries at 2:00 a.m. every day. Sharded execution is used because the aggregation must be triggered at the same time for all subsidiaries, and the shorter the overall aggregation window, the more accurate the result. In addition, the amount of marketing data varies by company, and even the same subsidiary generates different volumes from day to day.

Configuration steps

Based on the business background and existing resources above, the overall configuration approach is as follows:

  • Create a summaryData deployment group with four instances, each with an executor thread pool of three threads.
  • In the application code, map the 34 subsidiaries to company IDs 1 through 34.
  • Divide the 34 companies into four regions, NORTH, SOUTH, EAST, and WEST, according to geography and daily data volume.
  • Set the shard count to 4, with each shard corresponding to one instance; that is, each instance computes the data of at least one region.
  • The list of subsidiary IDs behind each region key can be semi-automatically adjusted in code configuration, guarding against a sudden surge in one subsidiary's data volume.
  • To prevent duplicate statistics, use manual compensation instead of automatic retry.

Step 1: Write and package the task class

public class SimpleShardExecutableTask implements ExecutableTask {

    private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Override
    public ProcessResult execute(ExecutableTaskData executableTaskData) {
        // Output task execution metadata
        TaskExecuteMeta executeMeta = executableTaskData.getTaskMeta();
        LOG.info("executeMetaJson:{}",executeMeta.toString());
        // Prints the shard parameters assigned to this instance
        ShardingArgs shardingArgs = executableTaskData.getShardingArgs();
        LOG.info("ShardCount: {}", shardingArgs.getShardCount());
        Integer shardingKey = shardingArgs.getShardKey();
        LOG.info("shardingKey: {}", shardingKey);
        String shardingValue = shardingArgs.getShardValue();
        LOG.info("shardingValue: {}", shardingValue);
        // Simulate task execution
        try {
            this.doProcess(shardingValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ProcessResult.newSuccessResult();
    }

    public void doProcess(String shardingValue) throws Exception {
        if (shardingValue.equals(CompanyMap.NORTH.area)){
            Arrays.stream(CompanyMap.NORTH.companyIds)
                    .forEach(companyId->LOG.info("calling north subsidiary_{} api.....",companyId));
        } else if(shardingValue.equals(CompanyMap.SOUTH.area)){
            Arrays.stream(CompanyMap.SOUTH.companyIds)
                    .forEach(companyId->LOG.info("calling south subsidiary_{} api.....",companyId));
        } else if(shardingValue.equals(CompanyMap.EAST.area)){
            Arrays.stream(CompanyMap.EAST.companyIds)
                    .forEach(companyId->LOG.info("calling east subsidiary_{} api.....",companyId));
        } else if(shardingValue.equals(CompanyMap.WEST.area)){
            Arrays.stream(CompanyMap.WEST.companyIds)
                    .forEach(companyId->LOG.info("calling west subsidiary_{} api.....",companyId));
        }  else {
            throw new Exception("input shardingValue error!");
        }
        ThreadUtils.waitMs(3000L);
    }

    enum CompanyMap {
        NORTH("NORTH", new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9}),
        SOUTH("SOUTH", new int[] {10, 11, 12, 13, 14, 15, 16, 17, 18, 19}),
        EAST("EAST", new int[] {20, 21, 22, 23, 24, 25, 26, 27, 28}),
        WEST("WEST", new int[] {29, 30, 31, 32, 33, 34});

        private String area;
        private int[] companyIds;

        CompanyMap(String key, int[] values) {
            this.area = key;
            this.companyIds = values;
        }

        public String getArea() { return area; }
        public void setArea(String area) { this.area = area; }
        public int[] getCompanyIds() { return companyIds; }
        public void setCompanyIds(int[] companyIds) { this.companyIds = companyIds; }
    }
}

Step 2: Create applications and deployment groups and deploy them

Step 3: Create a TCT task

Step 4: Manually start the task test

Test results

You can view the instance execution status through the console, and query the shard parameters of an execution batch through the shard parameter button.

The application logs show that one instance ran two shard tasks, because TCT evaluates instance load and selects relatively idle instances.

We also tested instance failure within the service, that is, how tasks execute when only one summaryData instance is up and running (the logs are excerpted for length). The first three shard tasks execute simultaneously on different threads, and the fourth shard task executes after the first three complete, which meets expectations.

Future directions

Competition among distributed task scheduling platforms is long and difficult. All vendors are looking for a breakthrough in product value, and TCT still has many shortcomings, so it must keep thinking deeply from the perspective of market demand and technology trends. For the distributed task scheduling market, the author roughly summarizes possible future optimization directions as follows:

  1. Decentralization

The centralized distributed task scheduling platform has obvious disadvantages and struggles to support large-scale enterprise scenarios, while products and technologies in the market are gradually evolving toward decentralization. Only a decentralized distributed task scheduling platform has the possibility of large-scale commercialization, and successful commercialization is a mark of product maturity.

  2. Containerization

Currently, most distributed task scheduling platforms run and communicate between components in traditional virtual-machine deployments. If the components also supported containerized deployment, the platform could better exploit containers' advantages in fast startup, resource scheduling, and scaling, improving overall availability, reducing operation and maintenance costs during scaling in and out, and effectively raising overall platform throughput. High availability, elastic scaling, and high performance are important considerations in the digital transformation of large enterprises.

  3. Programmability

More and more distributed task scenarios require complex orchestration of multiple tasks, yet mainstream task orchestration is still limited to simple logic such as serial, parallel, AND, and OR relations between tasks. In the future there will be more need for a generic, programmable template language to describe task parameters and content, DAGs (directed acyclic graphs), operators, and trigger actions, standardizing vendors' definitions of task orchestration.

  4. Fault tolerance and compensation

There is also much room for improvement in handling abnormal execution of tasks and workflows, such as missed triggers caused by a stuck instance, task catch-up and task pile-up, and whether a workflow should be retried from the beginning or from the failed task onward.

  5. Scenario upgrades

In common scheduled task scenarios, products are highly homogeneous in function. However, with the rapid development of cloud native, big data, and related fields, new application scenarios are gradually emerging for distributed task scheduling platforms, such as distributed computation and result aggregation in big data scenarios and connecting the scheduling platform to serverless applications, which place higher requirements on product scenarios and functions.

Conclusion

Through the introduction of scheduled task scenarios, their evolution history, the platform frameworks, and practice cases of Tencent Cloud's self-developed distributed task scheduling platform TCT, the author has briefly analyzed the current state and future development of distributed task scheduling on the basis of predecessors' work. We hope this article provides some useful ideas and perspectives on technology selection and open source construction for enterprises and the open source community.

References

cloud.tencent.com/document/pr…
www.quartz-scheduler.org/
www.xuxueli.com/xxl-job
shardingsphere.apache.org/elasticjob/…
www.powerjob.tech/
vipshop.github.io/Saturn/#/…