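【Question title】: Downloading meta content for around 15000 URL Python - threading
【Posted】: 2021-09-22 09:02:06
【Question】:

I have about 30,000 URLs in my CSV. I need to check whether meta content is present for each URL. I am using requests_cache to cache the responses in a SQLite db. Even with the caching in place it takes about 24 hours, so I turned to concurrency. I think I am doing something wrong with out = executor.map(download_site, sites, headers), and I don't know how to fix it.

AttributeError: 'str' object has no attribute 'items'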

import concurrent.futures
import requests
import threading
import time
import pandas as pd
import requests_cache
from PIL import Image
from io import BytesIO

thread_local = threading.local()

df = pd.read_csv("test.csv")

sites = []
for row in df['URLS']:
    sites.append(row)

# print("URL is shortened")

user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
headers={'User-Agent':user_agent,}

requests_cache.install_cache('network_call', backend='sqlite', expire_after=2592000)


def getSess():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def networkCall(url, headers):
    print("In Download site")
    session = getSess()
    with session.get(url, headers=headers) as response:
        print(f"Read {len(response.content)} from {url}")
        return response.content

out = []
def getMeta(meta_res):
    print("Get data")
    for each in meta_res:
        meta = each.find_all('meta')
        for tag in meta:
            if 'name' in tag.attrs.keys() and tag.attrs['name'].strip().lower() in ['description', 'keywords']:
                content = tag.attrs['content']
                if content != '':
                    out.append("Absent")
                else:
                    out.append("Present")
    return out


def allSites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        out = executor.map(networkCall, sites, headers)
        return list(out)


if __name__ == "__main__":
    sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
    ] * 15000
    start_time = time.time()
    list_meta = allSites(sites)
    print("META   ", list_meta)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
    output = getMeta(list_meta)
    df["is it there"] = pd.Series(output)
    df.to_csv('new.csv',index=False, header=True)

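【Comments】:

  • This code can't be run because it is missing several functions. Also bear in mind that, however efficient your code is, you may be limited by how long the various URLs take to respond to an HTTP GET
  • Have you tried asyncio? I have had some success speeding up queries for hundreds of pages.
  • @DarkKnight It is runnable, just comment out the df[test.csv] part. I have given the sites variable under if __name__ == "__main__":
  • @MariaZentsova Ah, that is my last resort
  • Not runnable, because download_site and get_session are both missing

Tags: python python-3.x multithreading asynchronous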


【Solution 1】:

I have tried to emulate your functionality. The following code executes in under 4 minutes:

from bs4 import BeautifulSoup as BS
import concurrent.futures
import time
import queue
import requests


URLs = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice"
] * 15_000

user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
headers = {'User-Agent': user_agent}


class SessionCache():
    def __init__(self, cachesize=20):
        self.cachesize = cachesize
        self.sessions = 0
        self.q = queue.Queue()

    def getSession(self):
        try:
            return self.q.get(block=False)
        except queue.Empty:
            pass
        if self.sessions < self.cachesize:
            self.q.put(requests.Session())
            self.sessions += 1
        return self.q.get()

    def putSession(self, session):
        self.q.put(session)


CACHE = SessionCache()


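def doGet(url):
    # Acquire the session before the try block so that the finally clause
    # never refers to an unbound name if acquisition itself fails.
    session = CACHE.getSession()
    try:
        response = session.get(url, headers=headers)
        response.raise_for_status()
        soup = BS(response.text, 'lxml')
        for meta in soup.find_all('meta'):
            if (name := meta.attrs.get('name', None)):
                if name.strip().lower() in ['description', 'keywords']:
                    # Non-empty content means the meta data is present
                    if meta.attrs.get('content', '') != '':
                        return url, 'Present'
        return url, 'Absent'
    except Exception as e:
        # Report the failure as the result instead of aborting the whole run
        return url, str(e)
    finally:
        # Always hand the session back so other workers can reuse it
        CACHE.putSession(session)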


def main():
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for r in executor.map(doGet, URLs):
            print(f'{r[0]} -> {r[1]}')
    end = time.perf_counter()
    print(f'Duration={end-start:.4f}s')


if __name__ == '__main__':
    main()
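
The Present/Absent rule used in doGet can be checked without any network access. What follows is only a minimal sketch: it swaps in the standard-library html.parser for BeautifulSoup, and MetaFinder and meta_status are hypothetical names that do not appear in the answer.

```python
from html.parser import HTMLParser


class MetaFinder(HTMLParser):
    """Records whether a description/keywords <meta> tag has non-empty content."""

    def __init__(self):
        super().__init__()
        self.found = False

    def handle_starttag(self, tag, attrs):
        if tag != "meta":
            return
        attrs = dict(attrs)
        # Attribute values can be None (e.g. <meta name>), hence the "or ''".
        name = (attrs.get("name") or "").strip().lower()
        content = attrs.get("content") or ""
        if name in ("description", "keywords") and content != "":
            self.found = True


def meta_status(html):
    # Hypothetical helper: returns 'Present' or 'Absent' for one HTML document.
    finder = MetaFinder()
    finder.feed(html)
    return "Present" if finder.found else "Absent"


print(meta_status('<head><meta name="Description" content="A site"></head>'))
print(meta_status('<head><meta name="keywords" content=""></head>'))
print(meta_status('<head><meta charset="utf-8"></head>'))
```

Keeping the check in a pure function like this makes it possible to test the classification on saved HTML before spending time on 30,000 real requests.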

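【Comments】:

  • This gives sqlite3.OperationalError: database is locked
  • I don't know why that would happen. Take it up with the author of requests_cache
  • I have edited my answer to remove the dependency on requests_cache
【Solution 2】:

I recently stumbled upon a library for making asynchronous requests, called requests-futures

The syntax is simple: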

from concurrent.futures import as_completed
from requests_futures.sessions import FuturesSession
from bs4 import BeautifulSoup
import time

URLs = [

    'https://www.jython.org',
     'http://olympus.realpython.org/dice'
] * 1000



def scrape(URLs):

    with FuturesSession() as session:
        out = []
        futures = [session.get(url) for url in URLs]
        for future in as_completed(futures):
            resp = future.result()
            pageSoup = BeautifulSoup(resp.content, 'html.parser')
            meta = pageSoup.find_all("meta")
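            for tag in meta:
                if 'name' in tag.attrs.keys() and tag.attrs['name'].strip().lower() in ['description', 'keywords']:
                    # .get() avoids a KeyError when the tag has no content attribute
                    content = tag.attrs.get('content', '')
                    # Non-empty content means the meta data is present
                    if content != '':
                        out.append("Present")
                    else:
                        out.append("Absent")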
        return out

So scraping 2,000 URLs took well under a minute:

%%time
data = scrape(URLs)

CPU times: user 13.5 s, sys: 801 ms, total: 14.3 s
Wall time: 30.1 s
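
One thing to watch with as_completed is that it yields futures in completion order, so a plain list of results will not necessarily line up with the input URLs. The sketch below shows one possible way to keep them aligned by remembering each future's input index. It uses only the standard library, with fake_check as a hypothetical stand-in for the fetch-and-parse step and placeholder URLs; the same bookkeeping should carry over to the futures returned by FuturesSession.get, though that is not exercised here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

URLS = ["https://a.example", "https://b.example", "https://c.example"]  # placeholders


def fake_check(url):
    # Stand-in for fetching and parsing a page. Earlier URLs sleep longer,
    # so futures tend to complete in a different order than they were submitted.
    time.sleep(0.01 * (len(URLS) - URLS.index(url)))
    return "Present" if url.endswith("a.example") else "Absent"


def scrape_in_order(urls):
    results = [None] * len(urls)
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Remember which input position each future belongs to.
        future_to_index = {
            executor.submit(fake_check, url): i for i, url in enumerate(urls)
        }
        for future in as_completed(future_to_index):
            results[future_to_index[future]] = future.result()
    # results[i] now corresponds to urls[i], regardless of completion order.
    return results


print(scrape_in_order(URLS))
```

Indexing by position rather than by URL keeps duplicate URLs distinct, which matters when the result is written back as a column next to the original rows.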

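【Comments】:

    【Solution 3】:

    AttributeError: 'str' object has no attribute 'items'

    This error is raised in requests.models.PreparedRequest.prepare_headers(). When you call executor.map(networkCall, sites, headers), map iterates over headers in step with sites, as if it were a list of per-call arguments. Iterating over a dict yields its keys, so you end up with request.headers = 'User-Agent' instead of request.headers = {'User-Agent': '...'}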
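
The pairing behaviour can be reproduced without making any requests. This is a small sketch under stated assumptions: fake_call is a hypothetical stand-in for networkCall(), and the header value and URLs are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor

HEADERS = {"User-Agent": "example-agent"}  # hypothetical header value
SITES = ["https://a.example", "https://b.example", "https://c.example"]  # placeholders


def fake_call(url, headers):
    # Stand-in for networkCall(): makes no request, just reports its arguments.
    return url, headers


with ThreadPoolExecutor(max_workers=2) as executor:
    # executor.map pairs the iterables element by element, like zip().
    # Iterating over a dict yields its keys, so headers arrives as a str.
    zipped = list(executor.map(fake_call, SITES, HEADERS))

with ThreadPoolExecutor(max_workers=2) as executor:
    # Passing only the URLs and closing over the constant keeps the dict intact.
    fixed = list(executor.map(lambda url: fake_call(url, HEADERS), SITES))

print(zipped)
print(fixed)
```

Because the pairing stops at the shortest iterable, a one-key dict also means only the first URL is ever submitted in the first call.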

    Since the headers don't appear to change between calls, you can make them a constant and remove them as a parameter of networkCall():

    HEADERS = {'User-Agent':user_agent}
    ...
    
    def networkCall(url):
        session = getSess()
        with session.get(url, headers=HEADERS) as response:
            print(f"Read {len(response.content)} from {url}")
            return response.content
    ...
    
    def allSites(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            out = executor.map(networkCall, sites)
            return list(out)
    

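    sqlite3.OperationalError: database is locked

    Another thing worth noting is that requests_cache.install_cache() is not thread-safe, which causes the sqlite3.OperationalError you were getting earlier. You can remove install_cache() and use requests_cache.CachedSession instead, which is thread-safe: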

    def getSess():
        if not hasattr(thread_local, "session"):
            thread_local.session = requests_cache.CachedSession(
                'network_call',
                backend='sqlite',
                expire_after=2592000,
            )
        return thread_local.session
    

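    For reference, the requests-cache user guide has more information under differences between sessions and patching.

    Performance

    A note on performance: because you are doing a large number of concurrent writes, SQLite is not ideal. Concurrent reads are fast, but for writes it queues the operations internally and writes the data serially rather than in parallel. If possible, try Redis or one of the other requests-cache backends, which are better optimized for concurrent writes.

    【Comments】: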
