This article introduces a way to process sharded tasks with XXL-JOB and to flexibly control the number of nodes that execute them.

Scenario

A table holds a large amount of data that a server application needs to process, with the following requirements:

  1. The data is processed in parallel.
  2. The number of parallel tasks can be controlled flexibly.
  3. The load is spread evenly across the server nodes.

Approach

Since the data in a single table has to be processed in parallel, the natural choice is to query it in shards. Taking the ID modulo the number of shards assigns each row to exactly one shard, so no row is processed twice.
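To make the modulo idea concrete, here is a minimal sketch in plain Java. The class and method names (`ModuloSharding`, `ownerOf`, `idsFor`) are hypothetical and exist only for illustration; in the real task the filtering is done by the database query, not in memory.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: shows that taking the ID modulo shardTotal gives every ID exactly one owner. */
final class ModuloSharding {

    /** Returns the shard that owns the given ID; floorMod keeps the result non-negative. */
    static int ownerOf(long id, int shardTotal) {
        return (int) Math.floorMod(id, (long) shardTotal);
    }

    /** Returns the IDs that the given shard is responsible for. */
    static List<Long> idsFor(List<Long> ids, int shardIndex, int shardTotal) {
        List<Long> mine = new ArrayList<>();
        for (long id : ids) {
            if (ownerOf(id, shardTotal) == shardIndex) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

With IDs 1 to 7 and three shards, shard 0 gets 3 and 6, shard 1 gets 1, 4 and 7, and shard 2 gets 2 and 5, so the shards are disjoint and together cover every row.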

To meet requirements 1 and 2, the original idea was to use a dynamically configurable thread pool. Requirement 3 complicates this: the number of server nodes may change, and the nodes neither know about nor communicate with each other, so building a scheduling mechanism inside the application could become very complicated.

A scheduler that is independent of the server nodes would be ideal. The existing distributed task scheduling platform XXL-JOB came to mind, and according to its official documentation, the “Sharding broadcast & Dynamic Sharding” feature fits this scenario well.

Plan

  1. Schedule the task with XXL-JOB's “sharding broadcast” routing policy.
  2. Pass the number of nodes that should execute the task as the task parameter.
  3. In the task logic, use the shard parameters and the number of executing nodes to decide whether the current node should run, then query and process its shard of the data:
    • If shard index > (number of executing nodes - 1), the current node skips the task and returns immediately.
    • Otherwise, use the shard index and the number of executing nodes as the sharding parameters to query and process the data.

In this way, anywhere from 1 to N nodes, where N is the total number of nodes, can be scheduled flexibly to perform tasks and process data in parallel.
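The decision in step 3 can be isolated as a small pure function, which makes it easy to check without a running scheduler. The sketch below assumes nothing about XXL-JOB itself; `ShardPlan` and `resolve` are hypothetical names introduced here for illustration.

```java
import java.util.Optional;

/** Hypothetical helper, not part of XXL-JOB: decides whether a node runs and with which shard parameters. */
final class ShardPlan {
    final int shardIndex; // shard this node should process
    final int shardTotal; // effective number of shards (equals the number of executing nodes)

    private ShardPlan(int shardIndex, int shardTotal) {
        this.shardIndex = shardIndex;
        this.shardTotal = shardTotal;
    }

    /**
     * @param shardIndex     shard index assigned to this node by the scheduler
     * @param shardTotal     total number of nodes that received the broadcast
     * @param executeNodeNum number of nodes that should actually do work (the task parameter)
     * @return the shard parameters to query with, or empty if this node should skip the run
     */
    static Optional<ShardPlan> resolve(int shardIndex, int shardTotal, int executeNodeNum) {
        if (executeNodeNum <= 0 || executeNodeNum > shardTotal) {
            throw new IllegalArgumentException("executeNodeNum must be in [1, " + shardTotal + "]");
        }
        if (shardIndex >= executeNodeNum) {
            return Optional.empty(); // this node is beyond the requested node count
        }
        // The executing nodes split the data among themselves only.
        return Optional.of(new ShardPlan(shardIndex, executeNodeNum));
    }
}
```

For example, with five nodes and a task parameter of 3, the nodes with shard index 0, 1 and 2 get a plan with a shard total of 3, while the nodes with index 3 and 4 get an empty result and return immediately.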

Main code example

JobHandler example:

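// Assumed imports (StringUtils is taken to be the Apache Commons Lang 3 class):
// import com.xxl.job.core.context.XxlJobHelper;
// import com.xxl.job.core.handler.annotation.XxlJob;
// import org.apache.commons.lang3.StringUtils;

@XxlJob("demoJobHandler")
public void execute() {
    String param = XxlJobHelper.getJobParam();
    if (StringUtils.isBlank(param)) {
        XxlJobHelper.log("Task parameter is empty");
        XxlJobHelper.handleFail();
        return;
    }

    // Number of nodes that should execute the task
    int executeNodeNum;
    try {
        executeNodeNum = Integer.parseInt(param.trim());
    } catch (NumberFormatException e) {
        XxlJobHelper.log("Task parameter must be an integer: {}", param);
        XxlJobHelper.handleFail();
        return;
    }
    // Shard index of the current node
    int shardIndex = XxlJobHelper.getShardIndex();
    // Total number of shards (one per node)
    int shardTotal = XxlJobHelper.getShardTotal();

    if (executeNodeNum <= 0 || executeNodeNum > shardTotal) {
        XxlJobHelper.log("Number of executing nodes must be in the range [1, total number of nodes]");
        XxlJobHelper.handleFail();
        return;
    }

    // Nodes beyond the requested count skip this run
    if (shardIndex > (executeNodeNum - 1)) {
        XxlJobHelper.log("Current shard {} does not need to be executed", shardIndex);
        XxlJobHelper.handleSuccess();
        return;
    }

    // Only the executing nodes share the data, so shard by their count
    shardTotal = executeNodeNum;

    // Query the data of this shard and process it
    process(shardIndex, shardTotal);

    XxlJobHelper.handleSuccess();
}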

Example of a sharded query:

select field1, field2
from table_name
where ... and mod(id, #{shardTotal}) = #{shardIndex}
order by id limit #{rows};

Further thoughts

  1. What if more concurrent tasks are needed than there are application nodes?

    Two ideas:

    • Pass a concurrency number in the task parameter. When a single node processes its shard, it re-shards the queried data by this number and hands the sub-shards to a thread pool for parallel processing.
    • Configure M scheduled tasks that all specify the same JobHandler, number them 0, 1, 2, …, M - 1, and pass each task's number and M in its task parameter. In the task logic, recalculate the shard parameters from the original shard parameters, the task number, and M: shard index = (shard index × M) + task number, and total number of shards = total number of shards × M. Then query and process the data.
  2. What if the task logic changes frequently, including added task parameters, and the server should not be restarted?

    XXL-JOB's “GLUE mode” tasks let you edit and update the logic of scheduled tasks online.
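The two ideas under question 1 can be sketched as follows. This is a rough illustration under stated assumptions, not a definitive implementation: the class `FinerSharding` and its methods are hypothetical names, the IDs are assumed to be already loaded into memory, and the task numbers are assumed to run from 0 to M - 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical helpers for running more parallel tasks than there are nodes. */
final class FinerSharding {

    /** Idea 1: re-shard the rows one node has fetched and process the sub-shards on a local thread pool. */
    static void processInParallel(List<Long> ids, int concurrency, Consumer<Long> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int k = 0; k < concurrency; k++) {
                final int subShard = k;
                tasks.add(() -> {
                    for (long id : ids) {
                        if (Math.floorMod(id, (long) concurrency) == subShard) {
                            handler.accept(id);
                        }
                    }
                    return null;
                });
            }
            // Blocks until every sub-shard has finished; a real job would also
            // inspect the returned futures for failures.
            pool.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing sub-shards", e);
        } finally {
            pool.shutdown();
        }
    }

    /** Idea 2: new shard index = (shard index * M) + task number, with task number in [0, M - 1]. */
    static int combinedIndex(int shardIndex, int taskNo, int m) {
        if (m <= 0 || taskNo < 0 || taskNo >= m) {
            throw new IllegalArgumentException("taskNo must be in [0, M - 1]");
        }
        return shardIndex * m + taskNo;
    }

    /** Idea 2: new total number of shards = total number of shards * M. */
    static int combinedTotal(int shardTotal, int m) {
        return shardTotal * m;
    }
}
```

With three nodes and M = 2, the six (shard index, task number) pairs map to the combined indexes 0 to 5 without collisions, which matches a combined total of 6.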

Reference

  • XXL-JOB distributed task scheduling platform