【问题标题】:Python - Shrinking pool of threads dynamically / Stop a threadPython - 动态收缩线程池/停止线程
【发布时间】:2013-04-29 10:56:38
【问题描述】:

我正在编写一个小型多线程 http 文件下载器,并希望能够在代码遇到错误时缩小可用线程

这些错误将特定于在 Web 服务器不允许更多连接的情况下返回的 http 错误

例如。如果我设置了一个包含 5 个线程的池,则每个线程都试图打开它自己的连接并下载文件的一部分。服务器可能只允许 2 个连接,我相信会返回 503 错误,我想检测到这一点并关闭一个线程,最终将池的大小限制为可能只有服务器允许的 2 个

我可以让线程自行停止吗?

self.Thread_stop() 足够了吗?

我还需要 join() 吗?

这是我的工作类,负责下载,从队列中获取处理,下载后将结果转储到 resultQ 中,由主线程保存到文件中

在这里,我想检测 http 503 并从可用池中停止/杀死/删除一个线程 - 当然,将失败的块重新添加回队列,以便其余线程将处理它

class Downloader(threading.Thread):
    def __init__(self, queue, resultQ, file_name):
        threading.Thread.__init__(self)
        self.workQ = queue
        self.resultQ = resultQ
        self.file_name = file_name

    def run(self):
        while True:
            block_num, url, start, length = self.workQ.get()
            print 'Starting Queue #: %s' % block_num
            print start
            print length

            #Download the file
            self.download_file(url, start, length)

            #Tell queue that this task is done
            print 'Queue #: %s finished' % block_num
            self.workQ.task_done()


    def download_file(self, url, start, length):        

        request = urllib2.Request(url, None, headers)
        if length == 0:
            return None
        request.add_header('Range', 'bytes=%d-%d' % (start, start + length))

        while 1:
            try:
                data = urllib2.urlopen(request)
            except urllib2.URLError, u:
                print "Connection did not start with", u
            else:
                break

        chunk = ''
        block_size = 1024
        remaining_blocks = length

        while remaining_blocks > 0:

            if remaining_blocks >= block_size:
                fetch_size = block_size
            else:
                fetch_size = int(remaining_blocks)

            try:
                data_block = data.read(fetch_size)
                if len(data_block) == 0:
                    print "Connection: [TESTING]: 0 sized block" + \
                        " fetched."
                if len(data_block) != fetch_size:
                    print "Connection: len(data_block) != length" + \
                        ", but continuing anyway."
                    self.run()
                    return

            except socket.timeout, s:
                print "Connection timed out with", s
                self.run()
                return

            remaining_blocks -= fetch_size
            chunk += data_block

        resultQ.put([start, chunk])

下面是我初始化线程池的地方,再往下我将项目放入队列

# create a thread pool and give them a queue
for i in range(num_threads):
    t = Downloader(workQ, resultQ, file_name)
    t.setDaemon(True)
    t.start()

【问题讨论】:

    标签: python threadpool


    【解决方案1】:

    我可以让线程自行停止吗?

    不要使用self._Thread__stop()。退出线程的run() 方法就足够了(您可以检查一个标志或从队列中读取一个标记值以了解何时退出)。

    在这里,我想检测 http 503 并从可用池中停止/杀死/删除一个线程 - 当然,将失败的块重新添加回队列,以便其余线程将处理它

    您可以通过分离职责来简化代码:

    • download_file() 不应尝试在无限循环中重新连接。如果有错误;让我们调用download_file()的代码在必要时重新提交
    • 关于并发连接数的控制可以封装在Semaphore对象中。在这种情况下,线程数可能与并发连接数不同
    import concurrent.futures # on Python 2.x: pip install futures 
    from threading import BoundedSemaphore
    
    def download_file(args):
        nconcurrent.acquire(timeout=args['timeout']) # block if too many connections
        # ...
        nconcurrent.release() #NOTE: don't release it on exception,
                              #      allow the caller to handle it
    
    # you can put it into a dictionary: server -> semaphore instead of the global
    nconcurrent = BoundedSemaphore(5) # start with at most 5 concurrent connections
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        future_to_args = dict((executor.submit(download_file, args), args)
                               for args in generate_initial_download_tasks())
    
        while future_to_args:
            for future in concurrent.futures.as_completed(dict(**future_to_args)):
                args = future_to_args.pop(future)
                try: 
                    result = future.result()
                except Exception as e:
                    print('%r generated an exception: %s' % (args, e))
                    if getattr(e, 'code') != 503:
                       # don't decrease number of concurrent connections
                       nconcurrent.release() 
                    # resubmit
                    args['timeout'] *= 2                    
                    future_to_args[executor.submit(download_file, args)] = args
                else: # successfully downloaded `args`
                    print('f%r returned %r' % (args, result))
    

    ThreadPoolExecutor() example

    【讨论】:

    • 谢谢,我需要仔细阅读。我终于得出了与您说的相同的结论,只需退出线程 run() 它将停止尝试从队列中提取。我喜欢您提出的建议,谢谢!
    【解决方案2】:

    您应该使用线程池来控制线程的生命周期:

    然后当线程存在时,您可以向主线程(正在处理线程池)​​发送消息,然后更改线程池的大小,并将新请求或失败的请求推迟到您将清空的堆栈中。

    tedelanay 关于您为线程提供的守护程序状态是绝对正确的。无需将它们设置为守护进程。

    基本上,您可以简化代码,您可以执行以下操作:

    import threadpool
    
    def process_tasks():
        pool = threadpool.ThreadPool(4)
    
        requests = threadpool.makeRequests(download_file, arguments)
    
        for req in requests:
            pool.putRequest(req) 
    
        #wait for them to finish (or you could go and do something else)
        pool.wait()
    
    if __name__ == '__main__': 
        process_tasks()
    

    arguments 取决于您的策略。要么给你的线程一个队列作为参数,然后清空队列。或者您可以在 process_tasks 中处理队列,在池已满时阻塞,并在线程完成时打开一个新线程,但队列不为空。这完全取决于您的需求和下载器的上下文。

    资源:

    【讨论】:

    • 非常好的信息,谢谢!我没有看到您如何使用线程池重新调整池的大小。我一定忽略了一些明显的东西?
    【解决方案3】:

    一个 Thread 对象简单地通过从 run 方法返回来终止线程 - 它不调用 stop。如果您将线程设置为守护程序模式,则无需加入,否则主线程需要执行此操作。线程通常使用 resultq 报告它正在退出,而主线程使用该信息进行连接是很常见的。这有助于有序终止您的流程。如果 python 仍在处理多个线程并且最好避开它,那么在系统退出期间你可能会遇到奇怪的错误。

    【讨论】:

    • 但是正如你所看到的,只要有从 workQ 中抓取的项目,线程就会继续运行,如果一个线程遇到 503,我想将可用线程的数量减少 1.. 离开剩余线程来处理 workQ 中剩余的内容
    猜你喜欢
    • 2015-12-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-18
    • 2022-01-19
    • 1970-01-01
    • 2014-05-29
    • 2012-02-22
    相关资源
    最近更新 更多