【问题标题】:snakemake workflow broken by a rule that produces multiple files生成多个文件的规则破坏了 snakemake 工作流程
【发布时间】:2020-08-14 16:12:37
【问题描述】:

考虑以下 snakemake 工作流程(在此 gist 中完成):

我有一组预定义的参数来定义我的工作流程:

PAR={
  "id_list": range(1,10),
}

我需要暂存数据,这里通过创建带有随机数的文件来模拟:

rule stage:
  output: "in/{id}.old"
  shell: "echo $RANDOM > {output}"

我有一个收集所有暂存文件名的函数和一个聚合暂存步骤的随附规则:

def get_all_dat(wildcards):
  out=[]
  for i in PAR["id_list"]:
    dat=rules.stage.output[0].format(id=i)
    out.append(dat)
  return out

rule stage_all:
  input: get_all_dat
  output: "in/staged.list"
  shell: "for i in {input}; do echo $i; done > {output}"

我绝对不需要 get_all_dat 函数来做像这个例子这样简单的事情(expand 的输入 stage_all 会这样做),但我决定将它包含在这里,因为它符合我的实际工作流程,其中有几个通配符,它​​们都需要排队,这个函数可以确保。

然后是处理步骤:

rule process:
  input: 
    list="in/staged.list",
    util="process.sh"
  output: "out/{id}.new",
  shell: "./{input.util} $(cat {input.list})"

它获取来自stage_all 规则的文件列表并将内容传递给process.sh 脚本。该脚本实质上对in/{id}.old 进行了一些虚拟更改并写入out/{id}.new,具体代码请参阅gist

至关重要的是,此过程会读取 all in/{id}.old 文件并创建 all out/{id}.new 文件。正是在这里,工作流通道被破坏了。与get_all_dat 函数一样,这个“处理”只是一个例子;我的实际工作流程中的实际处理不能分成单独的{id} 通道。

下一步是“绘图”:

rule plot:
  input:  "out/{id}.new"
  output: "out/{id}.plot"
  shell: "echo \"plot of $(cat {input})\" > {output}"

...它拥有自己的聚合器(就像登台步骤一样):

def get_all_plot(wildcards):
  out=[]
  for i in PAR["id_list"]:
    dat=rules.plot.output[0].format(id=i)
    out.append(dat)
  return out

rule plot_all:
  input: get_all_plot
  output: "out/plotted.list"
  shell: "for i in {input}; do echo $i; done > {output}"

规则process的主要问​​题是每个out/{id}.new文件都会发起对process.sh的新调用,同时读取所有in/{id}.old文件,同时写入所有out/{id}.new,这样不好。我在process.sh 中添加了一些代码来计算该脚本被调用的次数,请参阅gist

我尝试过的事情:

  1. 使用 bash 和 lock 文件以及flock,强制额外的调用等待幸运的第一个process.sh 线程完成,然后继续没有错误;
  2. 在规则processoutput:中使用directory("out")
  3. 添加将out/{id}.new 连接到directory("out") 的附加规则:
    rule connector:
      input: "out",
      output: "out/{id}.new",

后果:

  1. 竞争条件丰富,确实没有什么好的方法可以确保只执行一个process.sh 并且snakemake 删除out/{id}.new 文件(应该如此),因为当相应的{id} process 规则时找不到它们被第一次调用;
  2. 工作流中断,因为 out/{id}.new 没有连接到 directory("out")
  3. ChildIOException: File/directory is a child to another output:

我的意图是以out/plotted.list 作为目标运行完整的工作流程,具有任意数量的核心(所有这些都需要等待一个process.sh 线程完成)。原因是process 步骤很便宜,而plot 步骤很昂贵,{id} 可以有很多很多值。

感谢您在长篇博文中对我的包容。

【问题讨论】:

    标签: snakemake


    【解决方案1】:

    这个呢:

    rule process:
        input: 
            list="in/staged.list",
            util="process.sh",
        output: 
            touch('out/staged.list'),
        shell: 
            "./{input.util} $(cat {input.list})"
    
    rule plot:
        input:
            list= 'out/staged.list ',
        output: 
            "out/{id}.plot"
        params:
            id= lambda wc: "out/%s.new" % wc.id,
        shell: 
            r"""
            echo "plot of $(cat {params.id})" > {output}
            """
    

    规则process 在输出中提供一个虚拟文件,因此它只执行一次,但它会生成所有必要的文件。

    规则plot 从上面输入虚拟文件,因此它的真实输入也必须存在。真正的输入文件作为参数传递。

    下一个规则应该可以正常使用"out/{id}.plot"

    【讨论】:

    • 太好了,谢谢@dariober。我今天学到的一件事:您可以通过 params 将文件从 DAG 中删除。
    猜你喜欢
    • 2021-02-03
    • 2019-03-05
    • 2012-03-02
    • 2014-08-12
    • 2017-04-22
    • 1970-01-01
    • 1970-01-01
    • 2017-04-12
    • 2022-11-11
    相关资源
    最近更新 更多