【问题标题】:Requests and multiprocessing请求和多处理
【发布时间】:2017-10-12 13:19:16
【问题描述】:

所以我试图同时在多个网站上使用requestsBeautifulSoup,但由于某种原因我无法使其工作。 这是一个完整的例子:

import multiprocessing as mp
import requests
from bs4 import BeautifulSoup
from random import randint

# Define an output queue


class Spider(object):
    """docstring for Spider"""
    def __init__(self):
        super(Spider, self).__init__()

    # define a example function
    def rand_string(length, output):
        random_post=randint(1000000,9999999)
        response=requests.get('https://stackoverflow.com/questions/'+str(random_post))
        soup=BeautifulSoup(response.content,'lxml')
        try:
            title=soup.find('a',{'class':'question-hyperlink'}).string
        except:
            title="not found"

        output.put(title)

    # Setup a list of processes that we want to run
    def run(self):

        output = mp.Queue()
        processes = [mp.Process(target=Spider.rand_string, args=(x, output)) for x in range(10)]

        for p in processes:
            p.start()

        # Exit the completed processes

        for p in processes:
            p.join()

        # Get process results from the output queue

        results = [output.get() for p in processes]
        print(results)

# Run processes

if __name__ == '__main__':

    spider=Spider()
    spider.run()

【问题讨论】:

  • 您是否尝试过在单线程时确保一切顺利运行?多处理可能会使调试子进程变得困难
  • 您发布的代码中没有任何明显的地方,所以您的问题在其他地方..
  • @Aaron,单线程工作得很好,但我对执行时间不满意。没有一个函数使用相同的资源/文件
  • 好吧,atm 我看不出你发布的代码有什么问题。所有功能是否足够相似,您可以浓缩为一个功能并提供另一个参数(搜索网站)?如果是这样,我个人会尝试这样做并使用已经构建的multiprocessing.pool
  • @Aaron 我认为这与我在 Windows 机器上运行的事实有关,但我还没有弄清楚如何更改代码...stackoverflow.com/questions/37244168/…跨度>

标签: python beautifulsoup queue multiprocessing


【解决方案1】:

我添加了一堆调试打印语句来遵循您的流程并得出一些结论...

  1. 您有时可能会遇到使用 bs4 的递归深度限制...
  2. 您之前链接的答案(在 cmets 中)确实与您的问题相关。
  3. 没有fork() 的窗口是一个巨大的痛苦。

您的主要错误是在rand_string() 中的行:

title=soup.find('a',{'class':'question-hyperlink'}).string

这是返回 <class 'bs4.element.NavigableString'> 而不是 <class str>。当它被传递给mp.Queue.put() 时,尝试腌制它以便可以通过内部管道发送失败并出现递归错误,从而使队列停止。我不确定是否真的可以通过腌制管道发送 bs4 元素,(也许如果你将引用循环转换为弱引用?)但总是发送简单的 python 对象要容易得多。我还将队列的创建移到了主上下文(spider.run() 之外),尽管这并不是特别必要的,只要它仅由主线程执行即可。这是我在最后一次迭代中的调试代码,因此您可以按照我的测试方法进行:

from multiprocessing import Process, Queue, current_process
import requests
from bs4 import BeautifulSoup
from random import randint
import sys
#sys.setrecursionlimit(1000)

class Spider(object):
    """docstring for Spider"""

    # define a example function
    @staticmethod
    def rand_string(length, output):

        print("{} entry point".format(current_process().name))
        random_post=randint(1000000,9999999)
        response=requests.get('https://stackoverflow.com/questions/'+str(random_post))
        print("{} got request response".format(current_process().name))
        soup=BeautifulSoup(response.content,'lxml')
        try:
            title = soup.find('a',{'class':'question-hyperlink'}).string
        except:
            title = "not found"

        print("{} got title: '{}' of type: {}".format(current_process().name, title, type(title)))

        ###### This did it ######
        title = str(title) #fix or fake news?

        output.put([title,current_process().name])
        output.close()
        print("{} exit point".format(current_process().name))


    # Setup a list of processes that we want to run
#    @staticmethod
    def run(self, outq):
        processes = []
        for x in range(5):
                processes.append(Process(target=self.rand_string, name="process_{}".format(x), args=(x, outq,),) )
                print("creating process_{}".format(x))

        for p in processes:
            p.start()
            print("{} started".format(p.name))

        # Exit the completed processes
        for p in processes:
            p.join()
            print("successuflly joined {}".format(p.name))

        # Get process results from the output queue
        print("joined all workers")
#        return None
        out = []
        while not outq.empty():
            result = outq.get()
            print("got {}".format(result))
            out.append(result)
        return out

# Run processes
if __name__ == '__main__':
    outq = Queue()
    spider=Spider()
    out = spider.run(outq)
    print("done")

以及运行上述代码的输出:

创建进程_0
创建进程_1
创建进程_2
创建进程_3
创建进程_4
process_0 已启动
process_1 已启动
process_2 已启动
process_3 已启动
process_4 已启动
process_2 入口点
process_2 得到请求响应
process_2 的标题:“未找到”类型:&ltclass 'str'&gt
process_2 出口点
process_0 入口点
process_0 得到请求响应
process_0 的标题:“视频播放完毕后启动活动”类型:&ltclass 'bs4.element.NavigableString'&gt
process_0 退出点
成功加入process_0
process_3 入口点
process_3 得到请求响应
process_3 得到标题:'只是不明白这些 typedef 的意义'类型:&ltclass 'bs4.element.NavigableString'&gt
process_3 出口点
process_1 入口点
process_1 得到请求响应
process_1 的标题:“magento 管理产品网格中的导入按钮 + 文件浏览字段”类型:&ltclass 'bs4.element.NavigableString'&gt
process_1 出口点
process_4 入口点
process_4 得到请求响应
process_4 的标题:“我如何使用 subselect 进行查询”类型:&ltclass 'bs4.element.NavigableString'&gt
process_4 退出点
成功加入process_1
成功加入process_2
成功加入process_3
成功加入process_4
加入所有工人
得到['未找到','process_2']
got ['视频播放完毕后启动活动', 'process_0']
got ["只是不明白这些 typedef 的意义", 'process_3']
得到 ['magento 管理产品网格中的导入按钮 + 文件浏览字段','process_1']
got ['如何使用 subselect 进行查询', 'process_4']
完成

【讨论】:

  • 他们说英雄不存在!!非常感谢,我尝试了 1231441 件事情,我什至记得现在阅读有关腌制队列对象的内容,但我不认为 bs4.tag 是问题所在。另外,感谢你重写了这件事,它帮助我学习如何编写更具可读性的代码。为你加油,谢谢!
  • 不客气@Mike。我现在可能要打破你的泡沫了,不过如果提到这个应用程序,Threads 会好得多。您的特定应用程序与 http 请求是 i/o 绑定的,因此进程与线程的额外开销可能不值得额外的处理能力。此外,您可以完全避免队列问题,因为线程共享内存,您可以简单地使用几个锁(mutex) 写入共享资源,以防止意外覆盖..
  • 还请记住,来自子进程或线程的打印语句可能会被缓冲并稍后实际写入。如果您希望立即打印(有点),您必须调用sys.stdout.write("stuff"),然后是@ 987654331@ 在我的示例中,每个进程在终止之前从未将其输出刷新到标准输出,这就是为什么每组 4 个打印语句被组合在一起的原因。
  • 我确实设法使用了线程,但它根本没有改善我的运行时间。我的目标是让 4 个进程进一步分成多个线程。您的解决方案有效,但在我的真实脚本上,我仍然很难通过该 queue.queue 对字典列表进行排队。伟大的帮助和感谢打印建议!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-10-30
  • 1970-01-01
  • 2021-08-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-11-22
相关资源
最近更新 更多