【问题标题】:Multiprocessing using a queue and worker pool, letting workers extend the queue使用队列和工作池进行多处理,让工作人员扩展队列
【发布时间】:2017-03-08 13:42:14
【问题描述】:

我正在尝试了解 Python 中的多处理,但目前正在努力解决以下问题:

从工作人员池开始,我想将生成器函数中的对象提供给队列,然后由工作人员使用。这很好用,但是我现在想扩展我的程序以允许工作人员将工作添加到队列中。然而,这是我遇到问题的部分,因为我在第一个循环中添加的工作紧跟在第二个循环中添加的停止代码(参见示例代码)。这意味着任何工人添加的任何额外工作将永远不会被执行......

我认为唯一需要的方法是检查队列是否为空以及所有工作人员都没有做任何事情,然后继续执行最后一个停止工作人员的 for 循环。但是我不知道如何检查工人的状态来做到这一点。

显示示例的最少代码:

import multiprocessing, time, random

def f(queue):
    worker_name = multiprocessing.current_process().name
    print "Started: {}".format(worker_name)

    while True:
        value = queue.get()
        if value is None:
            break

        print "{} is processing '{}'".format(worker_name, value)
        # compute(value)
        time.sleep(1)

        # Worker may add additional work to queue
        if random.random() > 0.7:
            queue.put("Extra work!")

    print "Stopping: {}".format(worker_name)


n_workers = 4
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(n_workers, f, (queue,))

# Feed large objects from generator
for i in xrange(20):
    queue.put(i)

# All extra work is skipped

# Terminate workers after finishing work
for __ in xrange(n_workers):
    queue.put(None)

pool.close()
pool.join()

print "Finished!"
print queue.get() # Will yield 'Extra Work!' should be empty

【问题讨论】:

    标签: python python-multiprocessing


    【解决方案1】:

    使用计数信号量值,我设法实现了我想要的。我通过增加/减少这个值来跟踪每个工作人员的活动,并在以下情况下立即停止工作人员:队列为空并且工作人员不再处理任何事情。

    感谢任何建议。

    示例代码:

    import multiprocessing, time, random
    
    def f(queue, semaphore):
        worker_name = multiprocessing.current_process().name
        print "Started: {}".format(worker_name)
    
        while True:
            value = queue.get()
            if value is None:
                break
    
            with semaphore.get_lock():
                semaphore.value -= 1
    
            print "{} is processing '{}'".format(worker_name, value)
            # compute(value)
            time.sleep(1)
    
            # Worker may add additional work to queue
            if random.random() > 0.7:
                queue.put("Extra work!")
    
            with semaphore.get_lock():
                semaphore.value += 1
    
        print "Stopping: {}".format(worker_name)
    
    
    n_workers = 4
    semaphore = multiprocessing.Value('i', n_workers)
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(n_workers, f, (queue, semaphore))
    
    # Feed large objects from generator
    for i in xrange(20):
        queue.put(i)
    
    while not queue.empty() or semaphore.value != n_workers:
        time.sleep(0.2)
    
    # Terminate workers after finishing work
    for __ in xrange(n_workers):
        queue.put(None)
    
    pool.close()
    pool.join()
    
    print "Finished!"
    print queue.empty() # True
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-02-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-02-22
      • 2016-04-18
      相关资源
      最近更新 更多