nextflow 与cromwell与sankemake 分别如何 实现最开始输入一个文件, 中间有一个节点产生多个文件, 多个文件后续要scatter到多个节点的

最后发布时间 : 2026-06-22 00:39:00 浏览量 :

这是工作流引擎设计中的一个核心问题:动态 fan-out(scatter)和 fan-in(gather)

假设最开始只有一个输入文件:

sample.bam
    │
    ▼
Split
    │
    ├── chunk1.bam
    ├── chunk2.bam
    ├── chunk3.bam
    └── chunk4.bam
          │
          ▼
      Align
          │
          ▼
      Merge

其中:

  • Split 产生 N 个文件(N运行时决定)
  • Align 需要对每个文件独立执行
  • Merge 需要等待所有 Align 完成

三个工作流系统的实现方式差异非常大。


1 Nextflow

Nextflow 本质是 Dataflow Programming

其核心对象不是文件,而是:

Channel

每个 Channel 可以看成一个数据流。


Split

process Split {

    input:
    path bam

    output:
    path "*.bam"

    script:
    """
    split_bam.sh $bam
    """
}

输出:

chunk1.bam
chunk2.bam
chunk3.bam
chunk4.bam

自动 Scatter

workflow {

    chunks = Split(input_bam)

    Align(chunks)
}

这里:

chunks

实际上是:

Channel
 ├─ chunk1.bam
 ├─ chunk2.bam
 ├─ chunk3.bam
 └─ chunk4.bam

Nextflow 自动执行:

Align(chunk1)
Align(chunk2)
Align(chunk3)
Align(chunk4)

等价于:

for chunk in chunks:
    submit(Align(chunk))

但是用户不用写循环。


Gather

process Merge {

    input:
    path bam_files

    output:
    path "merged.bam"

}

使用:

workflow {

    chunks = Split(input)

    aligned = Align(chunks)

    Merge(aligned.collect())
}

这里:

collect()

把:

file1
file2
file3
file4

变成:

[file1,file2,file3,file4]

然后传给 Merge。


Nextflow DAG

运行时生成:

Split
  │
  ├─ Align(1)
  ├─ Align(2)
  ├─ Align(3)
  └─ Align(4)
          │
          ▼
        Merge

特点:

  • Scatter 数量运行时决定
  • DAG 运行时扩展
  • 动态 DAG

这是 Nextflow 最大优势。


2 Cromwell (WDL)

WDL 的思想:

Workflow + Task

Scatter 是语言级语法。


Split

task Split {

  input {
      File bam
  }

  command {
      split.sh ~{bam}
  }

  output {
      Array[File] chunks = glob("*.bam")
  }
}

返回:

Array[File]

例如:

[
 chunk1.bam,
 chunk2.bam,
 chunk3.bam,
 chunk4.bam
]

Scatter

scatter (chunk in split.chunks) {

    call Align {
        input:
            bam = chunk
    }
}

Cromwell 会展开成:

Align_0
Align_1
Align_2
Align_3

Gather

scatter 的输出天然是数组:

Array[File] aligned_files = Align.out

直接传给:

call Merge {
    input:
        files = aligned_files
}

Cromwell DAG

静态看:

Split
   │
scatter
   │
 ├─ Align
 ├─ Align
 ├─ Align
 └─ Align
   │
   ▼
 Merge

实际运行:

Split
  │
  ├─ Align_0
  ├─ Align_1
  ├─ Align_2
  └─ Align_3
        │
        ▼
      Merge

Cromwell 特点

优点:

  • scatter 是一等公民
  • 对 genomics 非常友好
  • Array 类型完整

缺点:

  • DAG 结构基本固定
  • 动态生成任务能力弱于 Nextflow

例如:

运行过程中再新增节点

Nextflow:

支持

Cromwell:

基本不支持

3 Snakemake

Snakemake 最接近 Makefile。

其 DAG 是:

Rule
 + 文件依赖

Split

rule split:
    input:
        "sample.bam"
    output:
        directory("chunks")
    shell:
        """
        split.sh {input}
        """

Scatter

需要提前知道文件名模式:

rule align:

    input:
        "chunks/{id}.bam"

    output:
        "aligned/{id}.bam"

    shell:
        """
        align.sh {input}
        """

动态发现文件

使用:

glob_wildcards()

例如:

IDS = glob_wildcards("chunks/{id}.bam").id

Gather

rule merge:

    input:
        expand(
            "aligned/{id}.bam",
            id=IDS
        )

    output:
        "merged.bam"

DAG

split
  │
  ├─ align(chunk1)
  ├─ align(chunk2)
  ├─ align(chunk3)
  └─ align(chunk4)
          │
          ▼
        merge

Snakemake Checkpoint

为了支持真正动态文件数:

checkpoint split:
checkpoint split:

    input:
        "sample.bam"

    output:
        directory("chunks")

然后:

def get_chunks(wildcards):

    ckpt = checkpoints.split.get()

    return expand(
        "aligned/{id}.bam",
        id=glob_wildcards(
            ckpt.output[0] + "/{id}.bam"
        ).id
    )

这样运行时:

执行 split
     ↓
发现 chunk 数量
     ↓
重新构建 DAG
     ↓
执行 align

三者对比

特性NextflowCromwell(WDL)Snakemake
ScatterChannel 自动scatter语法expand
Gathercollect()Array天然支持expand
动态文件数非常强一般checkpoint
DAG构建运行时提交前+运行时展开运行前
数据模型DataflowTyped ArrayFile Pattern
Fan-out最强
Fan-in
生信使用极多Broad/Terra生态学术界很多

如果映射到你正在做的 BRAVE Workflow Engine

根据你之前设计的:

Node
Edge
RuntimeEngine

最接近的实际上是 Nextflow 的 Dataflow 模型

建议把节点输出设计成:

OutputPort

输出:

File
List[File]
Channel[File]

例如:

SplitNode

返回:

[
  chunk1.bam,
  chunk2.bam,
  chunk3.bam
]

Runtime Engine 检测:

output is List

自动展开:

Align(chunk1)
Align(chunk2)
Align(chunk3)

运行时生成新的 Task Instance:

Align#1
Align#2
Align#3

而不是提前生成 DAG。

这种模式与 Nextflow 的 Channel + Process 机制最接近,也最适合你目前正在实现的动态分析流程和 Nextflow 集成场景。