Github开源生信云平台 DEMO
这是一个非常好的问题,也是你设计 BRAVE 工作流引擎 时最需要考虑的问题。
其实这两种调度器代表了两种完全不同的设计哲学:
下面分别解释。
例如你的系统:
AnalysisNode A Success B Pending C Running D Ready
Scheduler 不关心数据是什么,只关心:
状态有没有变化?
循环就是:
扫描数据库 ↓ A Success? ↓ B Ready ↓ Worker
所以它实际上是在回答:
哪些节点现在可以运行?
数据库天然保存:
Pending Ready Running Success Failed
重启后:
SELECT * ↓ 恢复运行
不用恢复内存中的 Channel。
这是 Nextflow 一直比较麻烦的问题。
例如 React 页面:
A ✅ ↓ B 🟢 ↓ C 🔴
因为数据库就是最新状态。
例如:
Stopping
Scheduler:
下一轮扫描 ↓ 停止
恢复:
重新扫描数据库 ↓ 继续
多个 Scheduler:
Scheduler1 Scheduler2 Scheduler3
都共享:
AnalysisNode
数据库就是唯一事实来源(Single Source of Truth)。
最大的缺点:
不知道什么时候状态改变。
只能:
500ms ↓ 扫描 ↓ 有没有 Ready?
所以:
Loop Loop Loop Loop
一直在空转。
Nextflow:
A ↓ Channel ↓ B
不是:
A Success?
而是:
A 输出了一个 token
sample1.fastq
进入:
channel
Runtime:
立即:
创建 B(sample1)
整个系统:
没有:
Pending Ready
这种状态转换。
poll() scan() refresh()
只有:
emit() ↓ notify() ↓ submit()
效率非常高。
samples ↓ sample1 sample2 sample3
自动:
Align(sample1) Align(sample2) Align(sample3)
根本不用 Scheduler 再扫描。
FASTQ ↓ Align ↓ VariantCalling
sample1 对齐完:
sample1.vcf
马上进入:
VariantCalling(sample1)
不用等:
sample2 sample3
全部结束。
因为:
Channel
一般都在内存。
程序挂了:
Channel 丢失
必须恢复:
Queue Operator Token
所以 Nextflow 的 Resume 其实做了很多工作。
答案:
完全可以。
而且:
数据库驱动比 Dataflow 更容易做动态 Scatter。
Topic ↓ 得到 Cluster1 Cluster2 Cluster3
运行:
Topic Success
输出:
cluster list
["A","B","C"]
然后:
materializeDynamicNodes()
读取:
ResolvedOutputs
动态创建:
Align(A) Align(B) Align(C)
数据库:
Node101 Node102 Node103
这就是:
Dynamic Scatter。
当然。
Sample ↓ Align ↓ 每个 Chromosome ↓ Variant
sample1
得到:
chr1 chr2 chr3
动态:
Variant(chr1) Variant(chr2) Variant(chr3)
chr1
里面:
又发现:
100 blocks
再生成:
Task1 Task2 Task3
Scatter ↓ Scatter ↓ Scatter
完全没问题。
其实你的代码已经有雏形:
bootstrapInputsFromUpstream()
它就是:
edge.SourceHandle ↓ edge.TargetHandle
A ↓ sample1.vcf
B ↓ sample1.bam
Join:
检查:
sample1.vcf 存在
并且:
sample1.bam 存在
于是:
Merge(sample1)
Pending ↓ Ready
即可。
Success
Key。
SampleID
Node sample1 Success
Node sample2 Success
查询:
sample1 两个 upstream 都 Success
Create Merge(sample1)
与你之前问的:
Cromwell 如何根据 SampleID Join
其实就是同一个思想。
我认为你的架构没有必要完全变成 Nextflow。
更好的方式是:
数据库负责持久化,事件总线负责调度。
Executor │ ▼ NodeSuccessEvent │ ┌──────┴──────┐ │ │ ▼ ▼ 更新数据库 DependencyManager │ │ └──────┬──────┘ ▼ ReadyQueue │ ▼ WorkerPool
数据库仍然保存:
Pending Running Success Failed
用于:
但是:
调度:
不用:
500ms ↓ 扫描数据库
NodeSuccessEvent ↓ 检查 downstream ↓ ReadyQueue ↓ Worker
我建议采用一种混合架构(Hybrid Scheduler):
具体来说:
这样你既保留了数据库驱动易恢复、易运维、易可视化的优势,又获得了接近 Nextflow 数据流驱动的调度效率。对于需要支持动态 Scatter、多级 Scatter、Join、失败恢复以及跨机器调度的生物信息工作流,这种混合模式通常比纯数据库轮询或纯内存 Dataflow 更均衡、更实用。