【问题标题】:Python Threads are not Improving SpeedPython 线程并没有提高速度
【发布时间】:2014-11-17 16:27:22
【问题描述】:

为了加快某个列表处理逻辑,我写了一个装饰器,它会 1)拦截传入的函数调用 2)获取其输入列表,将其分成多个部分 4)将这些部分传递给单独线程上的原始函数5) 合并输出和返回

我认为这是一个非常巧妙的想法,直到我对其进行编码并发现速度没有变化!尽管我看到 htop 上有多个核心忙,但多线程版本实际上比单线程版本慢。

这和臭名昭著的 cpython GIL 有关系吗?

谢谢!

from threading import Thread 
import numpy as np 
import time

# breaks a list into n list of lists
def split(a, n):
    k, m = len(a) / n, len(a) % n
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n))

THREAD_NUM = 8 

def parallel_compute(fn):
    class Worker(Thread):
        def __init__(self, *args):
            Thread.__init__(self)
            self.result = None
            self.args = args
        def run(self):
            self.result = fn(*self.args)
    def new_compute(*args, **kwargs):        
        threads = [Worker(args[0], args[1], args[2], x) for x in split(args[3], THREAD_NUM)]
        for x in threads: x.start()
        for x in threads: x.join()
        final_res = []
        for x in threads: final_res.extend(x.result)
        return final_res        
    return new_compute

# some function that does a lot of computation
def f(x): return np.abs(np.tan(np.cos(np.sqrt(x**2))))

class Foo:
    @parallel_compute
    def compute(self, bla, blah, input_list):
        return map(f, input_list)

inp = [i for i in range(40*1000*100)]
#inp = [1,2,3,4,5,6,7]

if __name__ == "__main__": 

    o = Foo()
    start = time.time()
    res = o.compute(None, None, inp)
    end = time.time()
    print 'parallel', end - start

单线程版本

import time, fast_one, numpy as np

class SlowFoo:
    def compute(self, bla, blah, input_list):
        return map(fast_one.f, input_list)

if __name__ == "__main__": 

    o = SlowFoo()
    start = time.time()
    res = np.array(o.compute(None, None, fast_one.inp))
    end = time.time()
    print 'single', end - start

这是提供"PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed".的多处理版本

import pathos.multiprocessing as mp
import numpy as np, dill
import time

def split(a, n):
    k, m = len(a) / n, len(a) % n
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n))

def f(x): return np.abs(np.tan(np.cos(np.sqrt(x**2))))

def compute(input_list):
    return map(f, input_list)

D = 2; pool = mp.Pool(D)
def parallel_compute(fn):
    def new_compute(*args, **kwargs):
        inp = []
        for x in split(args[0], D): inp.append(x)
        outputs_async = pool.map_async(fn, inp)
        outputs = outputs_async.get()
        outputs = [y for x in outputs for y in x]
        return outputs
    return new_compute

compute = parallel_compute(compute)

inp = [i for i in range(40*1000)]

if __name__ == "__main__": 

    start = time.time()
    res = compute(inp)
    end = time.time()
    print 'parallel', end - start
    print len(res)

【问题讨论】:

  • 这有点重复,请尝试使用多处理或搜索 SO 来寻找答案(有很多)。
  • 这样的并行处理通常会产生一些开销。您可能需要有一个相当大规模的进程才能看到单线程版本的实际改进
  • 我一直在单独阅读多处理。这有其自身的问题——我的直接问题是关于 Python 线程。
  • 它在docs for threading 中非常清楚地指出:“CPython 实现细节:在 CPython 中,由于全局解释器锁,只有一个线程可以一次执行 Python 代码(即使某些面向性能的库可能会克服这个限制)。如果您希望您的应用程序更好地利用多核机器的计算资源,建议您使用多处理。但是,如果您想运行多个 I/O,线程仍然是一个合适的模型-同时绑定任务。”
  • 只读共享很简单——你没有 exec 的 fork,你的孩子有你的 state 的副本。如果没有明确的机制,就无法将更改写回父级,但是...

标签: python multithreading cpython gil


【解决方案1】:

是的,当您的线程正在执行用 Python 实现的 CPU 密集型工作(而不是通过可以在从 Python 结构编组/解组数据之前和之后释放 GIL 的 C 扩展)时,GIL 就是一个问题。

我建议使用多处理模型、没有它的 Python 实现(IronPython、Jython 等)或完全不同的语言(如果您正在做对性能敏感的工作,那么语言没有尽头几乎与 Python 一样流畅,但运行时性能要好得多)。

【讨论】:

    【解决方案2】:

    或者,您可以重新设计并启动子流程中的所有并行代码。

    您需要启动子进程进行计算的工作线程。 这些子流程可以真正并行运行。

    【讨论】:

    • 这就是(以前建议的)多处理模块所做的(带有类似线程的接口)。
    猜你喜欢
    • 1970-01-01
    • 2017-06-01
    • 2013-04-24
    • 1970-01-01
    • 2021-09-18
    • 1970-01-01
    • 1970-01-01
    • 2021-11-05
    • 2020-12-29
    相关资源
    最近更新 更多