【问题标题】:Thread Getting Stuck At Join线程卡在加入
【发布时间】:2017-01-05 08:03:28
【问题描述】:

我正在运行一个线程池,它给出了一个随机错误。有时它可以工作,有时它会卡在这个代码的 pool.join 部分。我已经在这几天了,但在它何时工作或何时卡住之间找不到任何区别。请帮忙...

这是代码...

def run_thread_pool(functions_list):

    # Make the Pool of workers
    pool = ThreadPool()  # left blank to default to machine number of cores

    pool.map(run_function, functions_list)

    # close the pool and wait for the work to finish
    pool.close()
    pool.join()
    return

同样,这段代码也随机卡在 q.join(:

def run_queue_block(methods_list, max_num_of_workers=20):
    from views.console_output_handler import add_to_console_queue

    '''
    Runs methods on threads.  Stores method returns in a list.  Then outputs that list
    after all methods in the list have been completed.

    :param methods_list: example ((method name, args), (method_2, args), (method_3, args)
    :param max_num_of_workers: The number of threads to use in the block.
    :return: The full list of returns from each method.
    '''

    method_returns = []

    log = StandardLogger(logger_name='run_queue_block')

    # lock to serialize console output
    lock = threading.Lock()

    def _output(item):
        # Make sure the whole print completes or threads can mix up output in one line.
        with lock:
            if item:
                add_to_console_queue(item)
            msg = threading.current_thread().name, item
            log.log_debug(msg)

        return

    # The worker thread pulls an item from the queue and processes it
    def _worker():
        log = StandardLogger(logger_name='_worker')

        while True:
            try:
                method, args = q.get()  # Extract and unpack callable and arguments

            except:
                # we've hit a nonetype object.
                break

            if method is None:
                break

            item = method(*args)  # Call callable with provided args and store result
            method_returns.append(item)
            _output(item)

            q.task_done()

    num_of_jobs = len(methods_list)

    if num_of_jobs < max_num_of_workers:
        max_num_of_workers = num_of_jobs

    # Create the queue and thread pool.
    q = Queue()

    threads = []
    # starts worker threads.
    for i in range(max_num_of_workers):
        t = threading.Thread(target=_worker)
        t.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
        t.start()
        threads.append(t)

    for method in methods_list:
        q.put(method)

    # block until all tasks are done
    q.join()

    # stop workers
    for i in range(max_num_of_workers):
        q.put(None)
    for t in threads:
        t.join()

    return method_returns

我永远不知道它什么时候会起作用。它大部分时间都有效,但大多数时候还不够好。什么可能导致这样的错误?

【问题讨论】:

  • 当你在method is Nonebreak 时,任务是否留在队列中 => 队列不会加入,因为它永远不会为空?
  • concurrent.futures.ThreadPoolExecutor 没有 closejoin 方法。
  • 我将如何关闭线程并等待所有线程完成?
  • 为什么_worker中有while循环?

标签: python multithreading python-3.x concurrency


【解决方案1】:

您必须在 concurrent.futures.ThreadPoolExecutor 对象上调用 shutdown。然后returnpool.map 的结果。

def run_thread_pool(functions_list):

    # Make the Pool of workers
    pool = ThreadPool()  # left blank to default to machine number of cores

    result = pool.map(run_function, functions_list)

    # close the pool and wait for the work to finish
    pool.shutdown()
    return result

我已经简化了您的代码,没有 Queue 对象和守护进程 Thread。检查它是否符合您的要求。

def run_queue_block(methods_list):
    from views.console_output_handler import add_to_console_queue

    '''
    Runs methods on threads.  Stores method returns in a list.  Then outputs that list
    after all methods in the list have been completed.

    :param methods_list: example ((method name, args), (method_2, args), (method_3, args)
    :param max_num_of_workers: The number of threads to use in the block.
    :return: The full list of returns from each method.
    '''

    method_returns = []

    log = StandardLogger(logger_name='run_queue_block')

    # lock to serialize console output
    lock = threading.Lock()

    def _output(item):
        # Make sure the whole print completes or threads can mix up output in one line.
        with lock:
            if item:
                add_to_console_queue(item)
            msg = threading.current_thread().name, item
            log.log_debug(msg)

        return

    # The worker thread pulls an item from the queue and processes it
    def _worker(method, *args, **kwargs):
        log = StandardLogger(logger_name='_worker')


        item = method(*args, **kwargs)  # Call callable with provided args and store result
        with lock:
            method_returns.append(item)
        _output(item)

    threads = []
    # starts worker threads.
    for method, args in methods_list:
        t = threading.Thread(target=_worker, args=(method, args))
        t.start()
        threads.append(t)

    # stop workers
    for t in threads:
        t.join()

    return method_returns

【讨论】:

    【解决方案2】:

    要允许您的队列加入第二个示例,您需要确保从队列中删除所有任务。

    所以在你的 _worker 函数中,即使任务无法处理,也要将它们标记为已完成,否则队列永远不会被清空,你的程序将挂起。

    def _worker():
        log = StandardLogger(logger_name='_worker')
    
        while True:
            try:
                method, args = q.get()  # Extract and unpack callable and arguments
    
            except:
                # we've hit a nonetype object.
                q.task_done()
                break
    
            if method is None:
                q.task_done()
                break
    
            item = method(*args)  # Call callable with provided args and store result
            method_returns.append(item)
            _output(item)
    
            q.task_done()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-02-26
      • 2015-07-17
      相关资源
      最近更新 更多