【问题标题】:Collect incremental results from Tornado's ProcessPoolExecutor从 Tornado 的 ProcessPoolExecutor 收集增量结果
【发布时间】:2017-02-21 16:51:59
【问题描述】:

我有一个tornado 应用程序,它需要在ProcessPoolExecutor 上运行阻塞功能。这个阻塞函数使用了一个库,它通过blinker 事件发出增量结果。我想收集这些事件并在它们发生时将它们发送回我的tornado 应用程序。

起初,tornado 似乎非常适合这个用例,因为它是异步的。我想我可以简单地将tornado.queues.Queue 对象传递给要在池上运行的函数,然后将put() 事件作为blinker 事件回调的一部分到此队列中。

但是,阅读 tornado.queues.Queue 的文档后,我了解到它们不是像 multiprocessing.Queue 这样跨进程管理的,也不是线程安全的。

有没有办法在它们发生时从pool 检索这些事件?我应该包装multiprocessing.Queue 使其产生Futures 吗?这似乎不太可能奏效,因为我怀疑multiprocessing 的内部结构与tornado 兼容。

[编辑] 这里有一些不错的线索:https://gist.github.com/hoffrocket/8050711

【问题讨论】:

    标签: python multiprocessing tornado blinker


    【解决方案1】:

    要收集传递给ProcessPoolExecutor 的任务的返回值以外的任何内容,您必须使用multiprocessing.Queue(或multiprocessing 库中的其他对象)。然后,由于multiprocessing.Queue 只公开了一个同步接口,因此您必须使用父进程中的另一个线程从队列中读取(无需深入了解实现细节。这里可以使用一个文件描述符,但我们将忽略它现在因为它没有记录并且可能会更改)。

    这是一个未经测试的快速示例:

    queue = multiprocessing.Queue()
    proc_pool = concurrent.futures.ProcessPoolExecutor()
    thread_pool = concurrent.futures.ThreadPoolExecutor()
    
    async def read_events():
        while True:
            event = await thread_pool.submit(queue.get)
            print(event)
    
    async def foo():
        IOLoop.current.spawn_callback(read_events)
        await proc_pool.submit(do_something_and_write_to_queue)
    

    【讨论】:

    • 是的,这或多或少是我最终做的,但使用了支持非阻塞 AioQueue.coro_getaioprocessing 模块。
    【解决方案2】:

    您可以做的比这更简单。这是一个向子进程提交四个慢速函数调用并等待它们的协程:

    from concurrent.futures import ProcessPoolExecutor
    from time import sleep
    
    from tornado import gen, ioloop
    
    pool = ProcessPoolExecutor()
    
    
    def calculate_slowly(x):
        sleep(x)
        return x
    
    
    async def parallel_tasks():
        # Create futures in a randomized order.
        futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
                   for i in [1, 3, 2, 4]]
    
        wait_iterator = gen.WaitIterator(*futures)
        while not wait_iterator.done():
            try:
                result = await wait_iterator.next()
            except Exception as e:
                print("Error {} from {}".format(e, wait_iterator.current_future))
            else:
                print("Result {} received from future number {}".format(
                    result, wait_iterator.current_index))
    
    
    ioloop.IOLoop.current().run_sync(parallel_tasks)
    

    它输出:

    Result 1 received from future number 0
    Result 2 received from future number 2
    Result 3 received from future number 1
    Result 4 received from future number 3
    

    您可以看到协程按照它们完成的顺序接收结果,而不是它们提交的顺序:future number 1 在future number 2 之后解析,因为future number 1 睡眠时间更长。 convert_yielded 将 ProcessPoolExecutor 返回的 Futures 转换为 Tornado 兼容的 Futures,可以在协程中等待。

    每个 future 解析为 calculate_slowly 返回的值:在这种情况下,它与传入 calculate_slowly 的数字相同,并且与 calculate_slowly 休眠的秒数相同。

    要将其包含在 RequestHandler 中,请尝试以下操作:

    class MainHandler(web.RequestHandler):
        async def get(self):
            self.write("Starting....\n")
            self.flush()
    
            futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
                       for i in [1, 3, 2, 4]]
    
            wait_iterator = gen.WaitIterator(*futures)
            while not wait_iterator.done():
                result = await wait_iterator.next()
                self.write("Result {} received from future number {}\n".format(
                    result, wait_iterator.current_index))
    
                self.flush()
    
    
    if __name__ == "__main__":
        application = web.Application([
            (r"/", MainHandler),
        ])
        application.listen(8888)
        ioloop.IOLoop.instance().start()
    

    如果您curl localhost:8888,您可以观察到服务器对客户端请求的响应是递增的。

    【讨论】:

    • 这不是我的意思。为了澄清,我有一个慢速函数调用。而且,该函数调用通过blinker 发出许多事件。我想收集这些事件并将它们发送回主龙卷风线程。我相信您这里的代码并行运行四个慢速函数,而不是一个慢速函数。
    猜你喜欢
    • 1970-01-01
    • 2012-06-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多