【问题标题】:python ThreadPoolExecutor close all threads when I get a resultpython ThreadPoolExecutor 在我得到结果时关闭所有线程
【发布时间】:2020-11-22 06:38:20
【问题描述】:

我正在运行一个webscraper class,其方法名称是self.get_with_random_proxy_using_chain

我正在尝试向同一个 url 发送多线程调用,并且希望一旦有来自任何线程的结果,该方法会返回响应并关闭其他仍处于活动状态的线程。

到目前为止,我的代码看起来像这样(可能很天真):

from concurrent.futures import ThreadPoolExecutor, as_completed
# class initiation etc

max_workers = cpu_count() * 5
urls = [url_to_open] * 50

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_url=[]
    for url in urls: # i had to do a loop to include sleep not to overload the proxy server
        future_to_url.append(executor.submit(self.get_with_random_proxy_using_chain,
                                     url,
                                     timeout,
                                     update_proxy_score,
                                     unwanted_keywords,
                                     unwanted_status_codes,
                                     random_universe_size,
                                     file_path_to_save_streamed_content))
        sleep(0.5)

    for future in as_completed(future_to_url):
            if future.result() is not None:
                return future.result()

但它运行所有线程。

有没有办法在第一个未来完成后关闭所有线程。 我正在使用 windows 和 python 3.7x

到目前为止,我找到了这个link,但我无法让它工作(pogram 仍然运行很长时间)。

【问题讨论】:

  • “返回结果”是什么意思?看起来它将返回第一个结果并让所有其他线程完成,而忽略它们的结果。如何从一个 return 语句中获得多个连续结果?
  • 目前它返回一个值(正确),但在遍历所有期货之后。我想在找到第一个结果后停止所有线程/期货。进一步看,我看到了这一点; stackoverflow.com/questions/52631315/…,但找不到 _threads 属性(我编辑了我的问题)

标签: python python-3.x python-multithreading threadpoolexecutor


【解决方案1】:

据我所知,运行期货是无法取消的。 written 有很多关于这个的。甚至还有一些解决方法。

但我建议仔细查看asyncio 模块。它非常适合此类任务。

下面是一个简单的例子,当有几个并发请求时,收到第一个结果后,其余的被取消。

import asyncio
from typing import Set

from aiohttp import ClientSession


async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()


async def wait_for_first_response(tasks):
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    return done.pop().result()


async def request_one_of(*urls):
    tasks = set()
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.create_task(fetch(url, session))
            tasks.add(task)

        return await wait_for_first_response(tasks)


async def main():
    response = await request_one_of("https://wikipedia.org", "https://apple.com")
    print(response)

asyncio.run(main())

【讨论】:

  • tx,我需要使用 aiohttp,还是请求也能正常工作?我不熟悉 asyncio 和 aiohttp,但可能有机会研究一下。
  • 是的,为此你应该使用 aiohttp 或其他异步库,请求不支持异步方法
  • 好像 aiohttp 不支持 IP:PORT 形式的代理,有点卡住了
  • Aiohttp 应该支持带端口的代理地址。你是根据什么得出这个结论的?
  • 为 http 代理添加 http://,即 proxy = 'http://13.75.114.68:25222'
猜你喜欢
  • 2020-10-09
  • 1970-01-01
  • 1970-01-01
  • 2013-12-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多