DolphinDB provides scheduled jobs, which let the system automatically execute jobs at specified times and frequencies. This feature is useful whenever scripts need to run automatically on a regular basis, for example for computation and analysis (such as calculating minute-level K lines after the market closes each day, or generating monthly statistical reports), for database management (such as database backup and data synchronization), and for operating system management (such as deleting outdated log files).
A scheduled job is represented by a function, which gives great flexibility to job definitions: anything that can be expressed as a function can be run as a scheduled job. Scheduled jobs are submitted through the scheduleJob function and run in the background at the scheduled time. After a job is created, its definition is serialized and saved to a disk file on the data node; after the node restarts, the system deserializes and loads the scheduled jobs. The result of each run of a scheduled job is also saved to the node's disk. We can use getJobMessage and getJobReturn to view the run log and return value of each job.
1. Function Introduction
1.1 Creating a Scheduled Job
The scheduleJob function is used to create a scheduled job. After a job is created, the system serializes the job definition and saves it to the <homeDir>/sysmgmt/jobEditlog.meta file. The function syntax is as follows:
scheduleJob(jobId, jobDesc, jobFunc, scheduledTime, startDate, endDate, frequency, [days])
Note the following:
- The argument jobFunc (job function) is a function that takes no arguments.
- The scheduledTime argument can be a scalar or vector of type minute. When it is a vector, note that the interval between two adjacent time points cannot be less than 30 minutes.
- The return value is the job ID of the scheduled job. If the input jobId differs from the job ID of every existing scheduled job, the system returns the input jobId. Otherwise, it appends the current date, then "000", "001", and so on, as a suffix to jobId until a unique job ID is produced.
As we all know, executing a function requires all the parameters the function needs. In functional programming, a function with all of its parameters fixed is a partial application of the original function that takes no arguments. In DolphinDB, we use curly braces {} to denote partial application.
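For example, a minimal sketch of partial application (the function and variable names here are illustrative, not part of the original tutorial):

def multiply(a, b){
    return a * b
}
double = multiply{2}             // fix a=2; double takes a single argument
print(double(5))                 // prints 10
sayHi = print{"Hi, DolphinDB!"}  // fixing all parameters yields a no-argument function
sayHi()                          // a function like this can serve as a job function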
Many kinds of functions can be used as the job function, including custom functions, built-in functions, plug-in functions, function views, and functions in modules. As a result, scheduled jobs can do almost anything: use custom functions or plug-in functions for computation and analysis, use the built-in function run to run a script file, use the shell function for operating system management, and so on. In the following example, the job calls a custom function getMaxTemperature, which calculates the maximum temperature of a device on the previous day; its parameter is the device ID. When the job is created, getMaxTemperature{1} fixes the device ID to 1.
def getMaxTemperature(deviceID){
    maxTemp = exec max(temperature) from loadTable("dfs://dolphindb","sensor")
              where ID=deviceID, ts between (today()-1).datetime():(today().datetime()-1)
    return maxTemp
}
scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');
The following example executes a script file. The job function uses run with the full path of the script file monthlyJob.dos fixed as its argument. The job executes at 00:00 on the first day of each month in 2020.
scheduleJob(`monthlyJob, "Monthly Job 1", run{"/home/DolphinDB/script/monthlyJob.dos"}, 00:00m, 2020.01.01, 2020.12.31, 'M', 1);
The following example executes an operating system command to delete a log file. The job function uses shell with the command "rm /home/DolphinDB/server/dolphindb.log" fixed as its argument. The job runs every Sunday at 1:00.
scheduleJob(`weeklyjob, "rm log", shell{"rm /home/DolphinDB/server/dolphindb.log"}, 1:00m, 2020.01.01, 2021.12.31, 'W', 6);
In practice, passing inputs and outputs through function parameters and return values is not always convenient. More commonly, we read data from the database and write the results back to the database after computation. The following example calculates minute-level K lines after the market closes each day. In the custom function computeK, market data is read from the distributed table trades and, after computation, the results are written to the distributed table OHLC. The frequency is 'W', days is [1,2,3,4,5], and scheduledTime is 15:00m, meaning the job executes at 15:00 from Monday to Friday.
def computeK(){
    barMinutes = 7
    sessionsStart = 09:30:00.000 13:00:00.000
    OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume
           from loadTable("dfs://stock","trades")
           where time > today() and time < now()
           group by symbol, dailyAlignedBar(time, sessionsStart, barMinutes*60*1000) as barStart
    append!(loadTable("dfs://stock","OHLC"), OHLC)
}
scheduleJob(`kJob, "7 Minutes", computeK, 15:00m, 2020.01.01, 2021.12.31, 'W', [1,2,3,4,5]);
1.2 Querying Scheduled Jobs
To query scheduled job definition information in a node, use getScheduledJobs. The function syntax is as follows:
getScheduledJobs([jobIdPattern])
The jobIdPattern parameter is a string representing a job ID or a job ID pattern; it supports the wildcard characters "%" and "?". The function returns a table containing information about scheduled jobs. If jobIdPattern is not specified, all jobs are returned.
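For example, a minimal usage sketch (assuming jobs whose IDs start with testJob exist on the node):

getScheduledJobs("testJob%")   // jobs whose ID starts with "testJob"
getScheduledJobs()             // all scheduled jobs on the node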
The system saves the execution status of each job. The run log of a scheduled job is saved in the jobId.msg file, and its return value is saved in the jobId.object file. These files are stored under the <homeDir>/batchJobs directory. We can use getJobMessage and getJobReturn respectively to view the run log and return value of each job. Note the value of the job ID. First, when a job is created, if the input jobId is the same as the job ID of an existing scheduled job, the system does not return the input jobId. Second, for a job that is executed multiple times, the job ID differs for each execution. Therefore, we need getRecentJobs to view the completed scheduled jobs. For example, we define the following scheduled job:
def foo(){
    print "test scheduled job at "+ now()
    return now()
}
scheduleJob(`testJob, "foo", foo, 17:00m+0..2*30, today(), today(), 'D');
Running getRecentJobs() returns the following information:
jobId              jobDesc startTime               endTime
------             ------- ----------------------- -----------------------
testJob            foo     2020.02.14T17:00:23.636 2020.02.14T17:00:23.639
testJob20200214    foo     2020.02.14T17:30:23.908 2020.02.14T17:30:23.910
testJob20200214000 foo     2020.02.14T18:00:23.148 2020.02.14T18:00:26.749
The job ID of the first execution is testJob, that of the second is testJob20200214, and that of the third is testJob20200214000; it changes with each execution. We can use getJobMessage and getJobReturn to inspect the third execution, as shown below:
>getJobMessage(`testJob20200214000);
2020-02-14 18:00:23.148629 Start the job [testJob20200214000]: foo
2020-02-14 18:00:23.148721 test scheduled job at 2020.02.14T18:00:23.148
2020-02-14 18:00:26.749111 The job is done.

>getJobReturn(`testJob20200214000);
2020.02.14T18:00:23.148
1.3 Deleting a Scheduled Job
To delete a scheduled job, use the deleteScheduledJob function. The syntax is as follows:
deleteScheduledJob(jobId)
The jobId parameter is the job ID. Before deleting a job, use getScheduledJobs to obtain the ID of the job you want to delete.
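A minimal sketch (assuming a job whose ID starts with testJob exists, and that the returned table has a jobId column):

jobs = getScheduledJobs("testJob%")   // look up the exact job ID first
deleteScheduledJob(jobs.jobId[0])     // then delete the job by its ID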
2. Permissions of Scheduled Jobs
A scheduled job is created under the identity of the user logged in at creation time, and it also runs under that identity. Therefore, when creating a scheduled job, ensure that the user has the permissions to access the resources the job needs. For example, if the logged-in user is not an authorized user, he or she cannot access the distributed functionality of the cluster, and a job that uses it will raise an error. In the following example, user guestUser1 has no permission to access DFS:
def foo1(){
    print "Test scheduled job at "+ now()
    cnt = exec count(*) from loadTable("dfs://FuturesContract","tb")
    print "The count of table is "+cnt
    return cnt
}
login("guestUser1","123456")
scheduleJob(`guestGetDfsjob, "dfs read", foo1, [12:00m, 21:03m, 21:45m], 2020.01.01, 2021.12.31, 'D');
After the job executes, querying with getJobMessage(`guestGetDfsjob) shows that the scheduled job had no permission to read the distributed database:
2020-02-14 21:03:23.193039 Start the job [guestGetDfsjob]: dfs read
2020-02-14 21:03:23.193092 Test the scheduled job at 2020.02.14T21:03:23.193
2020-02-14 21:03:23.194914 Not granted to read table dfs://FuturesContract/tb
Therefore, to remotely execute certain functions of the controller node or to access a distributed table in the cluster, you need to log in as an administrator (admin) or another authorized user first. This can be done using the login function.
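Alternatively, an administrator can grant the user the required access so that the job itself can read the table. A minimal sketch (the access type and credentials are assumptions, not from the original example):

login("admin", "123456")                                    // switch to an administrator session
grant("guestUser1", TABLE_READ, "dfs://FuturesContract/tb") // allow guestUser1 to read the table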
The logs also show that the statements after the distributed-table access were not executed, which means that the execution of a job is interrupted when an error is encountered. To prevent an exception from stopping the execution of subsequent scripts, you can use a try-catch statement to capture the exception. If information needs to be output while the job runs, print it with print; the output is recorded in the jobId.msg log file.
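The following is a minimal sketch of such a job function (the function name foo2 and the messages are illustrative; the table path comes from the example above):

def foo2(){
    cnt = 0
    try{
        cnt = exec count(*) from loadTable("dfs://FuturesContract","tb")
        print "The count of table is " + cnt
    }
    catch(ex){
        // the error is logged to the job's jobId.msg file instead of aborting the job
        print "Failed to read the table: " + ex
    }
    print "This statement still runs at " + now()
    return cnt
}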
3. Serialization of Scheduled Jobs
After a scheduled job is created, the system persists the userID, job ID, description, start time, frequency, and job definition. The storage path is <homeDir>/sysmgmt/jobEditlog.meta. A job is represented by a DolphinDB function. The definition of a function consists of a series of statements, which may call other functions and global objects such as shared variables. Shared variables are serialized by name, so when a job is deserialized, the shared variables it references must exist, otherwise deserialization fails. Job functions and their dependent functions fall into two categories depending on whether they are compiled: compiled functions, including built-in functions and plug-in functions, and script functions, including custom functions, function views, and functions in modules. The serialization methods for these two categories differ, as described below.
3.1 Serialization of Compiled Functions
For compiled functions, only the function name and module name are serialized. During deserialization, the system searches for these modules and functions; if they cannot be found, deserialization fails. Therefore, if a plug-in function is used in a scheduled job, the plug-in must be loaded before the scheduled jobs are deserialized. The system initialization script dolphindb.dos, function views, and the user startup script startup.dos, which initialize system and user resources, are all executed before scheduled jobs are loaded, so the startup script is a suitable place to load plug-ins. In the following example, the ODBC plug-in is used in the job function jobDemo:
use odbc
def jobDemo(){
conn = odbc::connect("dsn=mysql_factorDBURL");
}
scheduleJob("job demo","example of init",jobDemo,15:48m, 2019.01.01, 2020.12.31, 'D')
However, the ODBC plug-in was not loaded when the system started, so the scheduled job could not be read: the system did not recognize this function and exits after writing the following log:
<ERROR>:Failed to unmarshall the job [job demo]. Failed to deserialize assign statement.. Invalid message format
Add the following code to the startup script to load the ODBC plug-in, and the system will then start successfully:
loadPlugin("plugins/odbc/odbc.cfg")
3.2 Serialization of Script Functions
For script functions, the function parameters and every statement in the function definition are serialized. If a statement references other script functions, the definitions of those dependent functions are serialized as well.
After a scheduled job is created, it is not affected if the job function or its dependent script functions are deleted or modified. If you want the scheduled job to execute the new function, you need to delete the scheduled job and then recreate it (see the sketch after the examples below); otherwise the old serialized function runs. Note that the dependent functions also need to be redefined. Here are some examples:
- Example 1: the job function is modified after a scheduled job is created. In the following script, the job function f is redefined after scheduleJob is called:

def f(){
    print "The old function is called "
}
scheduleJob(`test, "f", f, 11:05m, today(), today(), 'D');
go
def f(){
    print "The new function is called "
}
After the scheduled job executes, getJobMessage(`test) returns the following information, which shows that the scheduled job still executed the old custom function:
2020-02-14 11:05:53.382225 Start the job [test]: f
2020-02-14 11:05:53.382267 The old function is called
2020-02-14 11:05:53.382277 The job is done.
- Example 2: the job function is the function view fv, which calls a function foo. After scheduleJob is called, foo is redefined and the function view is regenerated:

def foo(){
    print "The old function is called "
}
def fv(){
    foo()
}
addFunctionView(fv)

scheduleJob(`testFvJob, "fv", fv, 11:36m, today(), today(), 'D');
go
def foo(){
    print "The new function is called "
}
dropFunctionView(`fv)
addFunctionView(fv)
After the scheduled job executes, getJobMessage(`testFvJob) returns the following message, indicating that the scheduled job still executed the old function:
Start the job [testFvJob]: fv
2020-02-14 11:36:23.069939 The old function is called
2020-02-14 11:36:23.069951 The job is done.
The same is true for module functions. We create a module printLog.dos with the following contents:
module printLog
def printLogs(logText){
writeLog(string(now()) + " : " + logText)
print "The old function is called"
}
Then create a scheduled job that calls the printLog::printLogs function:
use printLog
def f5(){
printLogs("test my log")
}
scheduleJob(`testModule, "f5", f5, 13:32m, today(), today(), 'D');
Before the scheduled job runs, modify the module as follows:
module printLog
def printLogs(logText){
writeLog(string(now()) + " : " + logText)
print "The new function is called"
}
After the scheduled job executes, getJobMessage(`testModule) returns the following message, which shows that the scheduled job executed the old function:
2020-02-14 13:32:22.870855 Start the job [testModule]: f5
2020-02-14 13:32:22.871097 The old function is called
2020-02-14 13:32:22.871106 The job is done.
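To make any of these jobs pick up a new function definition, delete and recreate the job. A minimal sketch, using the job from Example 1 (the same pattern applies to the function-view and module cases):

deleteScheduledJob(`test)                                  // the serialized job still holds the old f
scheduleJob(`test, "f", f, 11:05m, today(), today(), 'D'); // recreate so the current f is serialized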
4. Running a Script File Periodically
When you create a scheduled job whose job function is run applied to a script file, only the file name is serialized, not the file content. Therefore, all dependent custom functions must be placed in the script file; otherwise, execution fails because the custom functions cannot be found. For example, create a script file testjob.dos with the following contents:
foo()
Then execute the following script in the DolphinDB GUI:
def foo(){
    print("Hello world!")
}
run "/home/xjqian/testjob.dos"
The result shows that the script executes normally:
2020.02.14 13:47:00.992: Executing code (line 104-108)...
Hello world!
Then create a scheduled job to run this script file:

scheduleJob(`dailyFoofile1, "Daily Job 1", run{"/home/xjqian/testjob.dos"}, 16:14m, 2020.01.01, 2020.12.31, 'D');
When running the job, however, the following exception occurs:
Exception was raised when running the script [/home/xjqian/testjob.dos]:Syntax Error: [line #3] Cannot recognize the token foo
This is because foo's definition and the scheduled job execute in different sessions, so the job cannot find the function definition. Add the definition of foo() to the script file, modifying testjob.dos as follows:
def foo(){
    print("Hello world!")
}
foo()
Then recreate the scheduled job, and the script file runs successfully.
5. Summary and Outlook
Common faults and troubleshooting
- The job function references a shared variable, but the shared variable is not defined before the job is loaded. It is generally recommended to define such shared variables in the user's startup script (see the sketch after this list).
- The job function references a plug-in function, but the plug-in is not loaded before the job is loaded. It is generally recommended to load the plug-in in the user's startup script.
- A script file is run periodically, but a dependent function cannot be found. The script file must contain all the dependent custom functions.
- The user who created the scheduled job does not have permission to access the distributed database tables. Grant the user access to the corresponding databases.
- An exception is thrown when scheduleJob, getScheduledJobs, or deleteScheduledJob is used in the startup script. When a node starts, scheduled jobs are initialized after the startup script runs, so functions related to scheduled jobs cannot be used in the startup script.
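For the first case, a minimal sketch of a startup-script entry (the variable name and table schema are hypothetical):

// startup.dos: predefine the shared variable that job functions reference,
// so that it already exists when scheduled jobs are deserialized at startup
t = table(1:0, `id`temperature, [INT, DOUBLE])
share t as sensorCache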
In rare cases, scheduled jobs may fail to load, or the system may even fail to start, after a restart. In particular, during a version upgrade, the interfaces of built-in functions, plug-in functions, and so on may change and cause job loading to fail, or a compatibility bug may prevent the system from starting. Therefore, keep the scripts that define the scheduled jobs during development. If the system fails to start because of scheduled jobs, you can delete the serialized file <homeDir>/sysmgmt/jobEditlog.meta and create the scheduled jobs again after the system restarts.
Planned follow-up features:
- Add the ability to browse the definitions of job functions and their dependent functions.
- Define and implement dependencies between scheduled jobs.