【问题标题】:multiprocessing.Process doesn't terminate after putting requests response.content to queue将请求 response.content 放入队列后,multiprocessing.Process 不会终止
【发布时间】:2018-05-23 08:34:59
【问题描述】:

我正在尝试与 multiprocessing.Process 和请求并行运行多个 API 请求。我将要解析的 urls 放入 JoinableQueue 实例并将内容放回 Queue 实例。我注意到将 response.content 放入队列会以某种方式阻止进程终止。

这是一个只有 1 个进程的简化示例(Python 3.5):

import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, q, qout):
        super().__init__()
        self.qin = qin
        self.qout = qout
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())

运行代码后我得到:

无限循环终止

ChildProcess-1 真

请帮助理解为什么运行函数退出后进程没有终止。

更新:添加打印语句以显示循环终止

【问题讨论】:

  • 我遇到了完全相同的问题。我的 Process-inheriting 对象的“run”函数的最后一行执行了,但进程没有死。仍然没有找到答案(我的问题:stackoverflow.com/questions/57780333/…

标签: python python-3.x python-requests python-multiprocessing


【解决方案1】:

根据Queue documentation 很难弄清楚这一点 - 我也遇到了同样的问题。

这里的关键概念是,在生产者线程终止之前,它会加入任何有put 数据的队列;然后,该连接会阻塞,直到队列的后台线程终止,只有在队列为空时才会发生。所以基本上,在你的ChildProcess 可以退出之前,必须有人将它put 的所有东西都消耗到队列中!

有一些关于Queue.cancel_join_thread 函数的文档,它应该可以规避这个问题,但我无法让它产生任何效果——也许我没有正确使用它。

这是一个可以解决问题的示例修改:

if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    while True:
        try:
            qout.get(True, 0.1)     # Throw away remaining stuff in qout (or process it or whatever,
                                    # just get it out of the queue so the queue background process
                                    # can terminate, so your ChildProcess can terminate.
        except queue.Empty:
            break
    w.join()                # Wait for your ChildProcess to finish up.
    # time.sleep(1)         # Not necessary since we've joined the ChildProcess
    print(w.name, w.is_alive())

【讨论】:

    【解决方案2】:

    Pipes and Queues documentation中所述

    如果子进程已将项目放入队列中(并且尚未使用 JoinableQueue.cancel_join_thread),那么该进程将不会 直到所有缓冲的项目都被刷新到管道为止。

    这意味着如果您尝试加入该进程,您可能会遇到死锁 除非您确定所有已放入队列的项目 已被消耗。

    ...

    请注意,使用管理器创建的队列不存在此问题。

    如果切换到管理器队列,则进程成功终止:

    import multiprocessing as mp
    import queue
    import requests
    import time
    
    
    class ChildProcess(mp.Process):
        def __init__(self, q, qout):
            super().__init__()
            self.qin = qin
            self.qout = qout
            self.daemon = True
    
        def run(self):
            while True:
                try:
                    url = self.qin.get(block=False)
                    r = requests.get(url, verify=False)
                    self.qout.put(r.content)
                    self.qin.task_done()
                except queue.Empty:
                    break
                except requests.exceptions.RequestException as e:
                    print(self.name, e)
                    self.qin.task_done()
            print("Infinite loop terminates")
    
    
    if __name__ == '__main__':
        manager = mp.Manager()
        qin = mp.JoinableQueue()
        qout = manager.Queue()
        for _ in range(5):
            qin.put('http://en.wikipedia.org')
        w = ChildProcess(qin, qout)
        w.start()
        qin.join()
        time.sleep(1)
        print(w.name, w.is_alive())
    

    【讨论】:

      【解决方案3】:

      在打印消息上方添加对w.terminate() 的调用。


      关于为什么进程不会自行终止;你的函数代码是一个无限循环,所以它永远不会返回。调用 terminate 表示进程终止自身。

      【讨论】:

      • 谢谢,它有效。但是进程不自行终止的真正原因是什么?
      • 这不是进程不终止的正确原因 - 代码输出“无限循环终止”,表明它已退出循环(因为 break 语句)。跨度>
      猜你喜欢
      • 2023-04-04
      • 2016-05-19
      • 2018-08-12
      • 1970-01-01
      • 2014-02-10
      • 1970-01-01
      • 1970-01-01
      • 2015-11-10
      • 2011-05-18
      相关资源
      最近更新 更多