import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

// dependencyTimeout bounds how long a plugin waits for a single upstream
// completion signal before it proceeds anyway.
const dependencyTimeout = 40 * time.Minute

// Context caches every plugin's result, keyed by the plugin's Index.
// It is written from several goroutines, so all access goes through Mutex.
var Context = map[int]interface{}{}

// DependOnParams holds the synchronisation state of one plugin.
type DependOnParams struct {
	// Times is the number of upstream plugins this plugin waits for.
	Times int
	// Channel receives one signal per finished upstream plugin.
	Channel chan struct{}
}

// Mutex guards Context.
var Mutex sync.Mutex

// MoreDependOn is kept from the original listing; only commented-out code
// referred to it.
var MoreDependOn = sync.Map{}

// Plugin is one vertex of the dependency graph.
type Plugin struct {
	// RelayOn lists the indexes whose results this plugin consumes; the values
	// are filled in by DependencyInjection.
	RelayOn map[int]interface{}
	Name    string
	Index   int
	// DependOn is shared (by pointer) between all copies of the plugin.
	DependOn *DependOnParams
}

// Do prints the plugin and returns its simulated result, Index + 100000.
func (p Plugin) Do() interface{} {
	fmt.Println(p)
	return p.Index + 100000
}

// Graph stores the plugins and, for every plugin index, the plugins that
// depend on it.
type Graph struct {
	vertex []Plugin
	list   map[int][]Plugin
}

// addVertex adds an edge: t depends on s, so t is appended to the dependents
// of s.
// NOTE(review): the body was lost in the listing; the direction is inferred
// from how KhanExecute signals g.list[...] after a plugin finishes - confirm.
func (g *Graph) addVertex(t Plugin, s Plugin) {
	g.list[s.Index] = Push(g.list[s.Index], t)
}

// Pop removes the first element of list and returns it with the remainder.
// For an empty list it returns a zero Plugin and the list unchanged.
// NOTE(review): the empty-list return value was truncated in the listing;
// returning the input list is an assumption.
func Pop(list []Plugin) (Plugin, []Plugin) {
	if len(list) == 0 {
		return Plugin{}, list
	}
	return list[0], list[1:]
}

// Push appends value to list and returns the extended list.
func Push(list []Plugin, value Plugin) []Plugin {
	return append(list, value)
}

// storeResult records a plugin result in Context under Mutex.
func storeResult(index int, result interface{}) {
	Mutex.Lock()
	defer Mutex.Unlock()
	Context[index] = result
}

// waitForDependencies blocks until dep.Times signals have arrived on
// dep.Channel, giving up on each individual signal after dependencyTimeout.
func waitForDependencies(dep *DependOnParams) {
	for j := 0; j < dep.Times; j++ {
		timer := time.NewTimer(dependencyTimeout)
		select {
		case <-dep.Channel:
		case <-timer.C:
		}
		timer.Stop()
	}
}

// KhanSort walks the graph in topological order (Kahn's algorithm) and starts
// one goroutine per plugin as it is dequeued. It returns once every started
// plugin has stored its result in Context.
//
// The goroutines do not wait for their upstream plugins; only the dequeue
// order is topological. Use KhanExecute for dependency-aware execution.
func (g *Graph) KhanSort() {
	inDegree := make(map[int]int)
	for i := range g.vertex {
		for _, m := range g.list[g.vertex[i].Index] {
			inDegree[m.Index]++
		}
	}

	var queue []Plugin
	for i := range g.vertex {
		if inDegree[g.vertex[i].Index] == 0 {
			queue = Push(queue, g.vertex[i])
		}
	}

	var wg sync.WaitGroup
	for len(queue) > 0 {
		var now Plugin
		now, queue = Pop(queue)

		wg.Add(1)
		go func(p Plugin) {
			defer wg.Done()
			// Run the plugin once and reuse the result for both the log line
			// and the cache.
			result := p.Do()
			fmt.Println("---->" + strconv.Itoa(result.(int)))
			storeResult(p.Index, result)
		}(now)

		for _, k := range g.list[now.Index] {
			inDegree[k.Index]--
			if inDegree[k.Index] == 0 {
				queue = Push(queue, k)
			}
		}
	}
	// Wait for the workers instead of sleeping for a fixed two minutes.
	wg.Wait()
}

// KhanExecute runs every plugin in its own goroutine. Each plugin first waits
// for one signal per upstream plugin, then injects the cached upstream
// results, runs, caches its own result and finally signals its dependents.
// It returns when all plugins have finished.
//
// Vertex 0 is skipped, matching main, which only populates vertices 1..n-1.
// Every plugin reachable through g.list must have a non-nil DependOn.
func (g *Graph) KhanExecute() {
	// Count, for every plugin, how many upstream plugins will signal it.
	for i := 1; i < len(g.vertex); i++ {
		for _, m := range g.list[g.vertex[i].Index] {
			m.DependOn.Times++
		}
	}

	var wg sync.WaitGroup
	for i := 1; i < len(g.vertex); i++ {
		wg.Add(1)
		go func(p Plugin) {
			defer wg.Done()
			waitForDependencies(p.DependOn)
			DependencyInjection(p)
			storeResult(p.Index, p.Do())
			for _, next := range g.list[p.Index] {
				next.DependOn.Channel <- struct{}{}
			}
		}(g.vertex[i])
	}
	wg.Wait()
}

// DependencyInjection copies the cached result of every index listed in
// plugin.RelayOn from Context into plugin.RelayOn. It takes Mutex itself, so
// callers must not hold it.
func DependencyInjection(plugin Plugin) {
	Mutex.Lock()
	defer Mutex.Unlock()
	for index := range plugin.RelayOn {
		plugin.RelayOn[index] = Context[index]
	}
}

func main() {
	g := NewGraph(9)
	for i := 1; i < 9; i++ {
		index := i
		if i == 5 {
			// NOTE(review): the listing gives vertex 5 the index 0 - confirm
			// this is intended and not a transcription error.
			index = 0
		}
		g.vertex[i] = Plugin{
			Index:    index,
			RelayOn:  map[int]interface{}{},
			DependOn: &DependOnParams{Times: 0, Channel: make(chan struct{})},
		}
	}
	g.addVertex(g.vertex[2], g.vertex[1])
	g.addVertex(g.vertex[3], g.vertex[1])
	g.addVertex(g.vertex[7], g.vertex[1])
	g.addVertex(g.vertex[4], g.vertex[2])
	g.addVertex(g.vertex[5], g.vertex[2])
	// NOTE(review): the listing is truncated after "g.addVertex(g.vertex[8],";
	// the remaining edges and the call that runs the graph are not visible.
	// KhanExecute is assumed to be the entry point.
	g.KhanExecute()
}

// NewGraph creates a graph with room for v plugins and an empty dependents
// list for every index in [0, v).
func NewGraph(v int) *Graph {
	g := &Graph{
		vertex: make([]Plugin, v),
		list:   make(map[int][]Plugin, v),
	}
	for i := 0; i < v; i++ {
		g.list[i] = make([]Plugin, 0)
	}
	return g
}
This is essentially Kahn's topological sort. First, every vertex whose in-degree is 0 is put into a queue. The queue is then traversed: each time a vertex is dequeued, the in-degree of every vertex that depends on it is decremented, and any dependent whose in-degree has dropped to 0 joins the queue. This repeats until the queue is empty.
The essence of plug-in scheduling is this: after the dependencies have been scanned, one goroutine is started for each plug-in, and it blocks on a channel until it has received one signal per dependency. When a plug-in finishes, its result is first cached in the context, and then a signal is sent on the channel of every plug-in that depends on it.
The second part
With the DAG simulation in place, the technique can be applied to workflow orchestration. First, the plug-in's operations are abstracted according to the business requirements. A plug-in carries:
- Number of dependencies
- Dependency channel (semaphore)
- Dependency data
- The data type of its own result
- Definition of the plug-in
Do(ctx *static.OptionalParams, relyOnParams ... interface{}) (interface{}, error) GetValue() chan *Value GetIndex() int On() *RelyOn GetResultType() string SetHandlerInfo(info *HandlerInfo) GetHandlerInfo() *HandlerInfo SetHandlerSystem(system *HandlerSystem) GetHandlerSystem() *HandlerSystem } type DoValue func(ctx *static.OptionalParams, relyOnParams ... interface{}) (interface{}, error) type HandlerSystem struct { Executable bool Token string System func(value DoValue) DoValue Recover func() logrus.Logger } type HandlerInfo struct { ID int Rely *RelyOn Value chan *Value Type string } type RelyOn struct { Times int Channel chan struct{} RelyOnParams map[int]*Value } type Value struct { Type string Result interface{} Err chan error }Copy the code
Scan.
// NOTE(review): this is the body of the function that scans the configured
// plug-ins and builds the scheduler. Its signature is not visible here; it
// reads a.Recall, a.ReSorter, a.Filter, a.Sorter, a.Extra and a.Config and
// returns *graph.TasksScheduler.
scheduler := graph.TasksScheduler{
	PointTask:      make([]graph.Handler, 0),
	AdjustTask:     make(map[int][]graph.Handler),
	HandlerContext: make(map[int]*graph.Value),
}

// Collect every configured plug-in, stage by stage. Ranging over a nil slice
// is a no-op, so no explicit nil checks are needed.
for i := range a.Recall {
	scheduler.PointTask = append(scheduler.PointTask, a.Recall[i])
}
for i := range a.ReSorter {
	scheduler.PointTask = append(scheduler.PointTask, a.ReSorter[i])
}
for i := range a.Filter {
	scheduler.PointTask = append(scheduler.PointTask, a.Filter[i])
}
for i := range a.Sorter {
	scheduler.PointTask = append(scheduler.PointTask, a.Sorter[i])
}
for i := range a.Extra {
	scheduler.PointTask = append(scheduler.PointTask, a.Extra[i])
}

// For every configured index, resolve the listed plug-in indexes to the
// collected handlers and record them as that index's adjacency list.
for index, wanted := range a.Config {
	related := scheduler.AdjustTask[index]
	for _, want := range wanted {
		for _, task := range scheduler.PointTask {
			if task.GetIndex() == want {
				related = append(related, task)
			}
		}
	}
	// Assigned even when nothing matched, so the key always exists.
	scheduler.AdjustTask[index] = related
}
return &scheduler
} // closes the enclosing function, whose header is not visible here
Execute the DAG
multipleHandlerResult := make(map[int]chan interface{}, len(g.PointTask)) var ss sync.WaitGroup ss.Add(len(g.PointTask)) for i := 0; i < len(g.PointTask); i++ { for _, M := range g.addJustTask [g.ointtask [I].getIndex ()] {m.on ().times ++ // initialization of dependent values if m.on ().relyonParams == nil { m.On().RelyOnParams = make(map[int]*Value) } m.On().RelyOnParams[g.PointTask[i].GetIndex()] = &Value{} } } for i := 0; i < len(g.PointTask); i++ { if g.PointTask[i].On().Times > 0 { g.PointTask[i].On().Channel = make(chan struct{}, g.PointTask[i].On().Times) } } for i := 0; i < len(g.PointTask); i++ { i := i go func() { defer func() { if rec := recover(); rec ! = nil { SendSemaphoreToRelayed(g.AdjustTask, G.ointtask [I].getIndex ()) fmt.println (string(debug.stack ()))} ss.done ()}( DependencyInjection(g.pointtask [I]) := DependencyInjection(g.pointtask [I], g.HandlerContext) result, _ := g.PointTask[i].Do(ctx, relyOnParams...) if result ! = nil { ctx.GoodsIDs = result.([]int64) } fmt.Println(result) fmt.Println(g.PointTask[i].GetIndex()) value := &Value{ Type: g.PointTask[i].GetResultType(), Result: result, } multipleHandlerResult[g.PointTask[i].GetIndex()] = make(chan interface{}, 1) multipleHandlerResult[g.pointtask [I].getIndex ()] < -result CacheResultToContext(g.handlerContext, value, G.pointtask [I].getIndex ()) // Send SendSemaphoreToRelayed(g.addJustTask, g.PointTask[i].GetIndex()) }() } ss.Wait() return multipleHandlerResult } func DependencyInjection(plugin Handler, Context map[int]*Value) []interface{} { relyOnParams := make([]interface{}, 0, 0) for index, _ := range plugin.On().RelyOnParams { if checkRelyOnParams(plugin.On().RelyOnParams[index].Type, Context[index].Type) { relyOnParams = append(relyOnParams, Context[index]) } } return relyOnParams } func SendSemaphoreToRelayed(adjust map[int][]Handler, curPluginIndex int) { for k := 0; k < len(adjust[curPluginIndex]); k++ { adjust[curPluginIndex][k].On().Channel <- struct{}{} } } func 
CacheResultToContext(ctx map[int]*Value, result *Value, index int) { ctx[index] = result } func DependencyBlocking(curPlugin Handler) { for j := 0; j < curPlugin.On().Times; j++ { select { case <-curPlugin.On().Channel: case <-time.After(40 * time.Second): fmt.Println("over the time:" + strconv.Itoa(curPlugin.GetIndex())) } } fmt.Println("finish:" + strconv.Itoa(curPlugin.GetIndex())) } func checkRelyOnParams(args1, args2 string) bool { return strings.EqualFold(args1, args2) }Copy the code