用golang实现类似nextflow的生信流程引擎,不考虑流程定义,用json定义流程,如何设计实现

最后发布时间 : 2026-06-28 22:41:44 浏览量 :
                JSON Workflow
                      │
                      ▼
              Workflow Loader
                      │
                      ▼
              DAG Builder
                      │
          检查循环、拓扑排序
                      │
                      ▼
             Scheduler Engine
                      │
      ┌───────────────┼────────────────┐
      ▼               ▼                ▼
 Local Executor   Docker Executor   K8s Executor
      │               │                │
      └───────────────┼────────────────┘
                      ▼
               Event Bus
                      ▼
              State Manager
                      ▼
           Database + WebSocket/SSE
Waiting
      │
依赖满足
      ▼
 Ready Queue
      │
 Worker Pool
      ▼
 Running
      │
 Finish
      ▼
 Finished

你的问题实际上问到了 Nextflow、Airflow、Argo、Temporal、Dask 等所有 DAG 引擎的核心:数据驱动(Dataflow)而不是流程驱动(Control Flow)

很多人第一次写工作流,都会写成:

for node in workflow:
    if dependency finished:
        run(node)

这是轮询式(Polling)

而 Nextflow 根本不是这样工作的。

它实际上更像:

事件驱动 + 数据流(Dataflow) + Actor Model

也就是说:

Task Finish
      │
      ▼
 Publish Event
      │
      ▼
 Scheduler
      │
      ▼
 Ready Queue
      │
      ▼
 Worker

整个系统没有一个 while(true) 去遍历 DAG。


一、先把 DAG 放到内存

例如你的 JSON

{
  "nodes":[
      {
          "id":"fastp"
      },
      {
          "id":"bwa",
          "depends":["fastp"]
      }
  ]
}

先变成

          fastp
             │
             ▼
            bwa

Graph

type Graph struct {
    Nodes map[string]*RuntimeNode
}

真正运行的是 RuntimeNode,而不是 JSON。

例如:

type RuntimeNode struct {

    ID string

    Depends []string

    Children []*RuntimeNode

    State State

    WaitCount int
}

这里 Children 很重要。

不要每次都去遍历 Graph。

建立:

fastp

Children

↓

bwa

这样 Task 完成以后可以直接通知孩子。


二、创建 Runtime DAG

例如:

fastp

depends=[]

↓

WaitCount=0
bwa

depends=[fastp]

↓

WaitCount=1

Scheduler 初始化:

fastp

WaitCount=0

↓

Ready Queue

bwa

WaitCount=1

不会进入 Ready Queue。

所以初始化以后:

Ready Queue

↓

fastp

三、Worker消费Ready Queue

例如:

func worker() {

    for node := range readyQueue {

        executor.Run(node)

    }

}

Worker 永远不知道 DAG。

它只知道:

Run Task

四、Executor运行

例如:

func Run(node *RuntimeNode){

    cmd := exec.Command(...)

    cmd.Run()

    EventBus.Publish(TaskFinished{
        Node:node,
    })

}

注意:

Executor 不会去调度别人。

它只负责:

运行

↓

完成

↓

发事件

五、EventBus

例如:

type TaskFinished struct{

    Node *RuntimeNode

}

发布:

bus.Publish(TaskFinished{
    Node:node,
})

而 Scheduler:

bus.Subscribe(TaskFinished{},scheduler.OnTaskFinished)

这就是观察者模式。

Executor 根本不知道 Scheduler 存在。


六、Scheduler收到事件

假设:

fastp

↓

finished

Scheduler:

func OnTaskFinished(event TaskFinished){

    node:=event.Node

    for _,child:=range node.Children{

        child.WaitCount--

        if child.WaitCount==0{

            readyQueue<-child

        }

    }

}

这就是整个 DAG 调度。

没有遍历。

只有:

Task Finish

↓

Children

↓

WaitCount--

↓

Ready

例如:

      A
    / | \
   B  C  D

初始化:

A

Wait=0
B

Wait=1
C

Wait=1
D

Wait=1

A 完成:

B.Wait--

↓

0

进入:

Ready Queue

同理:

C

Ready
D

Ready

Worker 自动开始。


七、真正的数据驱动

Nextflow 比这个高级。

例如:

fastp

↓

clean.fastq

不是:

Task Finish

而是:

Artifact Produced

例如:

fastp

↓

clean.fastq

ArtifactManager:

Publish(

ArtifactProduced{

    Name:"clean.fastq"

}

)

Scheduler:

Artifact Produced

↓

谁需要这个文件?

↓

bwa

↓

InputReady--

所以真正驱动的是:

数据

不是Task

八、RuntimeNode真正状态

建议:

type RuntimeNode struct{

    ID string

    WaitCount int

    Children []*RuntimeNode

    Inputs []*Artifact

    Outputs []*Artifact

    State State

}

Artifact

type Artifact struct{

    Name string

    Path string

    Ready bool

}

例如:

clean.fastq

Ready=true

bwa:

Inputs

↓

clean.fastq

Scheduler:

所有Input Ready

↓

Ready Queue

而不是:

Depends Finished

这是 Nextflow 的 Dataflow 思想。


九、整个生命周期

JSON

Workflow Loader

Runtime DAG

Scheduler

Ready Queue

Worker

Executor

Task Finished

EventBus

Scheduler

Children Wait--

Ready Queue

重复……

直到:

全部Finished

十、为什么这种设计扩展性极强

整个系统没有任何模块直接调用另一个模块:

Executor

不知道:

Scheduler

Scheduler 不知道:

Database

Database 不知道:

WebSocket

Monitor 不知道:

Executor

它们全部监听 EventBus:

             TaskFinished
                    │
      ┌─────────────┼─────────────┐
      ▼             ▼             ▼
 Scheduler      Database      Monitor
      │                           │
      ▼                           ▼
 ReadyQueue                    SSE Push

因此增加新的能力(如日志、统计、告警)只需增加新的事件监听器,而无需修改调度逻辑。


十一、更接近 Nextflow 的设计:Task 是模板,Execution 是实例

上面的模型还有一个重要缺陷:一个节点只能运行一次

但生信流程经常需要:

  • 一个样本对应一个 fastp
  • 100 个样本对应 100 个 fastp
  • bwa 对每个样本分别运行
  • 或者 Scatter 后产生 N 个并行任务

因此建议把**流程节点(Node)运行实例(TaskExecution)**彻底分离。

type Node struct {
    ID       string
    Command  string
    Depends  []string
}

type TaskExecution struct {
    ID         string            // 唯一运行实例
    NodeID     string            // 属于哪个节点
    Inputs     []*Artifact
    Outputs    []*Artifact
    State      State
    WorkDir    string
}

这样:

          Node(fastp)
               │
      ┌────────┴────────┐
      ▼                 ▼
 Task#1(sampleA)   Task#2(sampleB)

调度器调度的是 TaskExecution,而不是 Node。Node 更像模板(Template),TaskExecution 才是真正进入 Ready Queue 的对象。

这也是 Nextflow 能支持 Channel、Scatter/Gather、动态任务展开的关键原因。


一句话总结数据驱动的本质

传统 DAG 引擎是:

"前驱任务完成了,所以我可以运行后继任务。"`

而 Nextflow 风格的数据流引擎是:

"当输入数据(Artifact/Channel)到达时,就动态创建一个新的 TaskExecution;当该 TaskExecution 完成并产生新的数据,再驱动下游节点继续生成新的 TaskExecution。"

因此,**真正被调度的是运行实例(TaskExecution),真正驱动调度的是数据(Artifact 或 Channel),事件总线负责将"数据产生"和"任务完成"传播给调度器,而调度器只负责根据这些事件创建新的运行实例并放入 Ready Queue。**这就是数据驱动工作流引擎的核心运行原理。