【问题标题】:Python continuous parallel executionPython 连续并行执行
【发布时间】:2017-01-11 13:48:46
【问题描述】:

希望构建一个从标准输入运行无限读取循环的 python 脚本,例如for line in sys.stdin:。对于每次迭代,我想从使用line 作为输入在后台执行的池中获取一个工作者。完成执行或超时的进程打印到标准输出。

我很难找到能够连续工作的工作池模块。例如,multiprocess pool module 只支持像join 这样等待所有工作人员完成所有任务的功能。对于上述规范,我无法提前知道所有任务,并且需要将工作分配给后台进程。

【问题讨论】:

  • 您可以使用一个单独的进程,该进程从队列中消耗来自工作人员的结果(进程也是如此),并将其打印到标准输出。
  • 重申这个想法...每一行都添加到一个队列中。然后每个进程不断地检查队列中的一行。 (我是否需要锁定队列,以便多个进程不会从队列中删除同一行?)。然后如果有一行,进程会将其从队列中删除并将结果打印到标准输出,然后返回查看队列?如果工作时间过长而无法继续,我如何强制流程超时?你知道网上有什么例子吗?
  • 你有你的主循环产生 Process(..., args=(queue, line)) 每个新行到达。同时,先前生成的进程消耗队列并打印结果。 docs.python.org/3.6/library/multiprocessing.html
  • 从你的意思来看,每一行都会产生一个带有队列的新进程?如何重用流程,让每一行都不创建新流程?

标签: python multiprocessing stdin


【解决方案1】:

这将永远运行。

import sys
from multiprocessing import Pool

pool = Pool()

for line in sys.stdin.readline():
    pool.apply_async(function, args=[line])

def function(line):
    """Process the line in a separate process."""
    print(line)

【讨论】:

  • 嘿!我很确定这不起作用。你试过了吗?我用从 1 到 10 的循环尝试了类似的操作。 apply_async 实际上并没有开始执行任务。
【解决方案2】:

使用Poolimap 可能会更容易,但您必须假设工人的最大容量(processes=5):

import multiprocessing
import sys


def worker(line):
    return "Worker got %r" % (line)


pool = multiprocessing.Pool(processes=5)
for result in pool.imap(worker, sys.stdin):
    print "Result: %r" % (result)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-19
    • 1970-01-01
    • 2012-01-29
    • 2015-09-12
    • 1970-01-01
    • 2014-03-15
    • 1970-01-01
    相关资源
    最近更新 更多