A general discussion of task scheduling

Scheduling problem scenario

For a query engine, an important problem is how to schedule computing tasks. What kinds of problems do we run into?

  1. Take a simple example: suppose there are 10 requests, where task [0] needs 10s and tasks [1]–[9] each need 1s. If all tasks are executed strictly in arrival order, won’t the average latency be very high? (See the worked numbers after this list.)
    • The simple fix is to run tasks [1]–[9] first and task [0] last.
  2. But in an online query system, new requests keep arriving while existing ones execute. If 1s tasks keep streaming in and we always run the shortest work first, does that mean task [0] will never be executed?
    • So we need a system that is reasonably fair, where no task is ever starved.
  3. Tasks may depend on each other. For example, tasks [0]–[5] may need to complete before task [6] can run.
    • Related tasks may call for a unified scheduling strategy.
  4. Execution in a distributed system may be uncontrollable. For example, a request to a dependent service may block. How do we adapt when the actual execution time differs greatly from the expected time?
    • The design must tolerate an uncontrollable distributed environment and extreme scenarios.
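
To make scenario 1 concrete: if task [0] runs first, the tasks finish at 10, 11, ..., 19s, for an average latency of 14.5s. If tasks [1]–[9] run first, they finish at 1, 2, ..., 9s and task [0] finishes at 19s, for an average latency of (45 + 19) / 10 = 6.4s. Reordering alone cuts average latency by more than half without changing the total amount of work.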

General principles and strategies

The modeling of such problems is similar across systems, and Presto’s idea goes roughly like this:

  1. Assign each task a time slice, and hand out time slices to the tasks in turn, to minimize latency and avoid starvation.
  2. Presto builds in a multi-level priority queue that balances competition between tasks of different durations: as a task accumulates elapsed time, it is moved up through the levels, so that tasks of similar cost mostly compete only with other tasks at the same level.
  3. Dependencies between tasks are handled by constructing a DAG, which will not be discussed here. Within a single task, multiple splits are often related to some extent; Presto aggregates the group’s runtime at the task level (the TaskHandle described below) and manages the priority scheduling policy there.

Presto’s model abstraction

Presto builds the following classes to solve this scheduling problem:

  1. TaskExecutor: a single instance per worker. It accepts external task scheduling requests, manages task state, holds the worker thread resources, and maintains the necessary statistics. Internally it maintains a MultilevelSplitQueue holding the splits that are waiting to be scheduled.
  2. MultilevelSplitQueue: the built-in multi-level priority queue, which maintains per-level scheduled time, minimum priority scores, and waiting statistics.
  3. PrioritizedSplitRunner: to get time-slice scheduling similar to an operating system’s, Presto abstracts a SplitRunner interface; the executor runs a processor behind this interface for only one time slice, then looks for the most suitable task to run in the next slice. The main difference between PrioritizedSplitRunner and SplitRunner is the built-in Priority, which consists of the current level plus an accumulated score within that level. (A simplified view of the interface follows this list.)
  4. TaskHandle: marks a group of splits. These splits typically belong to the same Task and share the same processing logic, but they may also share external constraints when scheduled: for example, their accumulated time needs to be accounted for together, and there may be requirements on how many splits run concurrently.
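
For orientation, the SplitRunner interface looks roughly like this (a simplified view; the real interface in the Presto codebase has a few more methods, such as one returning debug info):

import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import java.io.Closeable;

// Simplified view of Presto's SplitRunner: the executor drives a split one
// time slice at a time instead of running it to completion.
public interface SplitRunner
        extends Closeable
{
    // true once the split has no more work to do
    boolean isFinished();

    // run for at most the given duration, then yield; the returned future
    // signals when a split that blocked (e.g. on IO) becomes runnable again
    ListenableFuture<?> processFor(Duration duration);
}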

The overall execution flow is roughly as follows: splits are run one time slice at a time, in a continuous loop, until the task ends.
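
A minimal sketch of that loop, as one of the TaskExecutor’s worker threads might run it (simplified names and structure; the real worker loop also handles blocked splits, cancellation, and statistics):

// Hypothetical worker-thread loop: repeatedly pick the most deserving split,
// run it for one time slice, and either retire it or put it back in the queue.
while (!Thread.currentThread().isInterrupted()) {
    PrioritizedSplitRunner split = waitingSplits.take(); // choose a level, then a split
    split.process();                                     // runs at most one time slice
    if (split.isFinished()) {
        splitFinished(split);                            // release resources, update stats
    }
    else {
        waitingSplits.offer(split);                      // re-queue with updated priority
    }
}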

Multilevel priority queue implementation

Member variables

Let’s look at the internal implementation of MultilevelSplitQueue in detail, starting with the core member variables:

private final List<PriorityQueue<PrioritizedSplitRunner>> levelWaitingSplits;
private final AtomicLong[] levelScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
private final double levelTimeMultiplier;
private final AtomicLong[] levelMinPriority;
private final List<CounterStat> selectedLevelCounters;
  • levelWaitingSplits: a list of 5 priority queues, one per level.
  • levelScheduledTime: an array of 5 scheduling-time accumulators, one per level, recording the scheduled time that level has already consumed. A PrioritizedSplitRunner adds its elapsed time to its current level after each run of the task’s process. This data is mainly used to decide which level’s queue to poll from.
  • levelTimeMultiplier: controls the target CPU time ratio between adjacent levels; it is the only constructor parameter that can be specified.
  • levelMinPriority: an array of 5 priority scores, one per level, indicating the minimum priority score at that level. It is updated each time a split is successfully taken from the queue. We will come back to it when discussing how split priorities are tracked and managed.
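
Putting these together: the level count comes from a hard-coded threshold array (5 entries, so 5 levels), and construction just sizes everything accordingly. A sketch, close to the Presto source (the initial values are my reading of it):

// accumulated scheduled time (in seconds) at which a task enters each level
static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};

public MultilevelSplitQueue(double levelTimeMultiplier)
{
    this.levelMinPriority = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
    this.selectedLevelCounters = new ArrayList<>();
    this.levelWaitingSplits = new ArrayList<>();
    for (int i = 0; i < LEVEL_THRESHOLD_SECONDS.length; i++) {
        levelScheduledTime[i] = new AtomicLong();
        levelMinPriority[i] = new AtomicLong(-1);   // -1: no split taken from this level yet
        selectedLevelCounters.add(new CounterStat());
        levelWaitingSplits.add(new PriorityQueue<>());
    }
    this.levelTimeMultiplier = levelTimeMultiplier;
}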

Implementation of the queue operations

The two most basic methods of a priority queue are offer and take.

  1. For the offer case, MultilevelSplitQueue accepts a runner carrying a Priority and enqueues it into the waiting queue of the indicated level.
  2. The take case is slightly more complicated. We have five queues: how do we decide which level’s queue is the most appropriate one to schedule from?

Presto does this with the number of levels fixed at 5: the expected distribution of CPU time across levels is assumed fixed, and the implementation uses powers of levelTimeMultiplier to define the allocation ratio. For example, when levelTimeMultiplier = 2, the target CPU time of levels 0–4 follows the ratio 16:8:4:2:1; each level is expected to receive twice the CPU time of the next deeper level.

As mentioned earlier, we maintain the levelScheduledTime array recording the time each level has already been scheduled. A simple solution is to compare each level’s actual scheduled time against its expected share: the level that is furthest below its target, i.e. the one with the largest target-to-actual ratio, is the level whose queue we take from.
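
A quick worked example with levelTimeMultiplier = 2: suppose every level has waiting splits and the accumulated scheduled times are [100, 40, 30, 5, 1]. The implied level-0 baseline (see getLevel0TargetTime below) is max(100, 40·2, 30·4, 5·8, 1·16) = 120, so the per-level targets are [120, 60, 30, 15, 7.5]. The target-to-actual ratios are [1.2, 1.5, 1.0, 3.0, 7.5]: level 4 is the furthest below its target, so its queue is polled.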

public PrioritizedSplitRunner take()
        throws InterruptedException
{
    while (true) {
        lock.lockInterruptibly();
        try {
            PrioritizedSplitRunner result;
            while ((result = pollSplit()) == null) {
                notEmpty.await();
            }

            if (result.updateLevelPriority()) {
                offer(result);
                continue;
            }

            int selectedLevel = result.getPriority().getLevel();
            levelMinPriority[selectedLevel].set(result.getPriority().getLevelPriority());
            selectedLevelCounters.get(selectedLevel).update(1);

            return result;
        }
        finally {
            lock.unlock();
        }
    }
}

@GuardedBy("lock")
private PrioritizedSplitRunner pollSplit()
{
    long targetScheduledTime = getLevel0TargetTime();
    double worstRatio = 1;
    int selectedLevel = -1;
    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
        if (!levelWaitingSplits.get(level).isEmpty()) {
            long levelTime = levelScheduledTime[level].get();
            double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime);
            if (selectedLevel == -1 || ratio > worstRatio) {
                worstRatio = ratio;
                selectedLevel = level;
            }
        }

        targetScheduledTime /= levelTimeMultiplier;
    }

    if (selectedLevel == -1) {
        return null;
    }

    PrioritizedSplitRunner result = levelWaitingSplits.get(selectedLevel).poll();
    checkState(result != null, "pollSplit cannot return null");

    return result;
}

public void offer(PrioritizedSplitRunner split)
{
    checkArgument(split != null, "split is null");

    split.setReady();
    int level = split.getPriority().getLevel();
    lock.lock();
    try {
        if (levelWaitingSplits.get(level).isEmpty()) {
            // Accesses to levelScheduledTime are not synchronized, so we have a data race
            // here - our level time math will be off. However, the staleness is bounded by
            // the fact that only running splits that complete during this computation
            // can update the level time. Therefore, this is benign.
            long level0Time = getLevel0TargetTime();
            long levelExpectedTime = (long) (level0Time / Math.pow(levelTimeMultiplier, level));
            long delta = levelExpectedTime - levelScheduledTime[level].get();
            levelScheduledTime[level].addAndGet(delta);
        }

        levelWaitingSplits.get(level).offer(split);
        notEmpty.signal();
    }
    finally {
        lock.unlock();
    }
}

Discussion of the level0TargetTime baseline

What’s interesting here is how the implementation handles the ratio calculation. Instead of using a fixed constant, it computes a level-0 target base time on every take and derives every level’s target from that base. Likewise, when a split is offered to a level whose queue is currently empty, the level’s scheduled time is reset to an assumed value, the current base time divided by the preset per-level coefficient, as if the level had already consumed its expected share. For example, with a base time of 120 and levelTimeMultiplier = 2, offering to an empty level 3 sets that level’s scheduled time to 120 / 2³ = 15.

Why do this? One plausible reason is to handle the case where a burst of splits is added at one level (say level 0) and splits only arrive at other levels later. Under the naive accounting, a level that sat empty accumulates no scheduled time, so once it finally receives splits its target-to-actual ratio is enormous and it would monopolize scheduling until it catches up; level 0’s splits might then not run for quite a long time. The cost of this design is that the base time has to be recomputed on every take.

@GuardedBy("lock")
private long getLevel0TargetTime()
{
    long level0TargetTime = levelScheduledTime[0].get();
    double currentMultiplier = levelTimeMultiplier;

    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
        currentMultiplier /= levelTimeMultiplier;
        long levelTime = levelScheduledTime[level].get();
        level0TargetTime = Math.max(level0TargetTime, (long) (levelTime / currentMultiplier));
    }

    return level0TargetTime;
}

How the priority of a group of splits is adjusted

Looking at take() above, a split may be sent straight back into the queue. Why is that?


            if (result.updateLevelPriority()) {
                offer(result);
                continue;
            }


Going back to scenario 3 in our opening discussion: the splits of a single task are actually related to one another. For example, we want scheduling to be based on the summed CPU usage of the whole group, not on each individual split’s own usage.

  • First, we need a tracker shared by the whole set of splits, because only a common object can tie all of a task’s splits together; that way, an update made through any one split is immediately visible to the rest.
  • Second, a shared reference alone is not enough. The splits live inside a heap, and although the comparison function is implemented, it is only evaluated when elements enter or leave the heap. So the shared handle maintains one Priority for the whole group, and each split keeps its own copy for the heap to compare; the split’s copy is effectively a cache of the TaskHandle’s Priority.

The update logic goes roughly like this: suppose a TaskHandle has two splits, A and B. When A finishes a time slice, the Priority of both A and the TaskHandle is updated. When B is later polled from the heap, if the TaskHandle’s level differs from the level in B’s cached Priority, B must be put back into the heap at its new level.

public boolean updateLevelPriority()
{
    Priority newPriority = taskHandle.getPriority();
    Priority oldPriority = priority.getAndSet(newPriority);
    return newPriority.getLevel() != oldPriority.getLevel();
}
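
For context, the handle side of this exchange looks roughly as follows: after each time slice, the split reports its elapsed time to the TaskHandle, which accumulates the task-wide total and asks the queue to recompute the shared Priority. This is a sketch under assumed names (the real TaskHandle in Presto also deals with concurrency control and cleanup):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: task-level aggregation of scheduled time and the shared Priority.
public class TaskHandle
{
    private final MultilevelSplitQueue splitQueue;
    private final AtomicLong scheduledNanos = new AtomicLong();
    private final AtomicReference<Priority> priority = new AtomicReference<>(new Priority(0, 0));

    public TaskHandle(MultilevelSplitQueue splitQueue)
    {
        this.splitQueue = splitQueue;
    }

    // called by a split after it finishes a time slice of quantaNanos
    public Priority addScheduledNanos(long quantaNanos)
    {
        long totalNanos = scheduledNanos.addAndGet(quantaNanos);
        Priority newPriority = splitQueue.updatePriority(priority.get(), quantaNanos, totalNanos);
        priority.set(newPriority);
        return newPriority;
    }

    public Priority getPriority()
    {
        return priority.get();
    }
}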

The question here is why only a level change requires re-insertion into the heap. My own understanding is as follows. In theory, the strictest logic would be to recompute and refresh the cached Priority of every split belonging to the same TaskHandle, but that is not feasible because of the cost. So why single out level changes? The key is to keep the per-level statistics accurate, because subsequent scheduling decisions are driven by exactly that data; if splits were accounted against the wrong level, scheduling accuracy would degrade badly.

How cumulative time and split Priority adjustment are implemented

First, the decision logic itself is simple and essentially hard-coded: Presto assigns an accumulated-time threshold to each level, and any task whose accumulated time exceeds a level’s threshold moves on to the next level. (A sketch of the constants and the level computation follows.)
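
For reference, the level computation and the contribution cap, roughly as they appear in the Presto source (using the LEVEL_THRESHOLD_SECONDS array shown earlier; treat this as a sketch from my reading of the codebase):

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

// a single time slice can credit at most 30s to the level accounting (discussed below)
static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);

// a task's level is the highest threshold its total scheduled time has crossed
public static int computeLevel(long threadUsageNanos)
{
    long seconds = NANOSECONDS.toSeconds(threadUsageNanos);
    for (int level = 0; level < (LEVEL_THRESHOLD_SECONDS.length - 1); level++) {
        if (seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) {
            return level;
        }
    }
    return LEVEL_THRESHOLD_SECONDS.length - 1;
}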

The next question is when these adjustments happen. We have already discussed how Priority is maintained:

  • When a split enters the heap for the first time, its level is determined by the Priority it was given.
  • For splits that are already running, Priority is updated after every time slice: the tracker holds the total scheduled time of the whole TaskHandle, the new level is computed from that total, and the split is re-inserted into the heap accordingly:
public Priority updatePriority(Priority oldPriority, long quantaNanos, long scheduledNanos)
{
    int oldLevel = oldPriority.getLevel();
    int newLevel = computeLevel(scheduledNanos);

    long levelContribution = Math.min(quantaNanos, LEVEL_CONTRIBUTION_CAP);

    if (oldLevel == newLevel) {
        addLevelTime(oldLevel, levelContribution);
        return new Priority(oldLevel, oldPriority.getLevelPriority() + quantaNanos);
    }

    long remainingLevelContribution = levelContribution;
    long remainingTaskTime = quantaNanos;

    // a task normally slowly accrues scheduled time in a level and then moves to the next, but
    // if the split had a particularly long quanta, accrue time to each level as if it had run
    // in that level up to the level limit.
    for (int currentLevel = oldLevel; currentLevel < newLevel; currentLevel++) {
        long timeAccruedToLevel = Math.min(SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[currentLevel + 1] - LEVEL_THRESHOLD_SECONDS[currentLevel]), remainingLevelContribution);
        addLevelTime(currentLevel, timeAccruedToLevel);
        remainingLevelContribution -= timeAccruedToLevel;
        remainingTaskTime -= timeAccruedToLevel;
    }

    addLevelTime(newLevel, remainingLevelContribution);
    long newLevelMinPriority = getLevelMinPriority(newLevel, scheduledNanos);
    return new Priority(newLevel, newLevelMinPriority + remainingTaskTime);
}

This code also deals with a very interesting problem. Although Presto schedules splits in time slices, a slice is not strictly enforced: in some scenarios, for example IO stalls, a single run of a split can take far longer than one slice, even several minutes. Without some extra handling this would lead to starvation problems:

  • On the one hand, Presto caps how much a single run can contribute to the level accounting with LEVEL_CONTRIBUTION_CAP (30s); no matter how long the run actually took, at most 30 seconds are credited.
  • On the other hand, when the new level is far above the old level, the accrued time is spread across every level in between, filling each up to its threshold width, instead of being credited entirely to the new level. This prevents the other splits at the new level from being starved because of one long-running task.
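
A worked example, assuming the thresholds above: a split whose task is at level 0 blocks on IO and a single run takes 120s, bringing the task total to roughly 120s, so computeLevel moves it to level 3 (60s ≤ 120s < 300s). The level contribution is capped at min(120s, 30s) = 30s, and the loop backfills the intermediate levels up to their widths: level 0 is credited 1s, level 1 gets 9s, level 2 gets the remaining 20s, and level 3 gets 0. The split’s new score is level 3’s minimum priority plus the uncapped remainder (120s - 30s = 90s), which places it behind the splits already waiting at that level.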