数据库驱动(State-driven) VS 数据流驱动(Dataflow-driven)

最后发布时间 : 2026-07-04 12:02:45 浏览量 :

答案是:

**思想上非常类似,但实现方式并不一样。**Nextflow 没有一个显式的 materializeDynamicNodes()RefreshReadyStatus(),而是把这两步融合到了数据流(Dataflow)运行时里面。

可以从两者的运行模型来理解。


你的调度器:数据库状态驱动(State-based Scheduler)

你的 Scheduler 每 500ms 做一次轮询:

Loop

↓

materializeDynamicNodes()

↓

RefreshReadyStatus()

↓

ClaimReady()

↓

Worker 执行

例如 DAG

A
├──►B
└──►C
     │
     ▼
     D

第一次循环

A Pending

因为没有依赖:

A Ready

Worker:

A Running

结束:

A Success

第二次循环:

materializeDynamicNodes()

发现

A Success

于是:

创建 B
创建 C

然后:

RefreshReadyStatus()

检查:

B 所有 upstream success

于是:

Pending

↓

Ready

整个系统依赖:

数据库里的 Status。


Nextflow 完全不是这样

Nextflow 不维护:

Pending

Ready

Running

这些状态。

它维护的是:

Channel 是否有数据。

例如

process A {

    output:
    path "a.txt"

}

process B {

    input:
    path x

}

实际上运行时像这样:

A

↓

Channel

↓

B

A 结束:

a.txt

不是写数据库。

而是:

emit 到 channel

例如:

channel <- a.txt

这时:

B

自动收到:

a.txt

于是:

B Ready

注意:

这里没有:

RefreshReadyStatus()

因为:

收到数据,就意味着 Ready。


materializeDynamicNodes 对应 Nextflow 哪部分?

其实对应:

Process Instance 的创建。

例如:

Channel
    .of(1,2,3)

然后:

process Align {

input:
val sample

}

Nextflow 不会一次创建三个任务。

开始:

Align()

一个都没有。

Channel:

1

来了。

Runtime:

创建:

Align(sample=1)

第二个数据:

2

来了。

Runtime:

再创建:

Align(sample=2)

第三个:

3

来了。

Runtime:

Align(sample=3)

所以:

你的:

materializeDynamicNodes()

Nextflow:

收到输入后实例化 Process

只是:

你的依据:

Upstream Success

Nextflow:

依据:

Input Channel 有 Token

RefreshReadyStatus 对应什么?

Nextflow 没有:

Pending

↓

Ready

这一步。

因为:

Process 本身就在监听:

Input Channel

例如:

process B

需要:

x

y

两个输入。

Runtime:

一直等待:

Channel x

Channel y

当:

x

到了

还不能运行。

继续等:

y

来了。

Runtime:

立即:

submit task

所以:

RefreshReadyStatus()

其实被:

Dataflow Runtime

替代了。


举个 Join 的例子

例如:

A

↓

channel1
B

↓

channel2

然后:

channel1.join(channel2)

Nextflow Runtime:

维护:

Map

sample1

sample2

当:

channel1

sample1

来了。

先缓存。

channel2

sample1

来了。

立即:

join 成功

然后:

创建 downstream task

整个过程中:

没有:

RefreshReadyStatus()

因为:

join 本身就是 Ready 判断。


为什么你的 Scheduler 要分两步?

因为:

你没有:

Channel

只有:

数据库

数据库不知道:

Ready

所以:

必须:

Pending

↓

RefreshReadyStatus()

↓

Ready

这是:

Scheduler 算出来的。


Nextflow 为什么不用?

因为:

Dataflow Runtime 天生知道:

Input 是否齐了

例如:

Tuple

(sample1)

(sample2)

Runtime:

一直监听:

Channel

所以:

Ready

不是扫描出来的。

而是:

事件(Event)


两者最大的区别

你的 SchedulerNextflow
数据库驱动(State-driven)数据流驱动(Dataflow-driven)
Scheduler 周期扫描Channel 事件触发
materializeDynamicNodes 创建节点Channel 到达时实例化 Process Task
RefreshReadyStatus 判断 ReadyInput 满足自动 Ready
Claim Ready NodeExecutor 自动接收可运行 Task
AnalysisNode 是数据库实体TaskRun 是运行时对象

从设计模式来看

如果抽象到更高层,其实两者实现的是同一种思想:

只有当一个节点(Task)的所有依赖都满足时,才允许它进入执行阶段。

区别仅在于依赖满足的判定方式

  • 你的调度器:定时扫描数据库状态(Polling),根据上游节点 Success 来创建节点并将 Pending 转为 Ready
  • Nextflow:基于数据流事件(Event-driven),Channel 中的数据到达就是依赖满足的信号,运行时立即实例化并提交 Task,无需显式扫描。

如果以后你的 BRAVE 调度器希望进一步向 Nextflow 靠拢,那么可以考虑把当前的 Polling + 状态刷新 改造成 事件驱动(Event Bus)+ Ready Queue:节点完成后直接通知其下游检查是否满足条件,而不是每 500 ms 全量扫描。这会显著降低数据库访问频率,也更接近 Nextflow、Ray、Dask 等现代工作流引擎的调度方式。