【发布时间】:2017-12-15 05:01:20
【问题描述】:
我正在使用一个磁盘分布式长时间运行的任务,按照这个例子http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow 的行,一个长时间运行的工作任务从一个队列中获取它的输入,就像在 tensorflow 例子中一样,并将它的结果传递到一个输出队列。 (我在最新版本的 dask 中没有看到示例中使用的通道)。
我可以看到如何分散列表并应用映射来生成将输入数据推送到工作人员输入队列的期货列表。
def transfer_dask_to_worker(batch):
worker = get_worker()
worker.tensorflow_queue.put(batch)
data = [1,2,3,4]
future_data = e.scatter(data)
tasks = e.map(transfer_dask_to_worker, future_data ,
workers=dask_spec['worker'], pure=False)
现在,如果我们等待工作人员使用任务,所有结果将在工作人员的输出队列中。我们可以用
把它全部拉回来def transfer_worker_to_dask(arg):
worker = get_worker()
return worker.output_queue.get()
results = e.map(transfer_worker_to_dask,range(len(tasks)))
只要我们手动处理排序,等待所有工作任务完成,然后再调用它们,就可以正常工作。
我们如何将输出期货链接到输入的下游?有没有办法让长期运行的任务在可以收集回调度程序任务的工作人员上创建未来?
我尝试让 transfer_dask_to_worker(batch) 也查询输出队列并返回结果:
def transfer_dask_to_worker_and_return(batch):
worker = get_worker()
worker.tensorflow_queue.put(batch)
return worker.output_queue.get()
这适用于短名单,但由于取消了大约 1000 个项目的期货而开始失败。
提前致谢。
【问题讨论】:
标签: dask dask-distributed