Github开源生信云平台 DEMO
JSON Workflow │ ▼ Workflow Loader │ ▼ DAG Builder │ 检查循环、拓扑排序 │ ▼ Scheduler Engine │ ┌───────────────┼────────────────┐ ▼ ▼ ▼ Local Executor Docker Executor K8s Executor │ │ │ └───────────────┼────────────────┘ ▼ Event Bus ▼ State Manager ▼ Database + WebSocket/SSE
Waiting │ 依赖满足 ▼ Ready Queue │ Worker Pool ▼ Running │ Finish ▼ Finished
你的问题实际上问到了 Nextflow、Airflow、Argo、Temporal、Dask 等所有 DAG 引擎的核心:数据驱动(Dataflow)而不是流程驱动(Control Flow)。
很多人第一次写工作流,都会写成:
for node in workflow: if dependency finished: run(node)
这是轮询式(Polling)。
而 Nextflow 根本不是这样工作的。
它实际上更像:
事件驱动 + 数据流(Dataflow) + Actor Model
也就是说:
Task Finish │ ▼ Publish Event │ ▼ Scheduler │ ▼ Ready Queue │ ▼ Worker
整个系统没有一个 while(true) 去遍历 DAG。
例如你的 JSON
{ "nodes":[ { "id":"fastp" }, { "id":"bwa", "depends":["fastp"] } ] }
先变成
fastp │ ▼ bwa
Graph
type Graph struct { Nodes map[string]*RuntimeNode }
真正运行的是 RuntimeNode,而不是 JSON。
例如:
type RuntimeNode struct { ID string Depends []string Children []*RuntimeNode State State WaitCount int }
这里 Children 很重要。
不要每次都去遍历 Graph。
建立:
fastp Children ↓ bwa
这样 Task 完成以后可以直接通知孩子。
fastp depends=[] ↓ WaitCount=0
bwa depends=[fastp] ↓ WaitCount=1
Scheduler 初始化:
fastp WaitCount=0 ↓ Ready Queue
而
bwa WaitCount=1
不会进入 Ready Queue。
所以初始化以后:
Ready Queue ↓ fastp
func worker() { for node := range readyQueue { executor.Run(node) } }
Worker 永远不知道 DAG。
它只知道:
Run Task
func Run(node *RuntimeNode){ cmd := exec.Command(...) cmd.Run() EventBus.Publish(TaskFinished{ Node:node, }) }
注意:
Executor 不会去调度别人。
它只负责:
运行 ↓ 完成 ↓ 发事件
type TaskFinished struct{ Node *RuntimeNode }
发布:
bus.Publish(TaskFinished{ Node:node, })
而 Scheduler:
bus.Subscribe(TaskFinished{},scheduler.OnTaskFinished)
这就是观察者模式。
Executor 根本不知道 Scheduler 存在。
假设:
fastp ↓ finished
Scheduler:
func OnTaskFinished(event TaskFinished){ node:=event.Node for _,child:=range node.Children{ child.WaitCount-- if child.WaitCount==0{ readyQueue<-child } } }
这就是整个 DAG 调度。
没有遍历。
只有:
Task Finish ↓ Children ↓ WaitCount-- ↓ Ready
A / | \ B C D
初始化:
A Wait=0
B Wait=1
C Wait=1
D Wait=1
A 完成:
B.Wait-- ↓ 0
进入:
Ready Queue
同理:
C Ready
D Ready
Worker 自动开始。
Nextflow 比这个高级。
fastp ↓ clean.fastq
不是:
Task Finish
而是:
Artifact Produced
ArtifactManager:
Publish( ArtifactProduced{ Name:"clean.fastq" } )
Artifact Produced ↓ 谁需要这个文件? ↓ bwa ↓ InputReady--
所以真正驱动的是:
数据 不是Task
建议:
type RuntimeNode struct{ ID string WaitCount int Children []*RuntimeNode Inputs []*Artifact Outputs []*Artifact State State }
Artifact
type Artifact struct{ Name string Path string Ready bool }
clean.fastq Ready=true
bwa:
Inputs ↓ clean.fastq
所有Input Ready ↓ Ready Queue
而不是:
Depends Finished
这是 Nextflow 的 Dataflow 思想。
JSON
↓
Workflow Loader
Runtime DAG
Scheduler
Worker
Executor
Task Finished
EventBus
Children Wait--
重复……
直到:
全部Finished
整个系统没有任何模块直接调用另一个模块:
不知道:
Scheduler 不知道:
Database
Database 不知道:
WebSocket
Monitor 不知道:
它们全部监听 EventBus:
TaskFinished │ ┌─────────────┼─────────────┐ ▼ ▼ ▼ Scheduler Database Monitor │ │ ▼ ▼ ReadyQueue SSE Push
因此增加新的能力(如日志、统计、告警)只需增加新的事件监听器,而无需修改调度逻辑。
上面的模型还有一个重要缺陷:一个节点只能运行一次。
但生信流程经常需要:
fastp
bwa
因此建议把**流程节点(Node)和运行实例(TaskExecution)**彻底分离。
type Node struct { ID string Command string Depends []string } type TaskExecution struct { ID string // 唯一运行实例 NodeID string // 属于哪个节点 Inputs []*Artifact Outputs []*Artifact State State WorkDir string }
这样:
Node(fastp) │ ┌────────┴────────┐ ▼ ▼ Task#1(sampleA) Task#2(sampleB)
调度器调度的是 TaskExecution,而不是 Node。Node 更像模板(Template),TaskExecution 才是真正进入 Ready Queue 的对象。
这也是 Nextflow 能支持 Channel、Scatter/Gather、动态任务展开的关键原因。
传统 DAG 引擎是:
"前驱任务完成了,所以我可以运行后继任务。"`
而 Nextflow 风格的数据流引擎是:
"当输入数据(Artifact/Channel)到达时,就动态创建一个新的 TaskExecution;当该 TaskExecution 完成并产生新的数据,再驱动下游节点继续生成新的 TaskExecution。"
因此,**真正被调度的是运行实例(TaskExecution),真正驱动调度的是数据(Artifact 或 Channel),事件总线负责将"数据产生"和"任务完成"传播给调度器,而调度器只负责根据这些事件创建新的运行实例并放入 Ready Queue。**这就是数据驱动工作流引擎的核心运行原理。