Github开源生信云平台 DEMO
这是工作流引擎设计中的一个核心问题:动态 fan-out(scatter)和 fan-in(gather)。
假设最开始只有一个输入文件:
sample.bam │ ▼ Split │ ├── chunk1.bam ├── chunk2.bam ├── chunk3.bam └── chunk4.bam │ ▼ Align │ ▼ Merge
其中:
三个工作流系统的实现方式差异非常大。
Nextflow 本质是 Dataflow Programming
其核心对象不是文件,而是:
Channel
每个 Channel 可以看成一个数据流。
process Split { input: path bam output: path "*.bam" script: """ split_bam.sh $bam """ }
输出:
chunk1.bam chunk2.bam chunk3.bam chunk4.bam
workflow { chunks = Split(input_bam) Align(chunks) }
这里:
chunks
实际上是:
Channel ├─ chunk1.bam ├─ chunk2.bam ├─ chunk3.bam └─ chunk4.bam
Nextflow 自动执行:
Align(chunk1) Align(chunk2) Align(chunk3) Align(chunk4)
等价于:
for chunk in chunks: submit(Align(chunk))
但是用户不用写循环。
process Merge { input: path bam_files output: path "merged.bam" }
使用:
workflow { chunks = Split(input) aligned = Align(chunks) Merge(aligned.collect()) }
collect()
把:
file1 file2 file3 file4
变成:
[file1,file2,file3,file4]
然后传给 Merge。
运行时生成:
Split │ ├─ Align(1) ├─ Align(2) ├─ Align(3) └─ Align(4) │ ▼ Merge
特点:
这是 Nextflow 最大优势。
WDL 的思想:
Workflow + Task
Scatter 是语言级语法。
task Split { input { File bam } command { split.sh ~{bam} } output { Array[File] chunks = glob("*.bam") } }
返回:
Array[File]
例如:
[ chunk1.bam, chunk2.bam, chunk3.bam, chunk4.bam ]
scatter (chunk in split.chunks) { call Align { input: bam = chunk } }
Cromwell 会展开成:
Align_0 Align_1 Align_2 Align_3
scatter 的输出天然是数组:
Array[File] aligned_files = Align.out
直接传给:
call Merge { input: files = aligned_files }
静态看:
Split │ scatter │ ├─ Align ├─ Align ├─ Align └─ Align │ ▼ Merge
实际运行:
Split │ ├─ Align_0 ├─ Align_1 ├─ Align_2 └─ Align_3 │ ▼ Merge
优点:
缺点:
运行过程中再新增节点
Nextflow:
支持
Cromwell:
基本不支持
Snakemake 最接近 Makefile。
其 DAG 是:
Rule + 文件依赖
rule split: input: "sample.bam" output: directory("chunks") shell: """ split.sh {input} """
需要提前知道文件名模式:
rule align: input: "chunks/{id}.bam" output: "aligned/{id}.bam" shell: """ align.sh {input} """
glob_wildcards()
IDS = glob_wildcards("chunks/{id}.bam").id
rule merge: input: expand( "aligned/{id}.bam", id=IDS ) output: "merged.bam"
split │ ├─ align(chunk1) ├─ align(chunk2) ├─ align(chunk3) └─ align(chunk4) │ ▼ merge
为了支持真正动态文件数:
checkpoint split:
checkpoint split: input: "sample.bam" output: directory("chunks")
然后:
def get_chunks(wildcards): ckpt = checkpoints.split.get() return expand( "aligned/{id}.bam", id=glob_wildcards( ckpt.output[0] + "/{id}.bam" ).id )
这样运行时:
执行 split ↓ 发现 chunk 数量 ↓ 重新构建 DAG ↓ 执行 align
根据你之前设计的:
Node Edge RuntimeEngine
最接近的实际上是 Nextflow 的 Dataflow 模型。
建议把节点输出设计成:
OutputPort
File List[File] Channel[File]
SplitNode
[ chunk1.bam, chunk2.bam, chunk3.bam ]
Runtime Engine 检测:
output is List
自动展开:
Align(chunk1) Align(chunk2) Align(chunk3)
运行时生成新的 Task Instance:
Align#1 Align#2 Align#3
而不是提前生成 DAG。
这种模式与 Nextflow 的 Channel + Process 机制最接近,也最适合你目前正在实现的动态分析流程和 Nextflow 集成场景。