【问题标题】:Python ThreadPoolExecutor terminate all threadsPython ThreadPoolExecutor 终止所有线程
【发布时间】:2020-10-09 22:17:59
【问题描述】:

我正在运行一段 python 代码,其中多个线程通过线程池执行程序运行。每个线程都应该执行一项任务(例如获取网页)。我想要做的是终止所有线程,即使其中一个线程失败。例如:

with ThreadPoolExecutor(self._num_threads) as executor:
    jobs = []
    for path in paths:
        kw = {"path": path}
        jobs.append(executor.submit(start,**kw))
    for job in futures.as_completed(jobs):
        result = job.result()
        print(result)
def start(*args,**kwargs):
    #fetch the page
    if(success):
        return True
    else:
        #Signal all threads to stop

有可能吗?除非所有线程都成功,否则线程返回的结果对我来说毫无用处,因此即使其中一个失败,我想节省其余线程的一些执行时间并立即终止它们。实际的代码显然是在做相对冗长的任务,有几个失败点。

【问题讨论】:

  • 回答了我的问题,但没有解决我的问题。不过谢谢
  • 不要将“线程”与“任务”混为一谈。线程是操作系统中的代理(即它们运行代码)。任务是需要完成的事情。线程池执行器创建并管理它的自己的线程——你不应该_弄乱它们——执行你submit(...)给它的任务
  • 我对 ThreadPoolExecutor 不太熟悉,但也许我可以给你一个提示,我通常在 python 2.7 中结束线程的方式。
  • 如果可能,使用所有工作线程都可以看到的全局变量或类属性,并在每个工作线程的末尾添加条件,如果作业成功或未设置全局变量。 ej PASS=True/False,并添加检查循环,或者如果在循环内进行工作,则添加检查此变量状态的条件,当任何线程读取 PASS=False 时,应遵循返回语句,这工作线程可以结束的方式..,

标签: python multithreading threadpoolexecutor concurrent.futures


【解决方案1】:

如果您已经完成了线程并想研究进程,那么这里的代码和平看起来非常有前途和简单,几乎与线程相同的语法,但使用多处理模块。

当超时标志到期时进程终止,非常方便。

import multiprocessing

def get_page(*args, **kwargs):
    # your web page downloading code goes here

def start_get_page(timeout, *args, **kwargs):
    p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
    p.start()
    p.join(timeout)
    if p.is_alive():
        # stop the downloading 'thread'
        p.terminate()
        # and then do any post-error processing here

if __name__ == "__main__":
    start_get_page(timeout, *args, **kwargs)

【讨论】:

  • 谢谢你,这可能真的可以完成这项工作。
  • 谢谢,帮了大忙!
【解决方案2】:

您可以尝试使用func-timeout 中的StoppableThread。 但是终止线程是强烈 discouraged。如果你需要杀死一个线程,你可能有一个设计问题。查看替代方案:asyncio 协程和 multiprocessing 具有合法的取消/终止功能。

【讨论】:

  • 你能解释一下为什么杀死一个线程是一个糟糕的设计吗?仅仅是因为线程不是为了被杀死而设计的吗?
  • 不仅如此。解释here
【解决方案3】:

我已经为我遇到的类似问题创建了一个答案,我认为该答案适用于这个问题。

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 100


def long_request(id):
    sleep(1)

    # Simulate bad response
    if id == 10:
        return {"data": {"valid": False}}
    else:
        return {"data": {"valid": True}}


def check_results(results):
    valid = True
    for result in results:
        valid = result["data"]["valid"]

    return valid


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):

            is_responses_valid = check_results(responses)

            # Cancel all future requests if one invalid
            if not is_responses_valid:
                executor.shutdown(wait=False)
            else:
                # Append valid responses
                num_requests += 1
                responses.append(future.result())

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)

【讨论】:

    【解决方案4】:

    我会这样做:

    import concurrent.futures
    
    def start(*args,**kwargs):
        #fetch the page
        if(success):
            return True
        else:
            return False
    
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(start, {"path": path}) for path in paths]
        concurrent.futures.wait(results, timeout=10, return_when=concurrent.futures.FIRST_COMPLETED)
        for f in concurrent.futures.as_completed(results):
            f_success = f.result()
            if not f_success:
                executor.shutdown(wait=False, cancel_futures=True) # shutdown if one fails
            else:
                #do stuff here
    

    如果任何结果不为真,一切都会立即关闭。

    【讨论】:

    • 这是错误的。 shutdown() 不会杀死已经在运行的进程或线程。
    【解决方案5】:

    在我的代码中我使用了多处理

    import multiprocessing as mp
    pool = mp.Pool()
    for i in range(threadNumber):
        pool.apply_async(publishMessage, args=(map_metrics, connection_parameters...,))
    
    pool.close()
    pool.terminate()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-22
      • 2022-01-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多