【问题标题】:How to make 6 calculation as fast as possible based on one datastream?如何基于一个数据流尽可能快地进行 6 次计算?
【发布时间】:2021-06-03 23:37:34
【问题描述】:

我有一个数据流来得非常快,当一个新数据到来时,我想根据它进行 6 种不同的计算。 我想尽可能快地进行这些计算,以便在收到新数据后立即更新。 数据可以以毫秒的速度到达,所以我的计算必须非常快。

所以我想到的最好的事情是同时在 6 个不同的线程上进行这些计算。

我以前从来没有用过线程,所以我不知道该放在哪里。

这是描述我的问题的代码

我可以从这里做什么?

import numpy as np

import time

np.random.seed(0)

def calculation_1(data, multiplicator):
    r = np.log(data * (multiplicator+1))
    return r

start = time.time()
for ii in range(1000000):
    data_stream_main = [np.random.uniform(0, 2.0), np.random.uniform(10, 1000.0), np.random.uniform(0, 0.01)]

    # calculation that has to be done together
    calc_1 = calculation_1(data=data_stream_main[0], multiplicator=2)
    calc_2 = calculation_1(data=data_stream_main[0], multiplicator=3)

    calc_3 = calculation_1(data=data_stream_main[1], multiplicator=2)
    calc_4 = calculation_1(data=data_stream_main[1], multiplicator=3)

    calc_5 = calculation_1(data=data_stream_main[2], multiplicator=2)
    calc_6 = calculation_1(data=data_stream_main[2], multiplicator=3)

print(calc_1)
print(calc_2)
print(calc_3)
print(calc_4)
print(calc_5)
print(calc_6)

print("total time:", time.time() - start)

【问题讨论】:

  • 请注意,Python 中的线程用于并发,而不是并行。线程将有助于加速 I/O 绑定代码,而不是 CPU 绑定代码。即使有适当的并行性,您也必须考虑分支和分支的开销——这对于微秒级的微任务很少有用。如果速度是您的问题,那么通过编译(例如通过 PyPy、Cython、Numba、Nuitka 等)直接提高单线程性能会更合适。
  • 你的意思是计算的顺序是随机的,但不会同时进行两次计算?
  • 简单来说,是的。线程受GIL 限制,因此只有一个线程可以执行/访问 Python 函数/对象。

标签: python multithreading performance multiprocessing


【解决方案1】:

您可以使用 multiprocessing.pool.Poolconcurrent.futures.ProcessPoolExecutor 类创建一个包含 6 个进程的多处理池,您可以将循环中的 6 个任务提交到其中以并行执行并等待结果。以下示例使用multiprocessing.pool.Pool

但是,结果会非常令人失望。

问题在于 (1) 最初创建 6 个进程时存在开销,以及 (2) 将每个任务排队以在子进程所在的不同地址空间中执行的开销。这意味着要使多处理发挥优势,您的工作函数 calculation_1 在这种情况下需要是一个不那么琐碎、运行时间更长、CPU 密集度更高的函数。如果您要向您的工作函数添加以下“无所事事”,CPU 密集型循环......

    cnt = 0
    for i in range(100000):
        cnt += 1

...那么下面的多处理代码快几倍地运行。照原样,坚持你所拥有的。

import numpy as np
import multiprocessing as mp
import time


def calculation_1(data, multiplicator):
    r = np.log(data * (multiplicator+1))
    """
    cnt = 0
    for i in range(100000):
        cnt += 1
    """
    return r

# required for Windows and other platforms that use spawn for creating new processes:
if __name__ == '__main__':
    np.random.seed(0)
    # no point in using more processes than processors:
    n_processors = min(6, mp.cpu_count())
    pool = mp.Pool(n_processors)
    start = time.time()
    for ii in range(1000000):
        data_stream_main = [np.random.uniform(0, 2.0), np.random.uniform(10, 1000.0), np.random.uniform(0, 0.01)]
        # calculation that has to be done together
        # submit tasks:
        result_1 = pool.apply_async(calculation_1, (data_stream_main[0], 2))
        result_2 = pool.apply_async(calculation_1, (data_stream_main[0], 3))
        result_3 = pool.apply_async(calculation_1, (data_stream_main[1], 2))
        result_4 = pool.apply_async(calculation_1, (data_stream_main[1], 3))
        result_5 = pool.apply_async(calculation_1, (data_stream_main[2], 2))
        result_6 = pool.apply_async(calculation_1, (data_stream_main[2], 3))

        # wait for results:
        calc_1 = result_1.get()
        calc_2 = result_2.get()
        calc_3 = result_3.get()
        calc_4 = result_4.get()
        calc_5 = result_5.get()
        calc_6 = result_6.get()

    print(calc_1)
    print(calc_2)
    print(calc_3)
    print(calc_4)
    print(calc_5)
    print(calc_6)

    print("total time:", time.time() - start)

【讨论】:

    【解决方案2】:

    您可以通过将log(data)log(multiplicator) 分开来分解计算。

    鉴于np.log(data * (multiplicator+1))np.log(data) + np.log(multiplicator+1) 相同,您可以计算np.log(multiplicator+1) 的两个可能值并将其存储在全局变量中,然后每个索引只计算一次log(data)(从而节省50%)部分。

    # global variables and calculation function:
    multiplicator2 = np.log(3)
    multiplicator3 = np.log(4)
    def calculation_1(data):
        logData = np.log(data)
        return logData + multiplicator2, logData + multiplicator3 
    
    
    #  in the loop:...
    
        calc_1,calc_2 = calculation_1(data_stream_main[0])
        calc_3,calc_4 = calculation_1(data_stream_main[1])
        calc_5,calc_6 = calculation_1(data_stream_main[2])
    

    如果您有能力在输出结果之前将多行数据缓冲到一个 numpy 矩阵中,则可以通过使用 numpy 的并行性对整个矩阵(或块)执行计算并以块的形式输出结果来获得一些性能改进而不是一次一行。将数据的接收与计算和输出分开是使用线程可能带来好处的地方。

    例如:

    start = time.time()
    chunk = []
    multiplicators = np.array([2,2,2,3,3,3])
    for ii in range(1000000):
        data_stream_main = [np.random.uniform(0, 2.0), np.random.uniform(10, 1000.0), np.random.uniform(0, 0.01)]
        chunk.append(data_stream_main*2)
        if len(chunk)< 1000: continue
        # process 1000 lines at a time and output results
        calcs = np.log(np.array(chunk)*multiplicators)
        calc_1,calc_4,calc_2,calc_5,calc_3,calc6 = calcs[-1,:]
        chunk = [] # reset chunk
        
    print("total time:", time.time() - start) # 2.7 (compared to 6.6)
    

    【讨论】:

      猜你喜欢
      • 2020-02-25
      • 1970-01-01
      • 2021-08-05
      • 2012-03-26
      • 2023-04-02
      • 2021-10-26
      • 2016-01-23
      • 2019-04-30
      • 2016-04-16
      相关资源
      最近更新 更多