A job is the basic execution unit in DolphinDB: put simply, it is the execution of a DolphinDB script in a DolphinDB system. Jobs are classified as synchronous or asynchronous according to whether the client blocks waiting for the result.
Synchronous jobs
Synchronous jobs, also called Interactive jobs, come from the following sources:
- Web notebook
- DolphinDB GUI
- DolphinDB Command line interface
- APIs for the various programming languages supported by DolphinDB
Because these jobs have high real-time requirements, DolphinDB automatically gives them higher priority so that they obtain computing resources faster.
Asynchronous jobs
Asynchronous jobs are executed in the background in DolphinDB. They include the following (a brief usage sketch follows this list):
- Batch jobs submitted via the submitJob or submitJobEx function.
- Scheduled jobs submitted via the scheduleJob function.
- Streaming jobs.
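For illustration, here is a minimal sketch of submitting each kind of asynchronous job. The dailyReport function and the dfs://demo table are hypothetical stand-ins; submitJob, submitJobEx, scheduleJob, and getJobReturn are standard DolphinDB functions.

```
def dailyReport() {
    // hypothetical computation over a hypothetical DFS table
    return select count(*) from loadTable("dfs://demo", "quotes") group by date
}

// batch job with the default priority (4) and parallelism (2)
id = submitJob("reportJob", "daily report", dailyReport)

// batch job with an explicit priority of 6 and parallelism of 4
submitJobEx("reportJobEx", "daily report, high priority", 6, 4, dailyReport)

// scheduled job: run dailyReport every day ('D') at 15:30
scheduleJob("reportSched", "daily report at close", dailyReport, 15:30m, 2024.01.01, 2024.12.31, 'D')

// block until the batch job finishes, then fetch its result
getJobReturn(id, true)
```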
Because these jobs are less sensitive to result latency and usually run for a long time, DolphinDB gives them lower priority.
Subtasks
DolphinDB tables with large amounts of data are usually partitioned. If a job A contains a query or computation task (such as an SQL query) on a partitioned table, the task is divided into multiple subtasks that are sent to different nodes for parallel execution. After the subtasks finish, their results are merged and job A continues to execute. DolphinDB's distributed computing likewise produces subtasks. A job can therefore be understood as a series of subtasks.
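As a sketch, consider a value-partitioned DFS table (the dfs://stocks path and schema are hypothetical). A single SQL query on it is one job; DolphinDB splits it into per-partition subtasks transparently.

```
// a database partitioned by date: one partition per day
db = database("dfs://stocks", VALUE, 2024.01.01..2024.01.10)
t = table(1:0, `date`sym`price, [DATE, SYMBOL, DOUBLE])
quotes = db.createPartitionedTable(t, "quotes", "date")

// this statement is one job: it is split into one subtask per relevant
// partition, executed in parallel on the nodes holding those partitions,
// and the partial results are merged before the job continues
select avg(price) from loadTable("dfs://stocks", "quotes") group by date
```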
Worker and Executor
DolphinDB is a peer-to-peer system: every data node plays the same role and can execute jobs submitted by users. Because a job can produce subtasks, each data node needs a scheduler responsible for executing jobs internally. This is the Worker thread. It processes jobs submitted by users, directly executes simple jobs, decomposes jobs into subtasks, distributes those subtasks, and collects the final results. The subtasks decomposed from a job are distributed to data nodes in the cluster (or to the local data node) and executed by Worker or Executor threads on those nodes.
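The sizes of these thread pools are set in the node configuration file. A minimal sketch with illustrative values; workerNum and localExecutors are the standard DolphinDB configuration parameters, commonly set to the CPU core count and the core count minus one, respectively:

```
workerNum=8
localExecutors=7
```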
Workers and Executors execute jobs differently in the following situations:
- When a table is not partitioned, the query job is executed by a single Worker thread.
- When a table is partitioned and stored on a single machine, the query job may be split into multiple subtasks executed in parallel by multiple Executor threads on that node.
- When a table is partitioned and stored in the DFS, the query job may be decomposed into multiple subtasks that are distributed to the Workers of other nodes, achieving distributed computing.
To maximize performance, DolphinDB sends subtasks to the data nodes where the relevant data resides, reducing network transfer overhead. Specifically (a map-reduce sketch follows this list):
- For a partitioned table stored in the DFS, the Worker decomposes and distributes tasks according to the partitioning scheme and the data nodes where the partitions currently reside.
- For distributed computing, the Worker sends subtasks to the data nodes of the corresponding data sources according to the data source information.
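DolphinDB's map-reduce interface makes this locality explicit. Below is a sketch that computes a global average with mr over data sources produced by sqlDS, reusing the hypothetical dfs://stocks table from above:

```
// sqlDS creates one data source per partition of the table
ds = sqlDS(<select price from loadTable("dfs://stocks", "quotes")>)

def mapAvg(t) {
    // executed on the data node that holds each partition
    return [sum(t.price), count(t.price)]
}

def reduceAvg(a, b) {
    return a + b   // element-wise: [sum1+sum2, count1+count2]
}

def finalAvg(r) {
    return r[0] \ r[1]   // \ is DolphinDB's floating-point division
}

// the Worker ships mapAvg to the nodes that own the data sources and
// merges the partial results locally with reduceAvg and finalAvg
mr(ds, mapAvg, reduceAvg, finalAvg)
```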
Job scheduling
Job priority
DolphinDB jobs are scheduled according to their priority, which ranges from 0 to 9: the higher the value, the higher the priority. The system gives computing resources to higher-priority jobs first. Each job has a default priority of 4, which is then adjusted according to the job type.
Job scheduling policy
DolphinDB uses a multilevel feedback queue to schedule jobs by priority. Specifically, the system maintains 10 queues, one per priority level. The system always allocates thread resources to higher-priority jobs first; jobs of the same priority are allocated threads in round-robin fashion. Only when a priority queue is empty are the jobs in the next lower-priority queue processed.
Job parallelism
DolphinDB jobs also have a parallelism, which indicates the maximum number of the job's subtasks that can run concurrently on a data node; the default value is 2. Parallelism can be thought of as the number of threads a job receives per time slice. For example, if a job's parallelism is 2 and the job generates 100 parallel subtasks, the system allocates only two threads to those subtasks each time the job is scheduled, so completing the job takes 50 rounds of scheduling.
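As a sketch, parallelism can be raised when a job is submitted. The heavyScan function below is a hypothetical long-running task over the partitioned table introduced earlier:

```
def heavyScan() {
    // hypothetical scan that fans out into many per-partition subtasks
    return select count(*) from loadTable("dfs://stocks", "quotes") group by date
}

// priority 4 (the default), parallelism 8: up to 8 subtasks of this job
// run per scheduling round, so 100 subtasks need ceil(100/8) = 13 rounds
// instead of the 50 required at the default parallelism of 2
submitJobEx("scanJob", "heavy scan", 4, 8, heavyScan)
```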
Dynamic changes of job priority
To prevent low-priority jobs from being starved for long periods, DolphinDB dynamically lowers job priorities. Specifically, after a job's time slice has been executed, its priority is automatically lowered; when it reaches the lowest level, it returns to its original priority. Low-priority jobs are therefore scheduled sooner or later, which solves the starvation problem.
Setting the job priority
The priority of a DolphinDB job can be set as follows (an inspection sketch follows this list):
- Interactive jobs submitted from the console, web notebook, or an API have a priority of min(4, the user's maximum priority); it can be adjusted by changing the user's own maximum-priority value.
- Batch jobs submitted through submitJob receive the default priority, 4. Users can also specify a priority with the submitJobEx function.
- The priority of a scheduled job cannot be changed; its default priority is 4.
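Submitted jobs can then be inspected or cancelled. A brief sketch using the standard functions getRecentJobs, getConsoleJobs, and cancelJob (the exact columns returned may vary by version):

```
// recent batch jobs on this node, including their priority and parallelism
getRecentJobs(10)

// interactive (synchronous) jobs currently running
getConsoleJobs()

// cancel a batch job by its id while it is still queued or running
cancelJob("scanJob")
```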
Fault tolerance of computation
DolphinDB's distributed computation has a degree of fault tolerance, thanks to the redundant storage of partition replicas. When a subtask is sent to a node holding a partition replica, and that node fails or a data checksum error reveals that the replica is corrupted (replica corruption), the job scheduler (that is, a Worker thread on a data node) detects the failure, selects a node holding another replica of the partition, and re-executes the subtask there. Users can adjust the degree of redundancy with the dfsReplicationFactor configuration parameter.
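dfsReplicationFactor is a controller configuration parameter. A minimal sketch of the entry in controller.cfg, with an illustrative value of 2 so that every partition is stored on two nodes and a failed subtask can be re-executed on the second replica:

```
dfsReplicationFactor=2
```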
Coupling of computation and storage, and data sharing between jobs
DolphinDB places computation as close to storage as possible. It does not separate computation from storage, for the following reasons:
- Separating computation and storage leads to data redundancy. Consider the Spark+Hive architecture, which separates the two: Spark applications do not share memory, so if N Spark applications read a table T from Hive, T must be loaded into the memory of each of the N applications, wasting memory. In multi-user scenarios, for example when one set of tick data is shared and analyzed by many analysts, the Spark approach multiplies IT costs.
- Data-transfer latency. Although data centers are increasingly equipped with new hardware such as RDMA and NVMe, which has greatly improved network latency and throughput, the environments where DolphinDB is deployed often lack such networks and hardware, and moving data across the network can become a serious performance bottleneck.
For these reasons, DolphinDB adopts an architecture that couples computation and storage. Specifically:
- DolphinDB's answer to memory waste is to share data between jobs (the counterpart of Spark applications). After data is partitioned and stored in DolphinDB's DFS, each partition replica resides on a node, and a node holds at most one copy of any partition. If subtasks of multiple jobs involve the same partition replica, that replica can be shared in memory, reducing memory waste.
- DolphinDB's answer to transfer latency is to send the computation to the node where the data resides. A job is divided into multiple subtasks according to the DFS partition information and sent to the nodes holding those partitions for execution. Since sending the computation to the data node amounts to sending only a small piece of code, network overhead is greatly reduced.