【问题标题】:Manipulate Nextflow variables outside of scripts在脚本之外操作 Nextflow 变量
【发布时间】:2019-10-03 21:13:19
【问题描述】:

我有一个进程 iterate_list。进程 iterate_list 获取一个列表并对列表中的每个项目执行一些操作。运行脚本时,它需要两个输入。它需要处理的列表和项目(它作为消费者从 rabbitmq 队列中获取)

目前,我为整个列表提供了一个 python 脚本,它遍历每个执行处理(作为一个大块)并在完成后返回。这很好,但是,如果系统重新启动,它会重新启动。

我想知道,我怎样才能使每次我的 python 脚本处理一个项目时,它会返回该项目,我将其从列表中删除,然后将新列表传递给进程。因此,在系统重新启动/崩溃的情况下,nextflow 知道它在哪里停止并可以从那里继续。

import groovy.json.JsonSlurper

def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = [] 

items = Channel.from(analysis_config.items.keySet())

for (String item : items) {
    list_of_items_to_process << item
    } 

process iterate_list{
    echo true

    input:
    list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    """ 
}

process signal_completion{

    echo true

    input:
    val typing_cur

    script:
    """
    echo "all done!"
    """
}

基本上,进程“iterate_list”从消息代理的队列中获取一个“项目”。进程 iterate_list 应该类似于:

process iterate_list{
    echo true

    input:
    list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    list_of_items_to_process.remove(<output from python script>)
    """
}

因此,对于每一个,它都会运行,删除它刚刚处理的项目,然后重新启动一个新列表。

initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.

【问题讨论】:

    标签: python groovy workflow nextflow


    【解决方案1】:

    看起来您真正想做的是在 Nextflow 进程中操作全局 ArrayList。 AFAIK,没有办法完全做到这一点。这就是channels 的用途。

    尚不清楚您是否真的需要从要处理的项目列表中删除任何项目。 Nextflow 已经可以使用 -resume 选项使用缓存的结果。那么为什么不只传递完整列表和单个项目进行处理呢?

    items = Channel.from(['foo', 'bar', 'baz'])
    
    items.into {
        items_ch1
        items_ch2
    }
    
    process iterate_list{
    
        input:
        val item from items_ch1
        val list_of_items_to_process from items_ch2.collect()
    
        """
        python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
        """
    }
    

    我只能猜测您的 Python 脚本如何使用其参数,但如果您要处理的项目列表只是一个占位符,那么您甚至可以输入要处理的项目的单个元素列表:

    items = Channel.from(['foo', 'bar', 'baz'])
    
    process iterate_list{
    
        input:
        val item from items
    
        """
        python3.7 process_list_items.py "${item}" '[${item}]'
        """
    }
    

    【讨论】:

    • 由于 nextflow 不允许使用 Google Cloud Filestore,我有点滥用工作流程,让它处理我的 rabbitmq RPC 而不是真正自己完成工作。但这种情况正在改变,现在我最大的问题是存储。如何使用来自云存储桶的输入。然后我如何处理文件并将其上传回输出存储桶。我明天会问这个问题。 Nextflow 有一个小型社区。​​span>
    • @daudnadeem:基本上只需使用gs:// 协议指定它们,例如gs://yourbucket/path_to_file。然后确保指定存储工作流中间结果的 GS 存储桶。您可以使用-work-dir 选项执行此操作,例如nextflow run main.nf -work-dir gs://yourbucket/project/workdir。您可能还想使用可能称为 --outdir 的自定义参数来指定一个 GS 存储桶,以使用 publishDir 指令发布您的输出。然后将 --outdir gs://yourbucket/project/results 添加到您的 cmd 行。
    • 好的。假设我有一个文件 gs://this-bucket/file.txt,并且我将 work-dir 指定为 gs://this-bucket,如果我想访问该文件,我是否将其指向为 gs://this- bucket/file.txt 还是 Nextflow 会将此文件下载到本地存储中,我只是将其称为 file.txt?
    • @daudnadeem:实际上我认为您需要根据提示here 指定一个存储桶子目录作为您的工作目录。但是您应该能够将您的文件指向为gs://this-bucket/file.txt。然后,例如,在您的脚本中包含行 params.foobar = "gs://this-bucket/file.txt"foobar = file(params.foobar),然后您应该能够将其包含在您的任何进程中,即 process baz { input: file(foobar) ... }
    • @daudnadeem:“任何未存储在 Google Storage 存储桶中的输入数据都将自动传输到管道工作存储桶”——来自here
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-11
    • 1970-01-01
    • 1970-01-01
    • 2014-08-19
    • 1970-01-01
    • 2017-08-27
    相关资源
    最近更新 更多