【问题标题】:python input() blocks with multiple processes具有多个进程的 python input() 块
【发布时间】:2018-04-04 14:34:50
【问题描述】:

我正在尝试使用多处理来执行多个后台作业并将主进程用作用户界面,该界面通过input() 接受命令。每个进程必须完成某些工作,并将其当前状态写入字典,该字典由manager.dict() 创建,然后传递给进程。

创建进程后,有一个带有input() 的循环用于访问用户命令。为简单起见,此处将命令减少到最少。

from multiprocessing import Manager
from multiprocessing import Process
with Manager() as manager:
    producers = []
    settings = [{'name':'test'}]

    for setting in settings:
        status = manager.dict()
        logger.info("Start Producer {0}".format(setting['name']))
        producer = Process(target=start_producer, args=(setting, status))
        producer.start()
        producers.append([producer, status])

    logger.info("initialized {0} producers".format(len(producers)))

    while True:
        text_command = input('Enter your command:')
        if text_command == 'exit':
            logger.info("waiting for producers")
            for p in producers:
                p[0].join()
            logger.info("Exit application.")
            break
        elif text_command == 'status':
            for p in producers:
                if 'name' in p[1] and 'status' in p[1]:
                    print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
        else:
            print("Unknown command.")

在其他进程中运行的方法很简单:

def start_producer(producer_setting: dict, status_dict: dict):
    importer = MyProducer(producer_setting)
    importer.set_status_dict(status_dict)
    importer.run()

我创建了一个 MyProducer 实例并通过对象的 setter 设置状态字典并调用阻塞 run() 方法,该方法仅在生产者完成时返回。在调用set_status_dict(status_dict) 时,字典中会填充namestatus 元素。

当我运行代码时,生产者似乎已创建,我收到“开始生产者测试”和“初始化 1 个生产者”输出,然后收到来自 input() 的“输入您的命令”请求,但似乎实际过程没有运行。

当我按下回车键跳过第一次循环迭代时,我得到了预期的“未知命令”日志,并且生产者进程开始实际工作。之后,我的“状态”命令也按预期工作。

当我在第一次循环迭代中输入“状态”时,我得到一个键错误,因为字典中没有设置“名称”和“状态”。这些键应该在set_status_dict() 中设置,它本身在Process(target=...) 中调用。

这是为什么呢? producer.start() 不应该在新进程中运行start_producer 的完整块,因此永远不要挂在主进程的input() 上吗?

如何在没有任何用户输入的情况下首先启动进程,然后才等待input()

编辑:有这个问题的完整 mvce 程序可以在这里找到:https://pastebin.com/k8xvhLhn

编辑: 已找到初始化进程后带有sleep(1) 的解决方案。但为什么首先会发生这种行为呢?运行start_producer()中的所有代码不应该在新进程中运行吗?

【问题讨论】:

  • 你不能在进程之间共享 input()。
  • 我不想分享input()。其他进程应自行运行,无需与输入交互..
  • 我(完全)用status_dict['name'] = 'foo'; status_dict['status'] = 'thinking' 替换了start_producer 套件,其他一切似乎都按照你想要的方式工作。
  • 你怎么知道(你为什么认为)它在input()挂起
  • 当我按下回车键时,Unknown command. 在控制台中打印出来,然后其他进程作为例外运行。

标签: python python-multiprocessing


【解决方案1】:

我对多处理模块的经验有限,但我能够让它按照您想要的方式(我认为)运行。首先,我在 while 循环的顶部添加了一些打印语句以查看可能发生的情况,并发现如果该进程是 runjoined,它就可以工作。我想你不希望它阻塞,所以我添加了调用以进一步运行该过程 - 但似乎 run() 也阻塞了。事实证明,当第一次 while 循环迭代出现时,该过程还没有完成 - 在循环顶部添加 time.sleep(30) 使该过程有足够的时间来安排(由操作系统)并运行。 (在我的机器上它实际上只需要 200 到 300 毫秒的小睡时间)

我将start_producer 替换为:

def start_producer(producer_setting: dict, status_dict: dict):
##    importer = MyProducer(producer_setting)
##    importer.set_status_dict(status_dict)
##    importer.run()
    #time.sleep(30)
    status_dict['name'] = 'foo'
    status_dict['status'] = 'thinking'

您的代码已修改:

if __name__ == '__main__':
    with Manager() as manager:
        producers = []
        settings = [{'name':'test'}]

        for setting in settings:
            status = manager.dict()
            logger.info("Start Producer {0}".format(setting['name']))
            producer = Process(target=start_producer, args=(setting, status))
            producer.start()
            # add a call to run() but it blocks
            #producer.run()
            producers.append([producer, status])

        logger.info("initialized {0} producers".format(len(producers)))

        while True:
            time.sleep(30)
            for p, s in producers:
                #p.join()
                #p.run()
                print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
                if 'name' in s and 'status' in s:
                    print('{0}:{1}'.format(s['name'], s['status']))
            text_command = input('Enter your command:')
            if text_command == 'exit':
                logger.info("waiting for producers")
                for p in producers:
                    p[0].join()
                logger.info("Exit application.")
                break
            elif text_command == 'status':
                for p in producers:
                    if 'name' in p[1] and 'status' in p[1]:
                        print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
            else:
                print("Unknown command.")

【讨论】:

  • 那些生产者进程会运行很长时间。这就是我首先将它们置于流程中的原因。是的,run() 块,该方法不应由主进程调用,而应由新创建的进程调用。初始化进程后使用sleep(1) 的解决方案似乎工作正常,谢谢。但我很好奇为什么会这样。我真的认为整个块 start_producer() 已经在另一个进程中运行,不应该被阻塞。我不明白为什么会这样。
  • 我对整个练习的看法是它没有被阻塞,只是没有时间运行或完成。我真的不知道 manager.dict 是如何更新的,如果它是动态的并且在另一个进程正在运行时发生,或者它是否只在进程完成时才更新,即使它是第一个语句:我真的认为它发生在代码执行时进程中
  • @JackO'neill 这里有很多关于 SO 的多处理问题,它们的答案/cmets 暗示了开始时很多的开销另一个进程,我想开销包括操作系统调度您无法控制的任务。
  • 好的,这是有道理的。在继续主线程之前我可以稍等片刻,只是好奇为什么。谢谢你的帮助:)
  • @JackO'neill 如果你真的很好奇,你可以在主进程和子进程中添加一些 time.time() 语句(可能是status 中的另一个项),然后进行比较。跨度>
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-01-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-30
  • 2015-08-09
  • 1970-01-01
相关资源
最近更新 更多