“This is the 27th day of my participation in the November Update Challenge. For event details, see: The Last Update Challenge 2021”.
Introduction
DolphinScheduler is a visual DAG workflow task-scheduling platform that is widely used to schedule big data tasks.
It provides workflow scheduling similar to Azkaban, with stronger DAG visualization than Azkaban, and supports many kinds of big data tasks, including Flink, Spark, Shell, Python, Java, Scala, and HTTP.
Official website: dolphinscheduler.apache.org/zh-cn/
Automation
Why automate? When your DolphinScheduler instance runs hundreds or thousands of tasks, managing them by hand is time-consuming, and if you configure email alerts for every task, you will spend all day fighting fires.
In this situation you need task-result monitoring and automatic reruns: monitoring finds the failed tasks, and rerunning them automatically keeps you from spending too much time maintaining DolphinScheduler tasks.
Usage
Before invoking the API, you need to generate a token for the user.
DolphinScheduler provides a Swagger-style UI for its API documentation at http://ip:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
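Every API call carries the user's token. The author's code in the Example section sends it in an HTTP header named token. As a minimal standard-library sketch, assuming that same header name and the placeholder host ip used throughout this post, the request can also be built with net/http; newAPIRequest is a hypothetical helper name, and the sketch only builds the request without sending it.

```go
package main

import (
	"fmt"
	"net/http"
)

// baseURL mirrors the placeholder address used in this post; replace "ip"
// with your API server host.
const baseURL = "http://ip:12345/dolphinscheduler"

// newAPIRequest is a hypothetical helper that builds a GET request to a
// DolphinScheduler API path and attaches the user's token in the "token"
// header, as the HttpRequest-based code in this post does.
func newAPIRequest(path, token string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+path, nil)
	if err != nil {
		return nil, err
	}
	// net/http canonicalizes the name to "Token"; HTTP header names are
	// case-insensitive.
	req.Header.Set("token", token)
	return req, nil
}

func main() {
	req, err := newAPIRequest("/projects/query-project-list", "xxxxxxx")
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	// Sending it would be: resp, err := http.DefaultClient.Do(req)
	fmt.Println(req.Method, req.URL.String(), req.Header.Get("token"))
}
```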
Example
This demo uses the HttpRequest package (github.com/kirinlabs/HttpRequest) for HTTP requests and go-jmespath (github.com/jmespath/go-jmespath) for searching JSON data.
Task result check
Pitfalls worth noting:
- Date handling: the space in the date-time string is written as %20 (the URL-encoded space), and the strings are concatenated with fmt.Sprintf
- Mixed value types: the parameter map uses interface{} values so it can hold both int and string values
- Data conversion 1: unmarshal the response body ([]byte) into a generic JSON value so it can be searched
- Data conversion 2: convert the []interface{} search result into a []string so it is easier to use
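Both data conversions can be sketched with the standard library alone. This sketch walks the same data.totalList[*].processInstanceName path by hand instead of using go-jmespath, and uses the comma-ok form of type assertions so that an unexpected response shape yields an empty slice instead of a panic. toStrings and instanceNames are hypothetical helper names, and the sample body is made up to match that path.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toStrings converts a []interface{} (what a JSON array decodes to) into a
// []string, skipping elements that are not strings instead of panicking.
func toStrings(v interface{}) []string {
	items, ok := v.([]interface{})
	if !ok {
		return nil
	}
	out := make([]string, 0, len(items))
	for _, item := range items {
		if s, ok := item.(string); ok {
			out = append(out, s)
		}
	}
	return out
}

// instanceNames decodes a response body and walks
// data.totalList[*].processInstanceName by hand.
func instanceNames(body []byte) ([]string, error) {
	var doc map[string]interface{}
	if err := json.Unmarshal(body, &doc); err != nil {
		return nil, err
	}
	// Failed assertions leave nil values, which simply yield no rows.
	data, _ := doc["data"].(map[string]interface{})
	list, _ := data["totalList"].([]interface{})
	names := make([]interface{}, 0, len(list))
	for _, row := range list {
		if m, ok := row.(map[string]interface{}); ok {
			names = append(names, m["processInstanceName"])
		}
	}
	return toStrings(names), nil
}

func main() {
	// Made-up body shaped like the field path used in this post.
	body := []byte(`{"data":{"totalList":[{"processInstanceName":"job-a"},{"processInstanceName":"job-b"}]}}`)
	names, err := instanceNames(body)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println(names) // [job-a job-b]
}
```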
You can run this method periodically to find failed jobs, then decide whether to send a notification about them or to look up the corresponding IDs by job name so the jobs can be rerun.
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/jmespath/go-jmespath"
	"github.com/kirinlabs/HttpRequest"
)

var (
	url   = "http://ip:12345/dolphinscheduler"
	token = "xxxxxxx"
	req   *HttpRequest.Request
)

func init() {
	// Shared request object: debug output on, timeout set, token sent as a header
	req = HttpRequest.NewRequest().Debug(true).SetTimeout(time.Second*5).
		SetHeaders(map[string]string{
			"token": token,
		})
}

func main() {
	//testConn()
	jobCheck()
}

func jobCheck() {
	// Get today's and tomorrow's dates
	today := time.Now().Format("2006-01-02")
	tomorrow := time.Now().AddDate(0, 0, 1).Format("2006-01-02")
	// Append the time of day; %20 is the URL-encoded space
	fmt.Println(fmt.Sprintf("%v%v", today, "%2000:00:00"))
	fmt.Println(fmt.Sprintf("%v%v", tomorrow, "%2000:00:00"))
	// Names of the projects to check
	projects := []string{"jdOrder", "jdPlay"}
	// Query parameters: pageNo and pageSize are ints, the dates are strings
	m := make(map[string]interface{})
	m["pageNo"] = 1
	m["pageSize"] = 22
	m["stateType"] = "FAILURE"
	m["startDate"] = fmt.Sprintf("%v%v", today, "%2000:00:00")
	m["endDate"] = fmt.Sprintf("%v%v", tomorrow, "%2000:00:00")
	for _, project := range projects {
		resp, err := req.Get(url+"/projects/"+project+"/task-instance/list-paging", m)
		if err != nil {
			fmt.Println("Job check request failed:", err)
			return
		}
		if resp.StatusCode() != 200 {
			fmt.Println("Job check status code is not expected:", resp.StatusCode())
			return
		}
		fmt.Println("resp", resp)
		// Convert the returned []byte into a generic JSON value
		body, _ := resp.Body()
		var i interface{}
		var s []string
		_ = json.Unmarshal(body, &i)
		// Find the data corresponding to the required field
		processInstanceNames, _ := jmespath.Search("data.totalList[*].processInstanceName", i)
		// Convert []interface{} to []string; comma-ok avoids a panic on unexpected data
		names, _ := processInstanceNames.([]interface{})
		for _, v := range names {
			if name, ok := v.(string); ok {
				s = append(s, name)
			}
		}
		// Print the result
		for _, v := range s {
			fmt.Println(v)
		}
	}
}
Test the connection
If the task check in the previous section fails, run this method first to verify that the connection works.
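func testConn() {
	resp, err := req.Get(url + "/projects/query-project-list")
	if err != nil {
		fmt.Println("Connection test failed:", err)
		return
	}
	fmt.Println("resp", resp)
	// Decode the body into a generic JSON value and print it
	body, _ := resp.Body()
	var i interface{}
	_ = json.Unmarshal(body, &i)
	fmt.Println("i", i)
}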
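A 200 status code alone may not prove that the token was accepted, because the API can report errors inside the JSON body. The sketch below checks the body as well. It assumes the DolphinScheduler response envelope {"code": ..., "msg": ..., "data": ...} with code 0 meaning success; verify this against the API docs for your version. apiResult and checkResult are hypothetical names, and the sample bodies are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiResult assumes a {"code", "msg", "data"} response envelope in which
// code 0 means success.
type apiResult struct {
	Code int             `json:"code"`
	Msg  string          `json:"msg"`
	Data json.RawMessage `json:"data"`
}

// checkResult decodes a response body and returns an error if the body is
// not valid JSON or the envelope reports a non-zero code.
func checkResult(body []byte) (apiResult, error) {
	var r apiResult
	if err := json.Unmarshal(body, &r); err != nil {
		return r, fmt.Errorf("decode response: %w", err)
	}
	if r.Code != 0 {
		return r, fmt.Errorf("api error %d: %s", r.Code, r.Msg)
	}
	return r, nil
}

func main() {
	ok := []byte(`{"code":0,"msg":"success","data":[]}`)
	bad := []byte(`{"code":1,"msg":"example error"}`)
	if _, err := checkResult(ok); err == nil {
		fmt.Println("connection OK")
	}
	if _, err := checkResult(bad); err != nil {
		fmt.Println(err) // api error 1: example error
	}
}
```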
Rerunning a task
Rerunning a task simply means starting it again by calling startJob.
You need to obtain the project name and the process definition ID through the API; the author uses the fixed URL http://ip:12345/dolphinscheduler/projects/monitor/process/list-paging
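Once the list-paging response is in hand, the ID can be looked up by name. This sketch assumes the response lists process definitions under data.totalList, each with id and name fields; check that shape against the API docs for your version. findDefinitionID is a hypothetical helper, the sample body is made up, and it only searches the single page it is given, so with many definitions you may need to page through the results.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// definitionList assumes process definitions appear under data.totalList,
// each with "id" and "name" fields.
type definitionList struct {
	Data struct {
		TotalList []struct {
			ID   int    `json:"id"`
			Name string `json:"name"`
		} `json:"totalList"`
	} `json:"data"`
}

// findDefinitionID returns the ID of the first definition whose name matches.
func findDefinitionID(body []byte, name string) (int, error) {
	var list definitionList
	if err := json.Unmarshal(body, &list); err != nil {
		return 0, err
	}
	for _, d := range list.Data.TotalList {
		if d.Name == name {
			return d.ID, nil
		}
	}
	return 0, fmt.Errorf("process definition %q not found", name)
}

func main() {
	body := []byte(`{"code":0,"data":{"totalList":[{"id":42,"name":"example_job"}]}}`)
	id, err := findDefinitionID(body, "example_job")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(id) // 42
}
```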
Call example: startJob("ads_jd_order", 678)
func startJob(projectName string, processDefinitionId int) {
	// Start parameters for the new process instance
	m := make(map[string]interface{})
	m["failureStrategy"] = "CONTINUE"
	m["warningGroupId"] = 0
	m["warningType"] = "NONE"
	m["runMode"] = "RUN_MODE_SERIAL"
	m["processInstancePriority"] = "MEDIUM"
	m["workerGroup"] = "default"
	m["processDefinitionId"] = processDefinitionId
	// Note the "/" between the base URL and "projects"
	resp, err := req.JSON().Post(url+"/projects/"+projectName+"/executors/start-process-instance", m)
	if err != nil {
		fmt.Println("Job start request failed:", err)
		return
	}
	if resp.StatusCode() != 200 {
		fmt.Println("Job start status code is not expected:", resp.StatusCode())
		return
	}
}
Summary
DolphinScheduler's API is documented and not too complex, but there is little material about it online, so you have to explore much of it on your own. I am sharing what I have found here first.