【Question title】: How to obtain the results from a pool of threads in Python?
【Posted】: 2014-11-24 02:50:19
【Question】:

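I have searched here for how to do threading in Python, but so far I have not found the answer I need. I am not very familiar with Python's Queue and Threading classes, so some of the answers given here make no sense to me.

I want to create a pool of threads that I can hand different tasks to and, once all of them have finished, collect the result values and process them. This is what I have tried so far, but I cannot get the results. The code I have written is: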

from threading import Thread
from Queue import Queue

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.result = None
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                self.result = func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()
    def get_result(self):
        return self.result

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.results = []
        for _ in range(num_threads):
            w = Worker(self.tasks)
            self.results.append(w.get_result())
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def get_results(self):
        return self.results

def foo(word, number):
    print word*number
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1,2,3,4,5]
pool = ThreadPool(5)
for i in range(0, len(words)):
    pool.add_task(foo, words[i], numbers[i])

pool.wait_completion()
results = pool.get_results()
print results

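The output prints each word repeated the given number of times, but the results list is full of None values. So where should I put the return value of func?

Or would the simpler way be to build a list while filling the queue, pass a dictionary or some other variable to my function as an argument for storing the result, and append that result argument to the results list after the task has been added to the queue: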

def foo(word, number, r):
    print word*number
    r[(word,number)] = number
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1,2,3,4,5]
pool = ThreadPool(5)
results = []
for i in range(0, len(words)):
    r = {}
    pool.add_task(foo, words[i], numbers[i], r)
    results.append(r)
print results
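
A note on why the first version yields only None: `get_result()` is called inside `ThreadPool.__init__`, before any task has been queued, and each `Worker` keeps only the value of its most recent task. Below is a minimal sketch of one possible repair, assuming Python 3. The per-task index, the shared `results` dict, and the `lock` are illustrative additions and not part of the original code.

```python
from queue import Queue
from threading import Lock, Thread


class Worker(Thread):
    """Thread that runs tasks from a queue and stores each return value."""

    def __init__(self, tasks, results, lock):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.results = results  # shared dict: task index -> return value
        self.lock = lock
        self.start()

    def run(self):
        while True:
            index, func, args, kwargs = self.tasks.get()
            try:
                value = func(*args, **kwargs)
            except Exception as exc:
                value = exc  # keep the failure visible to the caller
            with self.lock:
                self.results[index] = value
            self.tasks.task_done()


class ThreadPool:
    """Pool of threads that remembers the return value of every task."""

    def __init__(self, num_threads):
        self.tasks = Queue()
        self.results = {}
        self.lock = Lock()
        self.submitted = 0
        for _ in range(num_threads):
            Worker(self.tasks, self.results, self.lock)

    def add_task(self, func, *args, **kwargs):
        """Queue a task, tagged with its submission index."""
        self.tasks.put((self.submitted, func, args, kwargs))
        self.submitted += 1

    def get_results(self):
        """Wait for all queued tasks, then return values in submission order."""
        self.tasks.join()
        with self.lock:
            return [self.results[i] for i in range(self.submitted)]


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
for word, number in zip(words, numbers):
    pool.add_task(foo, word, number)

results = pool.get_results()
print(results)
```

Results are read only after `tasks.join()` returns, so every worker has already stored its value by then. The second attempt in the question prints `results` without waiting first, which is why its dictionaries may still be empty when they are printed.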

【Comments】:

    Tags: python multithreading queue return-value


    【Solution 1】:

    Python actually has a built-in thread pool you can use; it's just not well documented:

    from multiprocessing.pool import ThreadPool
    
    def foo(word, number):
        print(word * number)
        return number
    
    words = ['hello', 'world', 'test', 'word', 'another test']
    numbers = [1,2,3,4,5]
    pool = ThreadPool(5)
    results = []
    for i in range(0, len(words)):
        results.append(pool.apply_async(foo, args=(words[i], numbers[i])))
    
    pool.close()
    pool.join()
    results = [r.get() for r in results]
    print(results)
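
Each `apply_async` call returns an `AsyncResult`, and `get()` blocks until that task has finished. As a hedged illustration (the functions `slow_square` and `broken` are made up for this sketch), `get()` also accepts a timeout and re-raises an exception raised inside the task, so failures are not silently printed and lost as in the hand-written worker:

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    """Hypothetical task that takes a while to finish."""
    time.sleep(0.5)
    return x * x


def broken(x):
    """Hypothetical task that always fails."""
    raise ValueError("cannot handle %r" % x)


pool = ThreadPool(2)
slow = pool.apply_async(slow_square, args=(3,))
bad = pool.apply_async(broken, args=(3,))

try:
    slow.get(timeout=0.01)  # far shorter than the 0.5 s sleep
    timed_out = False
except multiprocessing.TimeoutError:
    timed_out = True

value = slow.get()  # no timeout: block until the task has finished

try:
    bad.get()  # re-raises the ValueError from the worker thread
    error = None
except ValueError as exc:
    error = str(exc)

pool.close()
pool.join()
print(timed_out, value, error)
```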
    

    Alternatively (using map instead of apply_async):

    from multiprocessing.pool import ThreadPool
    
    def foo(word, number):
        print(word * number)
        return number
    
    def starfoo(args):
        """Unpack one (word, number) tuple and call foo with it.
    
        We need this because map only supports calling functions with one
        argument. We need to pass two, so this little wrapper expands each
        item of the zipped argument list.
        """
        return foo(*args)
    
    words = ['hello', 'world', 'test', 'word', 'another test']
    numbers = [1,2,3,4,5]
    pool = ThreadPool(5)
    # We need to zip together the two lists because map only supports calling functions
    # with one argument. In Python 3.3+, you can use starmap instead.
    results = pool.map(starfoo, zip(words, numbers))
    print(results)
    
    pool.close()
    pool.join()
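
For Python 3.3 and later, the `starmap` variant mentioned in the code comment could look roughly like this. It is a sketch that reuses the same `foo`, `words`, and `numbers`; `starmap` unpacks each tuple into positional arguments, so the `starfoo` wrapper is no longer needed:

```python
from multiprocessing.pool import ThreadPool


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

pool = ThreadPool(5)
# Each (word, number) tuple from zip is unpacked into foo(word, number).
results = pool.starmap(foo, zip(words, numbers))
pool.close()
pool.join()

print(results)
```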
    

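    【Comments】:

    • Is the second case useful when the number of tasks is the same as the size of the pool?
    • It works with any number of tasks, and a Pool works with any number of workers. map is useful when you want to run a function against every item in an iterable and get back the result of each call. If you have 5 workers handling an iterable of length 100, the Pool will call the function for all 100 items but will never run more than 5 threads at the same time. The output will be an iterable of length 100 containing the return values of all the function calls.
    • @RafaelRios Another note: because of the GIL, there is no performance benefit to using threads for CPU-bound work in Python. To get around this limitation, you need to use multiple processes via the multiprocessing module. For the example above, you can switch by using from multiprocessing import Pool instead of from multiprocessing.pool import ThreadPool. Everything else stays the same.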
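
The claim about 5 workers and 100 items can be checked with a small experiment. This is only a sketch: the `state` dict, the `lock`, and the `work` function are hypothetical helpers that count how many calls are running at once.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

lock = threading.Lock()
state = {"running": 0, "peak": 0}  # current and highest number of active calls


def work(item):
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.01)  # stand-in for I/O-bound work
    with lock:
        state["running"] -= 1
    return item * 2


pool = ThreadPool(5)
results = pool.map(work, range(100))  # one result per item, in input order
pool.close()
pool.join()

print(len(results), state["peak"])
```

The peak never exceeds the pool size, and `results` keeps the order of the input. This counter relies on shared memory between threads, so it would not report a meaningful peak after switching to `multiprocessing.Pool`, where each worker process has its own copy of `state`.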