【问题标题】:Array creation too slow数组创建太慢
【发布时间】:2021-01-26 20:27:03
【问题描述】:

我正在尝试从头开始创建图像数组。 我让代码运行了,但运行它需要大约 30 秒。 我觉得使用 numpy 本机函数可能会更快。 我该怎么做?

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = 256
image_channel = 3

show_img = False


def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

for ii in range(len(volumes)-image_width):
    # ===================== part to optimize start
    final_image = np.zeros((image_heigh, image_width, image_channel))

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # nomalize data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    for xxx in range(image_width):
        final_image[:int(vol_norm[xxx]), xxx, :] = 1

    # ===================== part to optimize end

    if show_img:
        image = np.float32(final_image)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        cv2.imshow("ok", image)
        cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

如何才能更快地创建此图像数组? 我需要在每个时间步创建图像,因为我想模拟每个新时间步出现的真实实时数据流。

这就是为什么我只想优化这部分代码:

for xxx in range(image_width):
    final_image[:int(vol_norm[xxx]), xxx, :] = 1

我该怎么做?

【问题讨论】:

  • 我刚刚为您的内部循环实现了简单的优化,使用np.arange(...) 而不是您的内部循环。我的运行时间现在是2.6 sec,而不是之前的27 sec。在my answer开头看到这个优化。
  • 另外一个我没有做的非常有用的优化是,在整个数据的最大/最小值没有改变的情况下,您不需要重新计算以前的图像。只有在 max/min 改变的情况下,您才需要重新计算以前的图像数据。而且我希望您的真实数据会像外汇或比特币价格一样逐渐变化,因此最大/最小变化非常不经常。
  • 我刚刚实现了上面提到的(在之前的评论中)优化(关于如果 min/max 没有改变,则不重新计算图像)以及在最后模拟实时过程中的所有其他优化my answer,还有带有渲染过程的动画PNG,png渲染代码也在实时过程模拟中。我还模拟了像外汇价格一样逐渐改变情节的过程。
  • 刚刚还在我的答案中添加了坐标轴绘图到我最后的实时过程可视化中,使用 matplotlib 库。
  • 我刚刚想出了如何再优化你的内部循环两次。现在运行时间是 1.3 秒,而不是之前的 2.6 秒。 Here is full code。但是我像原来的问题一样放回了3个频道并制作了float32。这两个都将速度降低到 2.8 秒,但由于最后的优化,仍然有可能达到这个好时间。

标签: python arrays image numpy cv2


【解决方案1】:

接下来是最简单的优化:

  1. 使用与np.arange(...) 比较值而不是内部循环。
  2. 使用灰度图像而不是 3 通道 RGB。需要处理的数据减少 3 倍。
  3. 使用 np.uint8 类型而不是 np.float32,这样处理速度更快,并且不需要转换为 float32 即可进行 CV2 可视化。

以上所有这些优化都带来了巨大的加速(10x 次),我的运行时间是2.6 sec 而不是之前的27 sec

另外一个我没有做的非常有用的优化是,在当前窗口中的整个数据的最大/最小值没有改变的情况下,您不需要重新计算以前的图像像素。只有在 max/min 改变的情况下,您才需要重新计算以前的图像数据。而且我预计您的真实数据会像外汇或比特币价格一样逐渐变化,因此窗口内的最大/最小变化非常不常见。

上面提到的优化1)-3)在下一个代码中实现:

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = 256
image_channel = 3

show_img = False

def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

aranges = np.arange(image_heigh, dtype = np.int32)[:, None]

for ii in range(len(volumes)-image_width):
    # ===================== part to optimize start
    #final_image = np.zeros((image_heigh, image_width, image_channel), dtype = np.float32)

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # nomalize data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    final_image = (aranges < vol_norm[None, :].astype(np.int32)).astype(np.uint8) * 255

    # ===================== part to optimize end

    if show_img:
        cv2.imshow('ok', final_image)
        cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

对于上面的代码,我只是对内部循环进行了一次优化,它甚至可以将2x 以上的代码加速到1.3 sec 的时间。但我也放回了 3 个通道加上 float32,这降低了速度导致最终的 2.8 sec, here is the code

如果不需要重新计算旧图像数据,则可以进行下一次优化。

要优化的主要内容是,您在每一步都重新计算几乎相同的整个图像,沿宽度移动 1 个像素。而不是这个,您需要计算整个图像一次,然后向右移动不是 1 个像素而是整个图像宽度。

那么经过这个优化运行时间就是0.08 sec

并且只为显示动画进行 1 像素步进,而不是用于计算图像数据,如果您需要速度,应该只计算一次图像数据。

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = volumes.size #256
image_channel = 3
screen_width = 256

show_img = False


def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

for ii in range(0, len(volumes), image_width):
    # ===================== part to optimize start
    final_image = np.zeros((image_heigh, image_width, image_channel))

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # nomalize data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    for xxx in range(image_width):
        final_image[:int(vol_norm[xxx]), xxx, :] = 1

    # ===================== part to optimize end

    if show_img:
        for start in range(0, final_image.shape[1] - screen_width):
            image = np.float32(final_image[:, start : start + screen_width])
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            cv2.imshow("ok", image)
            cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

我还根据您的数据创建了动画图像:

如果您想创建相同的动画,只需将下一段代码附加到上述脚本的末尾:

# Needs: python -m pip install pillow
import PIL.Image
imgs = [PIL.Image.fromarray(final_image[:, start : start + screen_width].astype(np.uint8) * 255) for start in range(0, final_image.shape[1] - screen_width, 6)]
imgs[0].save('result.png', append_images = imgs[1:], save_all = True, lossless = True, duration = 100)

我还实现了实时流数据渲染/可视化的模拟。

  1. live_stream() generator 在随机时间点吐出随机数量的数据,这是为了模拟数据的生成过程。
  2. stream_fetcher() 监听实时流并将接收到的所有数据记录到 python 队列 @9​​87654342@,这个 fetcher 在一个线程中运行。
  3. renderer() 获取 fetcher 记录的数据,并通过您的数学公式和规范化过程将其渲染为图像,它渲染尽可能多的数据,导致图像具有不同的宽度,渲染的图像被保存到另一个队列 @9​​87654344@。
  4. visualizer() 通过获取尽可能多的可用渲染图像来可视化渲染数据。

所有函数都在单独的线程中运行,不会阻塞整个进程。此外,如果任何线程工作变慢,那么它会跳过一些数据以赶上当前的实时数据,因此每个队列都不会溢出。

另外你可能会看到可视化过程是跳跃的,这不是因为函数有些慢,而是因为直播在每个时间步中吐出不同数量的数据,这就是实时数据通常的表现。

在接下来的代码中,我还做了前面提到的额外优化,如果 min/max 没有改变,那就是不重新计算图像。

import cv2, numpy as np
import time, random, threading, queue

image_height = 256
image_width = 512

# Make results reproducible and deterministic
np.random.seed(0)
random.seed(0)

def live_stream():
    last = 0.
    while True:
        a = np.random.uniform(low = -1., high = 1., size = random.randint(1, 20)).astype(np.float64).cumsum() + last
        yield a
        last = a[-1]
        time.sleep(random.random() * 0.1)

q0 = queue.Queue()
def stream_fetcher():
    for e in live_stream():
        q0.put(e)

threading.Thread(target = stream_fetcher, daemon = True).start()

aranges = np.arange(image_height, dtype = np.int32)[:, None]

q1 = queue.Queue()
def renderer():
    def normalized(data, data_min, data_max, maximum_value):
        nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))
        return nomamized_data

    prev_image = np.zeros((image_height, 0), dtype = np.uint8)
    prev_vols = np.zeros((0,), dtype = np.float64)
        
    while True:        
        data = []
        data.append(q0.get())
        try:
            while True:
                data.append(q0.get(block = False))
        except queue.Empty:
            pass
                
        vols = np.concatenate(data)[-image_width:]
        prev_vols = prev_vols[-(image_width - vols.size) or prev_vols.size:]
        concat_vols = np.concatenate((prev_vols, vols))[-image_width:]
        vols_min, vols_max = np.amin(concat_vols), np.amax(concat_vols)
        if prev_vols.size > 0 and (vols_min < np.amin(prev_vols) - 10 ** -8 or vols_max > np.amax(prev_vols) + 10 ** -8):
            vols = concat_vols
            prev_image = prev_image[:, :-prev_vols.size]
            prev_vols = prev_vols[:0]

        vols_norm = normalized(
            data = vols, data_min = vols_min,
            data_max = vols_max, maximum_value = image_height,
        )
        
        image = (aranges < vols_norm.astype(np.int32)[None, :]).astype(np.uint8) * 255
        whole_image = np.concatenate((prev_image, image), axis = 1)[:, -image_width:]
        
        q1.put(whole_image)
        
        prev_image = whole_image
        prev_vols = concat_vols

threading.Thread(target = renderer, daemon = True).start()


def visualizer():
    imgs = []
    
    while True:
        data = []
        data.append(q1.get())
        try:
            while True:
                data.append(q1.get(block = False))
        except queue.Empty:
            pass
        image = np.concatenate(data, axis = 1)[:, -image_width:]
        cv2.imshow('ok', image)
        cv2.waitKey(1)

        if imgs is not None:
            try:
                # Needs: python -m pip install pillow
                import PIL.Image
                has_pil = True
            except:
                has_pil = False
                imgs = None
            if has_pil:
                imgs.append(PIL.Image.fromarray(np.pad(image, ((0, 0), (image_width - image.shape[1], 0)), constant_values = 0)))

                if len(imgs) >= 1000:
                    print('saving...', flush = True)
                    imgs[0].save('result.png', append_images = imgs[1:], save_all = True, lossless = True, duration = 100)
                    imgs = None
                    print('saved!', flush = True)

threading.Thread(target = visualizer, daemon = True).start()

while True:
    time.sleep(0.1)

上面的实时过程模拟被渲染成result.png,我在下面显示:

我还决定通过使用更高级的matplotlib 而不是cv2 来改进可视化,以便能够显示轴并进行实时绘图。可视化图片如下:

接下来是一个基于matplotlib的代码,对应上面最后一张图片:

import cv2, numpy as np
import time, random, threading, queue

image_height = 256
image_width = 512
save_nsec = 20
dpi, fps = 100, 15

# Make results reproducible and deterministic
np.random.seed(0)
random.seed(0)

def live_stream():
    last = 0.
    pos = 0
    while True:
        a = np.random.uniform(low = -1., high = 1., size = random.randint(1, 30)).astype(np.float64).cumsum() + last
        yield a, pos, pos + a.size - 1
        pos += a.size
        last = a[-1]
        time.sleep(random.random() * 2.2 / fps)

q0 = queue.Queue()
def stream_fetcher():
    for e in live_stream():
        q0.put(e)

threading.Thread(target = stream_fetcher, daemon = True).start()

aranges = np.arange(image_height, dtype = np.int32)[:, None]

q1 = queue.Queue()
def renderer():
    def normalized(data, data_min, data_max, maximum_value):
        nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))
        return nomamized_data

    prev_image = np.zeros((image_height, 0), dtype = np.uint8)
    prev_vols = np.zeros((0,), dtype = np.float64)
        
    while True:        
        data = []
        data.append(q0.get())
        try:
            while True:
                data.append(q0.get(block = False))
        except queue.Empty:
            pass
            
        data_vols = [e[0] for e in data]
        data_minx, data_maxx = data[0][1], data[-1][2]

        vols = np.concatenate(data_vols)[-image_width:]
        prev_vols = prev_vols[-(image_width - vols.size) or prev_vols.size:]
        concat_vols = np.concatenate((prev_vols, vols))[-image_width:]
        vols_min, vols_max = np.amin(concat_vols), np.amax(concat_vols)
        if prev_vols.size > 0 and (vols_min < np.amin(prev_vols) - 10 ** -8 or vols_max > np.amax(prev_vols) + 10 ** -8):
            vols = concat_vols
            prev_image = prev_image[:, :-prev_vols.size]
            prev_vols = prev_vols[:0]

        vols_norm = normalized(
            data = vols, data_min = vols_min,
            data_max = vols_max, maximum_value = image_height,
        )
        
        image = (aranges < vols_norm.astype(np.int32)[None, :]).astype(np.uint8) * 255
        whole_image = np.concatenate((prev_image, image), axis = 1)[:, -image_width:]
        
        q1.put((whole_image, data_maxx - whole_image.shape[1] + 1, data_maxx, vols_min, vols_max))
        
        prev_image = whole_image
        prev_vols = concat_vols

threading.Thread(target = renderer, daemon = True).start()


def visualizer():
    import matplotlib.pyplot as plt, matplotlib.animation
    
    def images():
        while True:
            data = []
            data.append(q1.get())
            try:
                while True:
                    data.append(q1.get(block = False))
            except queue.Empty:
                pass
            minx = min([e[1] for e in data])
            maxx = min([e[2] for e in data])
            miny = min([e[3] for e in data])
            maxy = min([e[4] for e in data])
            image = np.concatenate([e[0] for e in data], axis = 1)[:, -image_width:]
            image = np.pad(image, ((0, 0), (image_width - image.shape[1], 0)), constant_values = 0)
            image = np.repeat(image[:, :, None], 3, axis = -1)
            yield image, minx, maxx, miny, maxy
            
    it = images()
    im = None
    fig = plt.figure(figsize = (image_width / dpi, image_height / dpi), dpi = dpi)
            
    def animate_func(i):
        nonlocal it, im, fig
        image, minx, maxx, miny, maxy = next(it)
        print(f'.', end = '', flush = True)
        if im is None:
            im = plt.imshow(image, interpolation = 'none', aspect = 'auto')
        else:
            im.set_array(image)
        im.set_extent((minx, maxx, miny, maxy))
        return [im]
            
    anim = matplotlib.animation.FuncAnimation(fig, animate_func, frames = round(save_nsec * fps), interval = 1000 / fps)
    
    print('saving...', end = '', flush = True)
    #anim.save('result.mp4', fps = fps, dpi = dpi, extra_args = ['-vcodec', 'libx264'])
    anim.save('result.gif', fps = fps, dpi = dpi, writer = 'imagemagick')
    print('saved!', end = '', flush = True)
    
    plt.show()

threading.Thread(target = visualizer, daemon = True).start()

while True:
    time.sleep(0.1)

然后我决定用 RGB 调色板为最后一张图像上色,峰值越高越偏红,如果它在中间越偏绿,如果是足够低,那么它就更蓝了。下面的结果图片是由this coloring code实现的:

下面还有一个彩色动画,线条样式而不是条形样式,在this code的帮助下:

【讨论】:

  • 非常感谢您的回答。我看到你一起计算了整个数据集,这是一个绝妙的主意。但事实是我需要在每个时间步创建图像,我想模拟真实的实时数据流。这就是为什么我只想优化这部分代码:for xxx in range(image_width): final_image[:int(vol_norm[xxx]), xxx, :] = 1
  • @user3639702 刚刚用动画图像更新了我的答案,这是使用 CV2 可视化数据的替代方法。创建此类动画图像的代码附在答案末尾。
  • @user3639702 您可以像我一样使用相同的优化技术(计算整个图像),但对于您的直播。您只需执行下一件事 - 您获取尽可能多的实时流列。渲染这么多列的图像。在另一个线程中显示实时动画。然后这个渲染需要一些时间,可能有 20 个新列到达,所以你再次计算 20 像素宽度的图像并对其进行动画处理,等等。我的意思是你需要分批计算,一次计算几列。然后你就可以足够快地渲染实时数据了。
  • @user3639702 如果需要,我可以在我的代码中实现这个实时获取的模拟过程。我确信您的代码足够快,可以在几分之一秒内计算数千列。唯一的事情是您不需要一次又一次地重新计算整个图像,这是唯一缓慢的操作。您只需重新计算刚刚进入直播的新列。
  • 我每次重新制作整个图像的主要想法是因为我想确保所有数据都被归一化。例如,如果新列的最大值大于我已经在图像中计算的值,那么我需要根据新的最大值重新计算整个图像以获得标准化图像。
猜你喜欢
  • 2020-12-13
  • 2017-05-20
  • 1970-01-01
  • 1970-01-01
  • 2014-08-03
  • 2019-11-12
  • 1970-01-01
  • 2015-06-06
  • 2018-12-06
相关资源
最近更新 更多