【问题标题】:Use a multiprocessing.Queue in luigi在 luigi 中使用 multiprocessing.Queue
【发布时间】:2018-08-18 01:37:55
【问题描述】:

我在 luigi 中有一组任务,它们都需要访问数据库。我最多可以有 8 个任务同时访问我的数据库,前提是它们位于不同的端口上(我有允许的端口列表)。 我应该如何最好地实现这个似乎类似于工人数量的标准限制的限制,即对于我的情况,任务应该在工人空闲并且数据库端口空闲时运行。

我尝试在__main__ 中创建multiprocessing.Queue() 并将其传递给WrapperTask,后者将其作为luigi.Parameter() 接收,但这会产生错误并挂起

UserWarning: Parameter "queue" with value <multiprocessing.queues.Queue object at 0x00000000149E4518>" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))

这个想法是,如果队列为空,.get() 调用将挂起一个任务,然后再次继续另一个任务 .put(port)

这里出了什么问题?还是我在 luigi 中管理资源的方法完全错误?

【问题讨论】:

    标签: python resources queue multiprocessing luigi


    【解决方案1】:

    您应该使用 Luigi 配置中的“资源”部分。这将确保不超过这个数量的工人共享一个全球资源。在这里找到更多https://luigi.readthedocs.io/en/stable/configuration.html#resources

    【讨论】:

      猜你喜欢
      • 2021-06-26
      • 1970-01-01
      • 1970-01-01
      • 2017-06-12
      • 1970-01-01
      • 2021-09-27
      • 1970-01-01
      • 2018-03-09
      • 1970-01-01
      相关资源
      最近更新 更多