【问题标题】:With dask-distributed how to generate futures from long running tasks fed by queues使用 dask-distributed 如何从队列提供的长时间运行的任务中生成期货
【发布时间】:2017-12-15 05:01:20
【问题描述】:

我正在使用一个磁盘分布式长时间运行的任务,按照这个例子http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow 的行,一个长时间运行的工作任务从一个队列中获取它的输入,就像在 tensorflow 例子中一样,并将它的结果传递到一个输出队列。 (我在最新版本的 dask 中没有看到示例中使用的通道)。

我可以看到如何分散列表并应用映射来生成将输入数据推送到工作人员输入队列的期货列表。

def transfer_dask_to_worker(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

data = [1,2,3,4] 

future_data = e.scatter(data)

tasks = e.map(transfer_dask_to_worker, future_data ,
     workers=dask_spec['worker'], pure=False)

现在,如果我们等待工作人员使用任务,所有结果将在工作人员的输出队列中。我们可以用

把它全部拉回来
def transfer_worker_to_dask(arg):
    worker = get_worker()
    return worker.output_queue.get()

results = e.map(transfer_worker_to_dask,range(len(tasks)))

只要我们手动处理排序,等待所有工作任务完成,然后再调用它们,就可以正常工作。

我们如何将输出期货链接到输入的下游?有没有办法让长期运行的任务在可以收集回调度程序任务的工作人员上创建未来?

我尝试让 transfer_dask_to_worker(batch) 也查询输出队列并返回结果:

def transfer_dask_to_worker_and_return(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)
    return worker.output_queue.get()

这适用于短名单,但由于取消了大约 1000 个项目的期货而开始失败。

提前致谢。

【问题讨论】:

    标签: dask dask-distributed


    【解决方案1】:

    注意:那篇博文是实验性的。这里有几种方法,我不会局限于那种模式

    让我们从这个具体的问题开始:

    我们如何将输出期货链接到输入的下游?有没有办法让长时间运行的任务在可以收集回调度程序任务的工作人员上创建未来?

    这里最简单的解决办法大概是把本地数据分散,然后放到一个Dask distributed Queue中。因此,如果您的 TensorFlow 代码在产生某些结果时调用了一个函数,那么该函数可能会将本地数据分散到未来(这实际上并没有移动数据,它只是让 Dask 工作人员开始跟踪它)然后将其放入未来进入分布式队列。将未来放入队列中可以让 Dask 中的其他客户端和工作人员知道数据的存在,并在必要时将其拉下

    from dask.distributed import Queue
    results_q = Queue()
    
    def tf_result_ready(result):
        future = get_worker().scatter(result)
        results_q.put(future)
    

    然后,您可以坐在您的客户端代码中,并在结果可用时从该队列中提取结果:

    for _ in range(n_blocks):
        future = results_q.get()
        # do stuff with future like submit or gather
    

    【讨论】:

    • 这个解决方案绝对有效。谢谢您的帮助。然而,让 TensorFlow (v1.4) 在 Dask-Distributed worker 中表现良好仍然是一个挑战。在调用 free 期间,我经常但不会重复地在调用 TF Adam Optimizer 的工作任务上生成段错误(sig11)。相同的图表和优化器在本地运行良好。
    • 您可能想验证 TensorFlow 在多线程环境中是否正常工作。如果没有,那么您可能需要更改您的 dask 工作人员,使其拥有更多进程,每个进程都有一个线程。
    • 这是一个很好的建议。 TF 应该是处理线程,但有一些参考资料表明这在 python 中是多么困难。我正在使用 dask-worker --nthreads 1 启动 dask 分布式工作人员,但我怀疑您还有其他想法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-11
    • 1970-01-01
    • 2015-11-29
    • 1970-01-01
    相关资源
    最近更新 更多