【问题标题】:create very large queue for python multiprocessing为python多处理创建非常大的队列
【发布时间】:2019-04-09 03:16:31
【问题描述】:

我想创建一个包含大约 256K 文件路径的队列,并让这些路径出列并由并行工作进程处理。这是多处理而不是线程。

但是,当我创建一个 multiprocessing.queue 时,队列中似乎有 32K 个对象的硬限制。如果对象是文件的完整路径,这可能会更小。

为多处理创建多服务器队列的另一种方法是什么?

import multiprocessing
import sys

q = multiprocessing.Queue()

for i in range(32768 * 2):
    print i
    try:
        q.put('abcdef')
    except:
        print "Unexpected error on ()".format(i), sys.exc_info()[0]
        raise

产量:

...
32766
32767
Traceback (most recent call last):
Unexpected error on () <type 'exceptions.KeyboardInterrupt'>
  File "/Users/Wes/Dropbox/Programming/ElectionTransparency/vops_addons/dead/tryq.py", line 13, in <module>
    q.put('abc')
  File "/usr/local/Cellar/python@2/2.7.16/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 101, in put
    if not self._sem.acquire(block, timeout):
KeyboardInterrupt

【问题讨论】:

  • 这种队列的想法通常是同时放入和取出项目。如果您先启动工作程序而不是等待完全填充队列,您就不会遇到这个问题。
  • 或许可以考虑使用带有LPUSHBLPOP 的Redis 列表,然后您可以从网络中的任何机器插入、删除和处理项目。

标签: python multiprocessing


【解决方案1】:

您可以尝试使用 celery - http://www.celeryproject.org/ - 队列限制取决于代理配置。

此外,您不会被限制在同一台机器上的工作人员 - 任何可以挂载相同文件系统的计算机都可以运行 celery 工作人员来处理您的任务。 (虽然如果远程处理不是一个选项,使用 celery 工作者仍然比原始的多处理具有优势,因为有诸如自动重试之类的细节)

【讨论】:

    【解决方案2】:

    这是我最终发现的有效方法。我使所有工作进程都可以使用路径数组,并使用 multiprocessing.Value() 对象在受锁保护的数组中创建共享索引。

    from multiprocessing import Process, Lock, Value
    import os
    import sys
    import time
    
    def info(title, lock, item=None):
        pid = os.getpid()
        lock.acquire()
        print '<', title, item,' ', __name__, pid, '>'
        sys.stdout.flush()
        lock.release()
    
    def f(stdout_lock, next_item, worklist):
        while True:
            with next_item.get_lock():
                if len(worklist) <= next_item.value:
                    return
                item = worklist[next_item.value]
                next_item.value += 1
            info('queue item: ', stdout_lock, item)
            time.sleep(0.0001)
    
    if __name__ == '__main__':
        next_item = Value('l')
        worklist = [str(i) for i in range(250000)]
        next_item.value = 0
        stdout_lock = Lock()
        plist = []
        for i in range(3):
            plist.append(Process(target=f, args=(stdout_lock, next_item, worklist)))
            plist[-1].start()
        for i in range(3):
            plist[i].join()
    

    【讨论】:

      猜你喜欢
      • 2021-11-11
      • 2015-08-18
      • 1970-01-01
      • 2015-04-20
      • 1970-01-01
      • 2022-12-09
      • 2022-12-07
      • 2010-10-29
      • 2011-08-19
      相关资源
      最近更新 更多