【Title】: Nextflow DSL2: how to combine outputs (channels) from multiple processes into input of another process by part of filename?
【Posted】: 2023-01-13 04:54:55
【Question】:

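I am trying to combine the outputs of two separate processes, A and B, into the input of process C. Each process emits multiple files, and all file names share a chromosome identifier (for example, "chr1"). Process A outputs the files /path/chr1_qc.vcf.gz, /path/chr2_qc.vcf.gz, etc. (genotype files).

Process B outputs the files /path/chr1.a.bcf, /path/chr1.b.bcf, /path/chr1.c.bcf, ..., /path/chr2.a.bcf, /path/chr2.b.bcf, etc. (region files). The number of files in each set can differ from run to run.

Partial code: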

process A {
  module "bcftools/1.16"
  publishDir "${params.out_dir}", mode: 'copy', overwrite: true
  input:
  path vcf
  path tbi

  output:
  path ("${(vcf =~ /chr\d{1,2}/)[0]}_qc.vcf.gz")
 
  script:
  """
  bcftools view -R ${params.sites_list} -Oz -o ${(vcf =~ /chr\d{1,2}/)[0]}_qc.vcf.gz ${vcf}  # generates QC-ed genome files
  tabix -f ${(vcf =~ /chr\d{1,2}/)[0]}_qc.vcf.gz  # indexes the QC-ed genomes
  """
}

process B {
  publishDir "${params.out_dir}", mode: 'copy', overwrite: true
  input:
  path(vcf)

  output:
  tuple path("${(vcf =~ /chr\d{1,2}/)[0]}.*.bed")

  script:
  """
  python split_chr.py ${params.chr_lims} ${vcf}  # generates region files
  """
}


process C {
  publishDir "${params.out_dir}", mode: 'copy', overwrite: true
  input:
  tuple path(vcf), path(bed)
  
  output:
  path "${bed.simpleName}.vcf.gz"

  script:
  """
  bcftools view -R ${bed} -Oz -o ${bed.simpleName}.vcf.gz ${vcf}
  """
}

workflow {
   A(someprocess.out)
   B(A.out)
   
   C(combined_AB_files)
}

Output of process B (via output.view()):

[/path/chr1.a.bed, /path/chr1.b.bed]
[/path/chr2.a.bed, /path/chr2.b.bed]

How can I make process C receive its input as a channel of tuples (the outputs of A and B combined by chromosome name), like this:

[ /path/chr1_qc.vcf.gz, /path/chr1.a.bcf ]
[ /path/chr1_qc.vcf.gz, /path/chr1.b.bcf ]
...
[ /path/chr2_qc.vcf.gz, /path/chr2.a.bcf ]
...    

【Discussion】:

    Tags: nextflow


    【Solution 1】:

    This can be done with channel operators. See the code below, which includes some explanatory comments:

    workflow {
    
      // Let's start by building channels similar to the ones you described
      Channel
        .of(file('/path/chr1_qc.vcf.gz'), file('/path/chr2_qc.vcf.gz'))
        .set { pAoutput }
      Channel
        .of(file('/path/chr1.a.bcf'), file('/path/chr1.b.bcf'), file('/path/chr1.c.bcf'),
            file('/path/chr2.a.bcf'), file('/path/chr2.b.bcf'), file('/path/chr2.c.bcf'))
        .set { pBoutput }
    
      // Now, let's create keys to relate the elements in the two channels
      pAoutput
        .map { filepath -> [filepath.name.tokenize('_')[0], filepath ] }
        .set { pAoutput_tuple }
      // The channel now looks like this:
      // [chr1, /path/chr1_qc.vcf.gz]
      // [chr2, /path/chr2_qc.vcf.gz]
      pBoutput
        .map { filepath -> [filepath.name.tokenize('.')[0], filepath ] }
        .set { pBoutput_tuple }
      // And:
      // [chr1, /path/chr1.a.bcf]
      // [chr1, /path/chr1.b.bcf]
      // [chr1, /path/chr1.c.bcf]
      // [chr2, /path/chr2.a.bcf]
      // [chr2, /path/chr2.b.bcf]
      // [chr2, /path/chr2.c.bcf]
    
      // Combine the two channels and group by key
      pAoutput_tuple
        .mix(pBoutput_tuple)
        .groupTuple()
        .map { x, path_list ->
        [
          path_list.findAll { it.toString().contains('vcf.gz') }[0],
          path_list.findAll { it.toString().contains('.bcf') }
        ]
        }
        .flatMap { genotype_files, region_files -> region_files.collect { [genotype_files, it] } }
        .view()
    }
    

    You can check the output below:

    N E X T F L O W  ~  version 22.10.4
    Launching `ex.nf` [maniac_pike] DSL2 - revision: f87873ef13
    [/path/chr1_qc.vcf.gz, /path/chr1.a.bcf]
    [/path/chr1_qc.vcf.gz, /path/chr1.b.bcf]
    [/path/chr1_qc.vcf.gz, /path/chr1.c.bcf]
    [/path/chr2_qc.vcf.gz, /path/chr2.a.bcf]
    [/path/chr2_qc.vcf.gz, /path/chr2.b.bcf]
    [/path/chr2_qc.vcf.gz, /path/chr2.c.bcf]
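
As a possible alternative to the `mix` / `groupTuple` / `flatMap` chain above, the same pairing can be sketched with the `combine` operator and its `by` option, which joins items that share a key. This is a minimal sketch, not part of the original answer. It assumes the same toy paths and that process B emits one list of region files per task, as the `view()` output in the question suggests. The channel names `genotypes`, `regions`, `genotypes_keyed` and `regions_keyed` are hypothetical.

```nextflow
workflow {
    // Toy channels standing in for the outputs of processes A and B (assumed paths)
    genotypes = Channel.of(file('/path/chr1_qc.vcf.gz'), file('/path/chr2_qc.vcf.gz'))
    regions = Channel.of(
        [file('/path/chr1.a.bcf'), file('/path/chr1.b.bcf')],
        [file('/path/chr2.a.bcf'), file('/path/chr2.b.bcf')]
    )

    // Key each genotype file by the text before the first underscore,
    // e.g. [chr1, /path/chr1_qc.vcf.gz]
    genotypes_keyed = genotypes.map { vcf -> [vcf.name.tokenize('_')[0], vcf] }

    // Each item is a list of files, so flatten first, then key each region
    // file by the text before the first dot, e.g. [chr1, /path/chr1.a.bcf]
    regions_keyed = regions.flatten().map { region -> [region.name.tokenize('.')[0], region] }

    // Pair every region file with the genotype file that shares its key,
    // then drop the key so each item is [genotype_file, region_file]
    genotypes_keyed
        .combine(regions_keyed, by: 0)
        .map { chrom, vcf, region -> [vcf, region] }
        .view()
}
```

In a real pipeline, `genotypes` and `regions` would presumably be replaced by `A.out` and `B.out`, and the final channel passed to `C(...)` instead of `view()`. The order in which the pairs are emitted is not guaranteed.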
    
