【问题标题】:How can I scrape faster我怎样才能更快地刮
【发布时间】:2020-04-06 13:54:00
【问题描述】:

这里的工作是抓取一个从https://xxx.xxx.xxx/xxx/1.jsonhttps://xxx.xxx.xxx/xxx/1417749.json 的站点的API,并将其准确地写入mongodb。为此,我有以下代码:

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min = 1
max = 1417749
for n in range(min, max):
    response = requests.get("https:/xx.xxx.xxx/{}.json".format(str(n)))
    if response.status_code == 200:
        parsed = json.loads(response.text)
        inserted = com.insert_one(parsed)
        write_log.write(str(n) + "\t" + str(inserted) + "\n")
        print(str(n) + "\t" + str(inserted) + "\n")
write_log.close()

但是完成这项任务需要很多时间。这里的问题是如何加快这个过程。

【问题讨论】:

  • 您是否首先尝试对处理单个 json 需要多长时间进行基准测试?假设每条记录需要 300 毫秒,您可以在大约 5 天内依次处理所有这些记录。

标签: python mongodb web-scraping pymongo


【解决方案1】:

您可以从两个方面改进您的代码:

  • 使用Session,这样就不会在每次请求时重新安排连接并保持打开状态;

  • 在您的代码中使用asyncio 的并行性;

看这里https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html

【讨论】:

  • 你能补充一些细节吗?
【解决方案2】:

首先创建所有链接的列表,因为所有链接都相同,只需更改迭代即可。

list_of_links=[]
for i in range(1,1417749):
    list_of_links.append("https:/xx.xxx.xxx/{}.json".format(str(i)))

t_no=2
for i in range(0, len(list_of_links), t_no):
    all_t = []
    twenty_links = list_of_links[i:i + t_no]
    for link in twenty_links:
        obj_new = Demo(link,)
        t = threading.Thread(target=obj_new.get_json)
        t.start()
        all_t.append(t)
    for t in all_t:
        t.join()

class Demo:
    def __init__(self, url):
        self.json_url = url

def get_json(self):
    try:
       your logic
    except Exception as e:
       print(e)

通过简单地增加或减少 t_no 你可以改变线程数..

【讨论】:

    【解决方案3】:

    假设您不会被 API 阻止并且没有速率限制,则此代码应该使该过程快 50 倍(也许更多,因为现在所有请求都使用同一个会话发送)。

    import pymongo
    import threading
    
    client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
    db = client["thread1"]
    com = db["threadcol"]
    start_time = time.time()
    logs=[]
    
    number_of_json_objects=1417750
    number_of_threads=50
    
    session=requests.session()
    
    def scrap_write_log(session,start,end):
        for n in range(start, end):
            response = session.get("https:/xx.xxx.xxx/{}.json".format(n))
            if response.status_code == 200:
                try:
                    logs.append(str(n) + "\t" + str(com.insert_one(json.loads(response.text))) + "\n")
                    print(str(n) + "\t" + str(inserted) + "\n")
                except:
                    logs.append(str(n) + "\t" + "Failed to insert" + "\n")
                    print(str(n) + "\t" + "Failed to insert" + "\n")
    
    thread_ranges=[[x,x+number_of_json_objects//number_of_threads] for x in range(0,number_of_json_objects,number_of_json_objects//number_of_threads)]
    
    threads=[threading.Thread(target=scrap_write_log, args=(session,start_and_end[0],start_and_end[1])) for start_and_end in thread_ranges]
    
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    
    with open("logging.log", "a") as f:
        for line in logs:
            f.write(line)
    

    【讨论】:

      【解决方案4】:

      很多年前我碰巧有同样的问题。我从不满意基于 python 的答案,它们非常慢或太复杂。改用其他成熟工具后,速度很快,再也回不来了。

      最近我使用这样的步骤来加快进程如下。

      1. 在txt中生成一堆url
      2. 使用aria2c -x16 -d ~/Downloads -i /path/to/urls.txt 下载这些文件
      3. 本地解析

      这是迄今为止我想到的最快的过程。

      在抓取网页方面,我什至下载了必要的*.html,而不是一次访问该页面,这实际上没有任何区别。当您点击访问页面时,使用requestsscrapyurllib 等python 工具,它仍会为您缓存和下载整个网页内容。

      【讨论】:

        【解决方案5】:

        如果你不想使用多线程,asyncio 也是一个解决方案

        import time
        import pymongo
        import json
        import asyncio
        from aiohttp import ClientSession
        
        
        async def get_url(url, session):
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.text()
        
        
        async def create_task(sem, url, session):
            async with sem:
                response = await get_url(url, session)
                if response:
                    parsed = json.loads(response)
                    n = url.rsplit('/', 1)[1]
                    inserted = com.insert_one(parsed)
                    write_log.write(str(n) + "\t" + str(inserted) + "\n")
                    print(str(n) + "\t" + str(inserted) + "\n")
        
        
        async def run(minimum, maximum):
            url = 'https:/xx.xxx.xxx/{}.json'
            tasks = []
            sem = asyncio.Semaphore(1000)   # Maximize the concurrent sessions to 1000, stay below the max open sockets allowed
            async with ClientSession() as session:
                for n in range(minimum, maximum):
                    task = asyncio.ensure_future(create_task(sem, url.format(n), session))
                    tasks.append(task)
                responses = asyncio.gather(*tasks)
                await responses
        
        
        client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
        db = client["thread1"]
        com = db["threadcol"]
        start_time = time.time()
        write_log = open("logging.log", "a")
        min_item = 1
        max_item = 100
        
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(run(min_item, max_item))
        loop.run_until_complete(future)
        write_log.close()
        

        【讨论】:

          【解决方案6】:

          尝试分块请求并使用 MongoDB 批量写入操作。

          • 对请求进行分组(每组 100 个请求)
          • 遍历组
          • 使用异步请求模型获取数据(一组URL)
          • 完成组后更新数据库(批量写入操作)

          这可能会通过以下方式节省大量时间 * MongoDB 写入延迟 * 同步网络调用延迟

          但不要增加并行请求数(Chunk size),会增加服务器的网络负载,服务器可能会认为这是DDoS攻击。

          1. https://api.mongodb.com/python/current/examples/bulk.html

          【讨论】:

            【解决方案7】:

            您可以做几件事:

            1. 重用连接。根据下面的基准,它快了大约 3 倍
            2. 您可以在多个进程中并行抓取

            来自here的并行代码

            from threading import Thread
            from Queue import Queue
            q = Queue(concurrent * 2)
            for i in range(concurrent):
                t = Thread(target=doWork)
                t.daemon = True
                t.start()
            try:
                for url in open('urllist.txt'):
                    q.put(url.strip())
                q.join()
            except KeyboardInterrupt:
                sys.exit(1)
            

            this question 的可重用连接时间

            >>> timeit.timeit('_ = requests.get("https://www.wikipedia.org")', 'import requests', number=100)
            Starting new HTTPS connection (1): www.wikipedia.org
            Starting new HTTPS connection (1): www.wikipedia.org
            Starting new HTTPS connection (1): www.wikipedia.org
            ...
            Starting new HTTPS connection (1): www.wikipedia.org
            Starting new HTTPS connection (1): www.wikipedia.org
            Starting new HTTPS connection (1): www.wikipedia.org
            52.74904417991638
            >>> timeit.timeit('_ = session.get("https://www.wikipedia.org")', 'import requests; session = requests.Session()', number=100)
            Starting new HTTPS connection (1): www.wikipedia.org
            15.770191192626953
            

            【讨论】:

              【解决方案8】:

              您可能正在寻找的是异步抓取。我建议你创建一些批次的 url,即 5 个 url(尽量不要破坏网站),然后异步抓取它们。如果您对异步不太了解,请在 google 上搜索 libary asyncio。希望能帮到你:)

              【讨论】:

              • 你能补充一些细节吗?
              猜你喜欢
              • 1970-01-01
              • 2019-12-16
              • 1970-01-01
              • 1970-01-01
              • 2011-06-14
              • 2015-11-22
              • 1970-01-01
              • 1970-01-01
              • 2014-07-19
              相关资源
              最近更新 更多