14.1 太牛了!任务编排和规则引擎竟然还能这样设计?在分布式任务调度系统中,任务编排和规则引擎是核心组件之一。它们决定了任务如何按照预定的逻辑执行,如何处理任务间的依赖关系,以及如何根据条件做出决策。今天我们就来深入探讨一下如何设计一个强大的任务编排系统和规则引擎,让你的分布式任务调度系统更加智能和灵活!任务编排的重要性任务编排是指将多个独立的任务按照一定的规则和顺序组织起来,形成一个完整的业务流程。在复杂的业务场景中,单一任务往往无法满足业务需求,需要多个任务协同工作才能完成一个完整的业务流程。常见的任务编排需求包括:前置任务:任务B必须在任务A完成后才能执行后置任务:任务A完成后自动触发任务B并行执行:多个任务可以同时执行以提高效率条件分支:根据任务执行结果或其他条件决定下一步执行哪个任务循环执行:重复执行某些任务直到满足特定条件任务编排系统设计让我们先来看看一个基本的任务编排系统应该如何设计:packageorchestratorimport("context""fmt""sync""time")// Task 任务接口typeTaskinterface{Execute(ctx context.Context)(interface{},error)GetID()string}// TaskNode 任务节点typeTaskNodestruct{IDstringTask Task Dependencies[]string// 依赖的任务IDNext[]string// 下一步执行的任务IDConditionfunc(resultinterface{})bool// 执行条件}// TaskOrchestrator 任务编排器typeTaskOrchestratorstruct{tasksmap[string]*TaskNode mu sync.RWMutex}// NewTaskOrchestrator 创建任务编排器funcNewTaskOrchestrator()*TaskOrchestrator{returnTaskOrchestrator{tasks:make(map[string]*TaskNode),}}// AddTask 添加任务func(o*TaskOrchestrator)AddTask(node*TaskNode){o.mu.Lock()defero.mu.Unlock()o.tasks[node.ID]=node}// Execute 执行任务编排func(o*TaskOrchestrator)Execute(ctx context.Context)error{o.mu.RLock()defero.mu.RUnlock()// 记录已完成的任务completed:=make(map[string]interface{})completedMu:=sync.Mutex{}// 记录正在执行的任务executing:=make(map[string]bool)executingMu:=sync.Mutex{}// 使用 WaitGroup 等待所有任务完成varwg sync.WaitGroup// 错误收集varerrMsgs[]stringerrMu:=sync.Mutex{}// 查找没有依赖的任务开始执行for_,node:=rangeo.tasks{iflen(node.Dependencies)==0{wg.Add(1)goo.executeTask(ctx,node,completed,completedMu,executing,executingMu,wg,errMu,errMsgs)}}wg.Wait()iflen(errMsgs)0{returnfmt.Errorf("task execution failed: %v",errMsgs)}returnnil}// executeTask 执行单个任务func(o*TaskOrchestrator)executeTask(ctx context.Context,node*TaskNode,completedmap[string]interface{},completedMu*sync.Mutex,executingmap[string]bool,executingMu*sync.Mutex,wg*sync.WaitGroup,errMu*sync.Mutex,errMsgs*[]string,){deferwg.Done()// 标记任务正在执行executingMu.Lock()executing[node.ID]=trueexecutingMu.Unlock()// 执行任务result,err:=node.Task.Execute(ctx)// 标记任务执行完成executingMu.Lock()delete(executing,node.ID)executingMu.Unlock()iferr!=nil{errMu.Lock()*errMsgs=append(*errMsgs,fmt.Sprintf("task %s failed: %v",node.ID,err))errMu.Unlock()return}// 记录任务结果completedMu.Lock()completed[node.ID]=result completedMu.Unlock()// 检查并触发后续任务o.triggerNextTasks(ctx,node,result,completed,completedMu,executing,executingMu,wg,errMu,errMsgs)}// triggerNextTasks 触发后续任务func(o*TaskOrchestrator)triggerNextTasks(ctx context.Context,node*TaskNode,resultinterface{},completedmap[string]interface{},completedMu*sync.Mutex,executingmap[