【问题标题】:snakemake - how to make a list of input files based on a previous rule that produces variable number of filessnakemake - 如何根据生成可变数量文件的先前规则制作输入文件列表
【发布时间】:2020-04-23 14:57:01
【问题描述】:

说,我从一堆这样的文件开始:

group_1_in.txt, group_2_in.txt, group_3_in.txt

我使用生成如下所示目录结构的规则来处理它们。

rule process_group_files:
    input: 'group_{num}_in.txt'
    output: directory('group_{num}')
    shell: "some_command {input} {output}'

## directory structure produced: 
group_1
    sample1_content.txt
    sample2_content.txt
    sample3_content.txt
group_2
    sample2_content.txt
    sample3_content.txt
    sample4_content.txt
group_3
    sample1_content.txt
    sample2_content.txt
    sample5_content.txt 

然后,我有规则处理它们以按样本聚合文件:

rule aggregate_by_sample:
    input: expand('{group}/{sample}_content.txt')
    output: '{sample}_allcontent.txt'
    shell: "cat {input} | some_command > {output}"

我希望这条规则的输入是:

group_1/sample1_content.txt, group_3/sample1_content.txt
group_1/sample2_content.txt, group_2/sample2_content.txt, group_3/sample2_content.txt
group_1/sample3_content.txt, group_2/sample3_content.txt
group_2/sample4_content.txt 
group_3/sample5_content.txt

并生成以下输出文件:

sample1_allcontent.txt
sample2_allcontent.txt
sample3_allcontent.txt
sample4_allcontent.txt
sample5_allcontent.txt

此时,我想使用这些输出文件。所以,这个规则可以是这样的:

rule process_by_sample:
    input: <list of all sample_allcontent files>
    output: final_output.txt 
    shell: "cat {input} | some_other_command > {output}"

我的问题是:我如何告诉snakemake 等到它处理完aggregate_by_sample 规则中的所有文件,然后将那组输出文件用于规则process_by_sample 我通过将aggregate_by_sample 设为检查点来探索检查点的概念,但我应该使用“目录”作为输出,因为我不知道先验会生成多少个输出文件。但我不能这样做,因为我的输出文件名使用通配符,并且snakemake 抱怨Wildcards in input files cannot be determined from output files

编辑 -- 在看到@troy-comi 的回答后,我意识到我过度简化了我的问题。我更新了我的问题以包含第一条规则process_group_files。我在管道开始时只知道我有多少组以及“数字”通配符列表是什么。

【问题讨论】:

    标签: python snakemake


    【解决方案1】:

    由于文件已经存在,您可以使用 glob_wildcards 获取文件系统上的组/样本列表。使用它,您可以通过更多处理来构建您的输入文件。

    这是我的(未经测试的)想法:

    wc =  glob_wildcards('{group}/{sample}_content.txt')
    samples_to_group = {}
    for samp, group in zip(wc.group, wc.sample):
        if samp not in samples_to_group:
            samples_to_group[samp] = []
        samples_to_group.append(group)
    
    # now samples_to_group is a map of which groups are present for each sample
    
    rule all:
        input: "final_output.txt"
    
    rule aggregate_by_sample:
        input: expand('{group}/{sample}_content.txt', 
                      group=samples_to_group[wildcards.sample],
                      allow_missing=True)
        output: '{sample}_allcontent.txt'
        shell: "cat {input} | some_command > {output}"
    
    rule process_by_sample:
        input: expand('{sample}_allcontent.txt', sample=samples_to_group.keys())
        output: final_output.txt 
        shell: "cat {input} | some_other_command > {output}"
    

    如果另一个规则正在生成您必须使用检查点的文件。

    -- 编辑回答细化问题--

    如果你事先知道样本,我只能让它工作,不需要组样本映射,只是你总共有 5 个样本......

    使用以下文件设置目录:

    $ tail data/group_*.txt
    ==> data/group_1.txt <==
    1
    2
    3
    
    ==> data/group_2.txt <==
    2
    3
    4
    
    ==> data/group_3.txt <==
    1
    2
    5
    

    然后是一个 Snakefile

    wildcard_constraints:
        num="\d+"
    
    groups = glob_wildcards('data/group_{num}.txt').num
    samples = range(1, 6)
    
    rule all:
        input: "final_output.txt"
    
    checkpoint process_group_files:
        input: 'data/group_{num}.txt'
        output: directory('data/group_{num}')
        shell:
            'mkdir {output} \n'
            'for line in $(cat {input}) ; do echo "$line {input}" '
                '> {output}/${{line}}_content.txt ; '
            'done \n'
            'sleep 1'
    
    def aggregate_input(wildcards):
        for num in groups:
            checkpoints.process_group_files.get(num=num).output
    
        grps = glob_wildcards(f'data/group_{{group}}/{wildcards.sample}_content.txt').group
        return expand('data/group_{group}/{sample}_content.txt',
                group=grps,
                sample=wildcards.sample)
    
    
    rule aggregate_by_sample:
        input: aggregate_input
        output: 'data/agg/{sample}_allcontent.txt'
        shell: 'cat {input} > {output}'
    
    rule process_by_sample:
        input: expand('data/agg/{sample}_allcontent.txt', sample=samples)
        output: 'final_output.txt'
        shell: 'cat {input} > {output}'
    

    将给出最终输出:

    $ cat final_output.txt
    1 data/group_1.txt
    1 data/group_3.txt
    2 data/group_1.txt
    2 data/group_2.txt
    2 data/group_3.txt
    3 data/group_1.txt
    3 data/group_2.txt
    4 data/group_2.txt
    5 data/group_3.txt
    

    “魔术”是使用 for 循环调用检查点,这是您需要的锁定。同样,它需要事先了解样本。您可以尝试第二层检查点,但这通常会失败。我还记得其他人在 for 循环中遇到检查点问题,因此它可能会在非玩具示例中中断。顺便说一句,这是蛇形 5.10

    老实说,它最终可能会更容易分成两个工作流程 (snakemake -s Snakefile1 &amp;&amp; snakemake -s Snakefile2)!

    祝你好运!

    【讨论】:

    • 谢谢@troy-comi,您的解决方案运行良好,但恐怕我已经简化了很多管道。顶部显示的文件结构实际上是从看起来像 group1_content.txt, group2_content.txt, ... 的东西生成的,我在单独的规则中处理它。我现在将更新问题以澄清这一点。
    • 啊检查站!这是一个更困难的问题;我会研究一个解决方案,看看我能不能让它发挥作用。
    猜你喜欢
    • 1970-01-01
    • 2021-12-29
    • 2021-09-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-18
    相关资源
    最近更新 更多