Github开源生信云平台 DEMO
这是一个非常好的问题,也是 Nextflow 与传统 DAG Scheduler 最大的区别。
答案先说:
不是每个 Process 创建一个 goroutine,更不是每个 Task 一个 goroutine 在死循环监听。 Nextflow 底层依赖 Groovy GPars Dataflow,它采用的是事件驱动(Event-driven)+ 少量线程池 + Dataflow Queue,而不是 Go 的 "一个 channel 对应一个 goroutine" 模型。
不是每个 Process 创建一个 goroutine,更不是每个 Task 一个 goroutine 在死循环监听。
Nextflow 底层依赖 Groovy GPars Dataflow,它采用的是事件驱动(Event-driven)+ 少量线程池 + Dataflow Queue,而不是 Go 的 "一个 channel 对应一个 goroutine" 模型。
可以一步一步来看。
Channel.of(1,2,3) | process A | process B
你可能会以为运行时像这样:
goroutine(A) while(true){ x <- channel }
goroutine(B) while(true){ y <- channel }
其实不是。
更接近于:
Channel │ publish(value) │ ▼ Dataflow Runtime │ 唤醒等待这个 Channel 的 Process │ ▼ 创建一个 Task 提交 Executor
也就是说:
不是 Process 在主动监听 Channel。
而是:
Channel 有数据时通知 Runtime。
这是两个方向完全相反。
Nextflow 底层(GPars)大概有这样的对象:
DataflowQueue subscribers ProcessA ProcessB ProcessC
例如:
channel.subscribe(processA) channel.subscribe(processB)
注意:
这里不是
ProcessA while(true){ <-channel }
而是
channel 保存: listeners
类似:
listeners = { processA, processB }
然后:
channel << value
实际上:
publish(value) ↓ 遍历 listeners ↓ 通知 Runtime
所以:
不是:
Process 主动 pull
Channel 主动 push
A ↓ channelA ↓ join ↓ process C
B ↓ channelB ↓ join
join Operator 内部维护:
mapA mapB
当:
channelA sample1
来了。
mapA[sample1]=...
继续等待。
channelB sample1
join 立即:
emit(sample1)
process C
得到:
sample1
整个过程:
没有任何:
while(true)
去扫描。
真正的 Runtime 更像:
Event Loop │ publish(channel,value) │ lookup subscribers │ invoke operator │ output channel ready? │ create Task │ submit Executor
是不是很像:
Java 的:
Netty NIO Reactor
或者:
Node.js libuv
process Align
只有一个:
Process Definition
sample1 Align sample2 Align sample3 Align
开始的时候只有:
Align
输入:
Runtime 创建:
TaskRun(sample1)
sample2
Runtime:
TaskRun(sample2)
所以真正大量创建的是:
TaskRun
Process
其实 Go 也可以完全不需要:
一个 Node 一个 goroutine
type Channel struct { subscribers []Operator }
发布:
func Publish(ch *Channel,v any){ for _,op:=range ch.subscribers{ op.Notify(v) } }
Operator:
func (o *JoinOperator) Notify(v any){ o.cache.Add(v) if o.Ready(){ runtime.SubmitTask(...) } }
这里:
没有:
goroutine while(true)
全部都是:
事件回调
你的 Scheduler:
500ms ↓ 扫描数据库 ↓ 有没有 Ready?
即使没有任何变化,也会一直扫描:
Loop ↓ Loop ↓ Loop ↓ Loop
CPU 和数据库一直工作。
Nextflow:
只有:
Task 完成
才会:
emit()
notify()
submit()
sleep() poll() refresh()
CPU 几乎不空转。
你其实已经有一个很好的基础——之前你提到的 Event Bus 和 UDSListener。完全可以把现在的:
materialize() ↓ refresh() ↓ claim()
改成:
Executor ↓ NodeSuccessEvent ↓ EventBus ↓ DependencyManager ↓ 检查 downstream 是否全部满足 ↓ ReadyQueue ↓ WorkerPool
这样就不需要每 500ms 扫描数据库了。
如果是 Go,我甚至不会去模仿 GPars,而是会利用 Go 的优势设计成下面这种架构:
EventBus │ ┌─────────────────┴─────────────────┐ │ │ NodeFinishedEvent NodeFailedEvent │ ▼ DependencyManager │ ▼ ReadyQueue(chan NodeID) │ ▼ WorkerPool(固定数量 goroutine) │ ▼ Executor
这里的特点是:
DependencyManager
这也是现代工作流引擎(包括 Nextflow、Ray、Dask 等)普遍采用的事件驱动调度思想,而不是定时轮询。