【问题标题】:How to return values from Process- or Thread instances?如何从 Process- 或 Thread 实例返回值?
【发布时间】:2018-11-21 13:38:59
【问题描述】:

所以我想运行一个函数,它既可以在网上搜索信息,也可以直接从我自己的 mysql 数据库中搜索信息。 第一个过程会很耗时,第二个相对较快。

考虑到这一点,我创建了一个启动此复合搜索 (find_compound_view) 的进程。如果该过程相对较快地完成,则意味着它存在于数据库中,因此我可以立即呈现结果。否则,我将渲染“drax_retrieving_data.html”。

我想出的愚蠢解决方案是运行该函数两次,一次是检查该过程是否需要很长时间,另一次是实际获取函数的返回值。这很大程度上是因为我不知道如何返回我的 find_compound_view 函数的值。我试过谷歌搜索,但似乎找不到如何从 Process 类中返回值。

   p = Process(target=find_compound_view, args=(form,))
        p.start()
        is_running = p.is_alive()
        start_time=time.time()
        while is_running:
            time.sleep(0.05)
            is_running = p.is_alive()
            if time.time() - start_time > 10 :
                print('Timer exceeded, DRAX is retrieving info!',time.time() - start_time)
                return render(request,'drax_internal_dbs/drax_retrieving_data.html')
        compound = find_compound_view(form,use_email=False)

   if compound:
      data=*****
      return  render(request, 'drax_internal_dbs/result.html',data)

【问题讨论】:

    标签: python multithreading multiprocessing python-multiprocessing python-multithreading


    【解决方案1】:

    您需要multiprocessing.Pipemultiprocessing.Queue 将结果发送回您的父进程。如果你只是做 I/0,你应该使用Thread 而不是Process,因为它更轻量级并且大部分时间都花在等待上。我正在向您展示它是如何为进程和线程完成的。


    使用队列处理

    多处理队列建立在管道之上,访问与锁/信号量同步。队列是线程和进程安全的,这意味着您可以将一个队列用于多个生产者/消费者进程,甚至这些进程中的多个线程。在队列中添加第一项也将在调用过程中启动一个馈线线程。 multiprocessing.Queue 的额外开销使得在单生产者/单消费者场景中使用管道更可取且性能更高。

    以下是使用multiprocessing.Queue 发送和检索结果的方法:

    from multiprocessing import Process, Queue
    
    SENTINEL = 'SENTINEL'
    
    def sim_busy(out_queue, x):
        for _ in range(int(x)):
            assert 1 == 1
        result = x
        out_queue.put(result)
        # If all results are enqueued, send a sentinel-value to let the parent know
        # no more results will come.
        out_queue.put(SENTINEL)
    
    
    if __name__ == '__main__':
    
        out_queue = Queue()
    
        p = Process(target=sim_busy, args=(out_queue, 150e6))  # 150e6 == 150000000.0
        p.start()
    
        for result in iter(out_queue.get, SENTINEL):  # sentinel breaks the loop
            print(result)
    

    队列作为参数传递给函数,结果是队列中的.put() 和队列中的父get.()s。 .get() 是一个阻塞调用,直到要获取 的东西(指定超时参数是可能的)才会恢复执行。请注意 sim_busy 在这里所做的工作是 cpu 密集型的,此时您会选择进程而不是线程。


    工艺和管道

    对于一对一的连接,管道就足够了。设置几乎相同,只是方法名称不同,对Pipe() 的调用会返回两个连接对象。在双工模式下,两个对象都是读写端,duplex=False(单工)第一个连接对象是管道的读端,第二个是写端。在这个基本场景中,我们只需要一个单工管道:

    from multiprocessing import Process, Pipe
    
    SENTINEL = 'SENTINEL'
    
    
    def sim_busy(write_conn, x):
        for _ in range(int(x)):
            assert 1 == 1
        result = x
        write_conn.send(result)
        # If all results are send, send a sentinel-value to let the parent know
        # no more results will come.
        write_conn.send(SENTINEL)
    
    
    if __name__ == '__main__':
    
        # duplex=False because we just need one-way communication in this case.
        read_conn, write_conn = Pipe(duplex=False)
    
        p = Process(target=sim_busy, args=(write_conn, 150e6))  # 150e6 == 150000000.0
        p.start()
    
        for result in iter(read_conn.recv, SENTINEL):  # sentinel breaks the loop
            print(result)
    

    线程和队列

    要与线程一起使用,您想切换到queue.Queuequeue.Queue 构建在 collections.deque 之上,添加了一些锁以使其成为线程安全的。与多处理的队列和管道不同,放在queue.Queue 上的对象不会被腌制。由于线程共享相同的内存地址空间,内存复制的序列化是不必要的,只传输指针。

    from threading import Thread
    from queue import Queue
    import time
    
    SENTINEL = 'SENTINEL'
    
    
    def sim_io(out_queue, query):
        time.sleep(1)
        result = query + '_result'
        out_queue.put(result)
        # If all results are enqueued, send a sentinel-value to let the parent know
        # no more results will come.
        out_queue.put(SENTINEL)
    
    
    if __name__ == '__main__':
    
        out_queue = Queue()
    
        p = Thread(target=sim_io, args=(out_queue, 'my_query'))
        p.start()
    
        for result in iter(out_queue.get, SENTINEL):  # sentinel-value breaks the loop
            print(result)
    

    • 阅读here为什么for result in iter(out_queue.get, SENTINEL): 在可能的情况下,应该优先于 while True...break 设置。
    • 阅读 here 为什么应该在所有脚本中使用 if __name__ == '__main__':,尤其是在多处理中。
    • 更多关于get()-usage here

    【讨论】:

    • 谢谢,这不是我一直在寻找的解决方案,但它引导我走向正确的方向!
    • 如果我不使用队列会怎样——然后呢?
    • @VaidøtasI。对于多线程,您还可以简单地将结果附加到当前的Thread-object,例如def foo(): threading.current_thread().result = 42
    • @Darkonaut 谢谢,但是 ThreadPool 和 ThreadPoolExecutor 类可以毫无问题地处理所有这些,因为无论如何内存都是在线程之间共享的。我在谈论更多关于多处理问题。但我看到 Pipe 足够优雅,如果我使用它,就足够了 :)。
    猜你喜欢
    • 1970-01-01
    • 2016-08-30
    • 1970-01-01
    • 1970-01-01
    • 2018-01-10
    • 1970-01-01
    • 2021-02-20
    • 2017-01-23
    • 1970-01-01
    相关资源
    最近更新 更多