【Question title】: Packages that are imported are not recognized during parallel computing?
【Posted】: 2021-06-01 23:19:14
【Question】:

I am running the function get_content in parallel with multiprocess.Pool. It then throws the error NameError: name 'session' is not defined. Clearly, I defined it with session = requests.Session(). Could you please elaborate on this issue?

import requests, os
from bs4 import BeautifulSoup
from multiprocess import Pool, freeze_support
core = os.cpu_count()
session = requests.Session() 
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
         'https://www.investopedia.com/terms/1/1-10net30.asp',
         'https://www.investopedia.com/terms/1/10-k.asp',
         'https://www.investopedia.com/terms/1/10k-wrap.asp',
         'https://www.investopedia.com/terms/1/10q.asp']

############ Get content of a word
def get_content(l):
    r = session.get(l, headers = headers)
    soup = BeautifulSoup(r.content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)

############ Parallel computing
if __name__=="__main__":
    P = Pool(processes = core)   
    content_list = P.map(get_content, links)
    content_all = ''.join(content_list)    
    freeze_support()

【Comments】:

  • For more on how objects in the main process are shared with child processes, see my answer

Tags: python python-3.x parallel-processing multiprocessing multiprocess


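【Solution 1】:

First, your import statement looks wrong. For the standard library it should be:

from multiprocessing import Pool, freeze_support

(You have from multiprocess ..., which only works if the third-party multiprocess package is installed; otherwise I am not at all sure how it ran.)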

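With the correct import statement the code runs for me, but it does not do what you think it does! From your call to freeze_support I infer that you are running under Windows. On that platform new processes are created with the spawn start method, which starts a fresh Python interpreter that re-imports your script, so everything at the top level of the program is executed again in each child. That is why the code that creates new processes must be inside a block governed by if __name__ == '__main__':. If it were not, your newly created processes would re-execute the very code that created them in a never-ending recursive loop, spawning new processes forever.

This means every process re-creates its own Session instance, because the following statement is at global scope: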

session = requests.Session()

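So there is no real benefit in terms of reusing one Session instance across the multiple URLs you are retrieving. To reuse a Session created once in the main process, you have to initialize the multiprocessing pool itself with the session object so that it is available to all worker processes (each worker receives its own pickled copy and reuses it for every URL it handles; it is not literally shared memory). You should also keep only the minimum of executable code at global scope: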

import requests, os
from bs4 import BeautifulSoup
from multiprocessing import Pool, freeze_support

headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

def init_pool(s):
    global session
    session = s


############ Get content of a word
def get_content(l):
    r = session.get(l, headers = headers)
    soup = BeautifulSoup(r.content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)

############ Parallel computing
if __name__=="__main__":
    # freeze_support() must be the first statement here; it only matters for frozen Windows executables
    freeze_support()
    core = os.cpu_count()
    session = requests.Session()

    links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
             'https://www.investopedia.com/terms/1/1-10net30.asp',
             'https://www.investopedia.com/terms/1/10-k.asp',
             'https://www.investopedia.com/terms/1/10k-wrap.asp',
             'https://www.investopedia.com/terms/1/10q.asp']

    p = Pool(processes = core, initializer=init_pool, initargs=(session,))
    content_list = p.map(get_content, links)
    content_all = ''.join(content_list)
    print(content_all)
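
The initializer pattern can be tried without any network access. The following is only a minimal sketch, not the asker's scraper: FakeSession, init_worker and fetch are hypothetical stand-ins, and the spawn start method is requested explicitly (an assumption made here so that the behaviour matches Windows on any platform). Each worker receives its own pickled copy of the object passed through initargs and reuses it for every task it handles, which the per-worker call counter makes visible.

```python
import multiprocessing as mp
import os


class FakeSession:
    """Hypothetical stand-in for requests.Session; it only counts calls."""

    def __init__(self):
        self.calls = 0

    def get(self, url):
        self.calls += 1
        return f"{url} fetched by pid {os.getpid()} (call {self.calls} in this worker)"


def init_worker(s):
    # Runs once in every worker process and stores that worker's copy as a global.
    global session
    session = s


def fetch(url):
    # 'session' is the global that init_worker set in this worker process.
    return session.get(url)


def main():
    urls = [f"https://example.com/page{i}" for i in range(6)]
    ctx = mp.get_context("spawn")  # the start method Windows uses
    with ctx.Pool(processes=2, initializer=init_worker,
                  initargs=(FakeSession(),)) as pool:
        for line in pool.map(fetch, urls):
            print(line)


if __name__ == "__main__":
    main()
```

The call counter restarts in each worker, which shows that the workers hold separate copies rather than one shared object.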

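But really, your code spends most of its time waiting for URLs to be fetched and only a little CPU time processing the returned HTML. That makes it a good candidate for multithreading instead of multiprocessing. The only changes you need to make to your original code to use multithreading are (1) remove all references to freeze_support (which multiprocessing only needs if you plan to create an exe file) and (2) change one import statement: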

from multiprocessing.dummy import Pool

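Also, when deciding how many threads to use, you should not be limited by the number of CPU cores (although you do not want to exceed some maximum):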

import requests, os
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool
session = requests.Session()
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
         'https://www.investopedia.com/terms/1/1-10net30.asp',
         'https://www.investopedia.com/terms/1/10-k.asp',
         'https://www.investopedia.com/terms/1/10k-wrap.asp',
         'https://www.investopedia.com/terms/1/10q.asp']

############ Get content of a word
def get_content(l):
    r = session.get(l, headers = headers)
    soup = BeautifulSoup(r.content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)

############ Concurrent computing
if __name__=="__main__":
    # max of 25 is arbitrary; we do not want to appear to be a denial of service attack
    P = Pool(processes = min(len(links), 25))
    content_list = P.map(get_content, links)
    content_all = ''.join(content_list)
    print(content_all)
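
To see why threads suit this kind of workload, here is a small sketch that replaces the HTTP request with time.sleep. The names fake_fetch and timed_map and the 0.2-second delay are made up for illustration; sleeping is assumed to behave like waiting on a socket in that it releases the GIL, so other threads can run in the meantime.

```python
import time
from multiprocessing.dummy import Pool  # thread-based Pool with the same API


def fake_fetch(url):
    # Hypothetical stand-in for session.get(): sleeping releases the GIL,
    # much as waiting for a network response does.
    time.sleep(0.2)
    return f"content of {url}"


def timed_map(urls, n_threads):
    # Returns (results, elapsed seconds) for a thread pool of the given size.
    start = time.perf_counter()
    with Pool(processes=n_threads) as pool:
        results = pool.map(fake_fetch, urls)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    urls = [f"https://example.com/{i}" for i in range(10)]
    n = min(len(urls), 25)
    _, serial = timed_map(urls, 1)
    _, threaded = timed_map(urls, n)
    print(f"1 thread: {serial:.2f}s, {n} threads: {threaded:.2f}s")
```

With one thread the waits add up; with one thread per URL they overlap, so the total time should be close to a single wait regardless of how many CPU cores the machine has.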

Finally, you can combine a thread pool and a multiprocessing pool, using the latter to handle the CPU-intensive part of the processing:

import requests, os
from bs4 import BeautifulSoup
from multiprocessing.pool import ThreadPool
from multiprocessing.pool import Pool
from functools import partial


session = requests.Session()
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
         'https://www.investopedia.com/terms/1/1-10net30.asp',
         'https://www.investopedia.com/terms/1/10-k.asp',
         'https://www.investopedia.com/terms/1/10k-wrap.asp',
         'https://www.investopedia.com/terms/1/10q.asp']

############ Get content of a word
def get_content(process_pool, l):
    r = session.get(l, headers = headers)
    return process_pool.apply(process_content, args=(r.content,))

def process_content(content):
    soup = BeautifulSoup(content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)


############ Parallel computing
if __name__=="__main__":
    process_pool = Pool(processes = min(len(links), os.cpu_count()))
    thread_pool = ThreadPool(processes = min(len(links), 25))
    content_list = thread_pool.map(partial(get_content, process_pool), links)
    content_all = ''.join(content_list)
    print(content_all)
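
The same thread-plus-process layout can be exercised offline. This is a sketch under stated assumptions: fake_download, parse and download_then_parse are hypothetical stand-ins for the HTTP request, the BeautifulSoup work and the glue between them, and the pool sizes are arbitrary.

```python
import time
from functools import partial
from multiprocessing.pool import Pool, ThreadPool


def fake_download(url):
    # Hypothetical I/O step: stands in for session.get(url).content.
    time.sleep(0.1)
    return f"<h1>{url}</h1>".encode()


def parse(raw):
    # Hypothetical CPU step: stands in for the BeautifulSoup parsing.
    text = raw.decode()
    return text[len("<h1>"):-len("</h1>")].upper()


def download_then_parse(process_pool, url):
    raw = fake_download(url)  # waits inside a worker thread
    # apply() blocks this thread until a worker process returns the result.
    return process_pool.apply(parse, (raw,))


def main():
    urls = ["alpha", "beta", "gamma"]
    with Pool(processes=2) as process_pool, \
            ThreadPool(processes=len(urls)) as thread_pool:
        results = thread_pool.map(partial(download_then_parse, process_pool), urls)
    print(results)


if __name__ == "__main__":
    main()
```

Only the bytes passed to parse and its return value cross the process boundary, so they must be picklable; the process pool object itself is used only from threads in the main process and is never pickled.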

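【Comments】:

  • Now I can feel how complex parallel computing is, and I understand why most games today cannot take advantage of the many CPU cores in a chip. Thank you so much for your kind help!
  • I mean..... try DoS-ing first, then back off if you get banned for an hour ;)
  • I added a third coding example that combines a thread pool and a multiprocessing pool, just to show that it is possible.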