【发布时间】:2017-10-02 23:34:39
【问题描述】:
TL;DR
我想在启动时将数据集预加载到 Dask 分布式调度程序中。
背景
我正在使用 Dask 以实时查询方式使用更小的内存数据集。因为它是实时的,所以工作人员可以相信调度程序总是有某些可用的数据集——即使是在启动后立即可用,这一点很重要。工作人员始终将整个数据集保存在内存中。
传统上,我通过连接客户端、分散 df 并发布数据集来完成此操作:
df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)
但这留下了调度程序重新启动和数据集未加载的可能性。
我知道您可以使用--preload 在启动时执行脚本,如下所示:
dask-scheduler --preload=scheduler-startup.py
样板代码如下所示:
from distributed.diagnostics.plugin import SchedulerPlugin
class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)
def dask_setup(scheduler):
plugin = MyPlugin()
scheduler.add_plugin(plugin)
但是如何说服调度程序在不使用外部客户端的情况下加载我的数据集?
理论上我可能会删除一个启动预填充客户端的子进程,但感觉不太理想:)
调度程序启动中的普通客户端
尝试在调度程序启动时作为客户端连接:
from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client
class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)
def dask_setup(scheduler):
c = Client(scheduler.address)
df = dd.read_parquet('df.parq')
df = c.persist(df)
c.publish_dataset(flights=dfa)
挂在c = Client(scheduler.address) 并且必须被强制杀死 (kill -9)
【问题讨论】:
-
如果您将提交的客户端代码放入启动脚本中会发生什么?
-
它无限期挂起(可能在客户端尝试连接到尚未启动的调度程序的递归循环中)。挂起的进程不能被 Ctrl-C'ed 但必须被 kill -9'ed
标签: python dask dask-distributed