【问题标题】:Automatically adding a dataset to Dask scheduler on startup启动时自动将数据集添加到 Dask 调度程序
【发布时间】:2017-10-02 23:34:39
【问题描述】:

TL;DR
我想在启动时将数据集预加载到 Dask 分布式调度程序中。

背景
我正在使用 Dask 以实时查询方式使用更小的内存数据集。因为它是实时的,所以工作人员可以相信调度程序总是有某些可用的数据集——即使是在启动后立即可用,这一点很重要。工作人员始终将整个数据集保存在内存中。

传统上,我通过连接客户端、分散 df 并发布数据集来完成此操作:

df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)

但这留下了调度程序重新启动和数据集未加载的可能性。

我知道您可以使用--preload 在启动时执行脚本,如下所示:

dask-scheduler --preload=scheduler-startup.py

样板代码如下所示:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    plugin = MyPlugin()
    scheduler.add_plugin(plugin)

但是如何说服调度程序在不使用外部客户端的情况下加载我的数据集?

理论上我可能会删除一个启动预填充客户端的子进程,但感觉不太理想:)

调度程序启动中的普通客户端
尝试在调度程序启动时作为客户端连接:

from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client

class MyPlugin(SchedulerPlugin):
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    c = Client(scheduler.address)
    df = dd.read_parquet('df.parq')
    df = c.persist(df)
    c.publish_dataset(flights=dfa)

挂在c = Client(scheduler.address) 并且必须被强制杀死 (kill -9)

【问题讨论】:

  • 如果您将提交的客户端代码放入启动脚本中会发生什么?
  • 它无限期挂起(可能在客户端尝试连接到尚未启动的调度程序的递归循环中)。挂起的进程不能被 Ctrl-C'ed 但必须被 kill -9'ed

标签: python dask dask-distributed


【解决方案1】:

您可以考虑将客户端代码添加到在事件循环上运行的异步函数中。这将允许预加载脚本完成,让调度程序启动,然后运行您的客户端代码。您可能需要以下内容:

async def f(scheduler):
    client =  await Client(scheduler.address)
    df = dd.read_parquet(...)
    await client.publish_dataset(flights=df)

def dask_setup(scheduler):
    scheduler.loop.add_callback(f, scheduler)

【讨论】:

    【解决方案2】:

    @MRocklin 的回答让我走上了正确的道路,但我确实需要转到另一个线程:

    from concurrent.futures import ThreadPoolExecutor
    
    def load_dataset():
        client = Client('127.0.0.1:8786')
        df = dd.read_parquet(...)
        df = client.persist(df)
        client.publish_dataset(flights=df)
    
    async def f(scheduler):
        executor = ThreadPoolExecutor(max_workers=1)
        executor.submit(load_dataset)
    
    def dask_setup(scheduler):
        scheduler.loop.add_callback(f, scheduler)
    

    缺点是它不会阻止工作人员在加载数据时进行连接,但我认为必须在工作人员方面进行管理(如果数据集不可用,请重试)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-02-11
      • 1970-01-01
      • 2023-03-07
      • 2023-03-21
      • 2023-04-02
      • 1970-01-01
      相关资源
      最近更新 更多