【问题标题】:GIL for IO bounded thread in C extension (HDF5)C 扩展中 IO 有界线程的 GIL (HDF5)
【发布时间】:2016-07-29 01:04:59
【问题描述】:

我有一个采样应用程序,它每秒获取 250,000 个样本,将它们缓存在内存中,并最终附加到 pandas 提供的 HDFStore。总的来说,这很棒。但是,我有一个线程在运行并不断清空数据采集设备 (DAQ),它需要定期运行。大约一秒钟的偏差往往会破坏事物。下面是观察到的时间的极端情况。 Start 表示 DAQ 读取开始,Finish 表示完成,IO 表示 HDF 写入(DAQIO 都发生在不同的线程中)。

Start        : 2016-04-07 12:28:22.241303
IO (1)       : 2016-04-07 12:28:22.241303
Finish       : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms)
IO Done (1)  : 2016-04-07 12:28:46.573440 (24332.39 ms)

如您所见,执行此写入需要 24 秒(典型的写入时间约为 40 毫秒)。我正在写入的硬盘没有负载,所以这种延迟不应该是由争用引起的(运行时它的利用率约为 7%)。我已禁用我的 HDFStore 写入的索引。我的应用程序运行许多其他线程,所有这些线程都打印状态字符串,因此看起来 IO 任务正在阻塞所有其他线程。我花了相当多的时间单步执行代码来找出速度变慢的地方,而且它总是在 C 扩展提供的方法中,这引出了我的问题..

  1. Python(我使用的是 3.5)能否抢占 C 扩展中的执行? Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock? 似乎表明它不会,除非扩展特别产生。
  2. Pandas 的 HDF5 C 代码是否为 I/O 实现了任何让步?如果是这样,这是否意味着延迟是由于 CPU 受限任务造成的?我已禁用索引。
  3. 关于如何获得一致的计时有什么建议吗?我正在考虑将 HDF5 代码移动到另一个进程中。不过,这只会在一定程度上有所帮助,因为无论如何我都不能容忍大约 20 秒的写入,尤其是当它们不可预测时。

这是一个您可以运行以查看问题的示例:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import time

def write_samples(store, samples, overwrite):
    frame = pd.DataFrame(samples, dtype='float64')

    if not overwrite:
        store.append("df", frame, format='table', index=False)
    else:
        store.put("df", frame, format='table', index=False)

def begin_io():
    store = pd.HDFStore("D:\\slow\\test" + str(random.randint(0,100)) + ".h5", mode='w', complevel=0)

    counter = 0
    while True:
        data = np.random.rand(50000, 1)
        start_time = timer()
        write_samples(store, data, counter == 0)
        end_time = timer()

        print("IO Done      : %s (%.2f ms, %d)" % (datetime.datetime.now(), (end_time - start_time) * 1000, counter))

        counter += 1

    store.close()

def dummy_thread():
    previous = timer()
    while True:
        now = timer()
        print("Dummy Thread  : %s (%d ms)" % (datetime.datetime.now(), (now - previous) * 1000))
        previous = now
        time.sleep(0.01)


if __name__ == '__main__':
    threading.Thread(target=dummy_thread).start()
    begin_io()

你会得到类似的输出:

IO Done      : 2016-04-08 10:51:14.100479 (3.63 ms, 470)
Dummy Thread  : 2016-04-08 10:51:14.101484 (12 ms)
IO Done      : 2016-04-08 10:51:14.104475 (3.01 ms, 471)
Dummy Thread  : 2016-04-08 10:51:14.576640 (475 ms)
IO Done      : 2016-04-08 10:51:14.576640 (472.00 ms, 472)
Dummy Thread  : 2016-04-08 10:51:14.897756 (321 ms)
IO Done      : 2016-04-08 10:51:14.898782 (320.79 ms, 473)
IO Done      : 2016-04-08 10:51:14.901772 (3.29 ms, 474)
IO Done      : 2016-04-08 10:51:14.905773 (2.84 ms, 475)
IO Done      : 2016-04-08 10:51:14.908775 (2.96 ms, 476)
Dummy Thread  : 2016-04-08 10:51:14.909777 (11 ms)

【问题讨论】:

  • 任何代码?如何将问题缩小到简短的简单脚本,这将写入预期数量的一些虚拟数据。你会看到,如果它仍然遇到同样的问题,或者运行良好。
  • @JanVlcinsky 我已经在一个脚本中复制了它,该脚本只是不断地附加到 HDFStore。我将其简化,然后在此处发布。
  • @JanVlcinsky 添加了代码示例

标签: python python-3.x pandas hdf5 pytables


【解决方案1】:

答案是否定的,这些作者不发布 GIL。请参阅文档here。我知道您实际上并没有尝试使用 multiple 线程进行写入,但这应该会提示您。当写入发生时,会持有强锁以防止多次写入。 PyTablesh5py 都将其作为 HDF5 标准的一部分。

您可以查看SWMR,尽管 pandas 不直接支持。 PyTables docs herehere 指向解决方案。这些通常涉及有一个单独的进程将数据从队列中拉出并写入。

无论如何,这通常是一种更具可扩展性的模式。

【讨论】:

  • 是的,将其拉入一个单独的进程是我能想到的避免阻塞其他线程的唯一方法。但是,我仍然不确定为什么时不时会有很大的延迟。我想它以某种方式增加了文件,但我不确定具体在做什么。我在创建时尝试为expectedrows 设置一个很大的数字,但这并没有帮助。关于在哪里看的任何想法?
  • PyTables 块写入(相对于expectedrows 计算),但我认为实际写入大小(刷新)取决于实现(例如,你不知道)。如果您对时间敏感,那么可以打赌转移到另一个进程,或者可能使用像msgpack 这样的东西,它只是直接转储到磁盘(并且是可附加的)。通常,实时捕获就是这样。您稍后再处理。
  • 我认为 HDF5 会是一个不错的选择,因为它是实时数据捕获,但我们让用户回滚历史,所以我们也需要能够读取块(会话可以是几个小时长,所以 250k/sec 加起来很快)。我想如果我能找到一种方法让它更频繁地刷新并将其移至不同的进程,事情应该没问题..我认为 GIL 没有为 IO 发布有点令人失望,但背后可能有一些逻辑决定。
  • 您正在尝试设计一个已经经过大量设计的解决方案。您正在查看 SWMR,它很新,但运行良好。
【解决方案2】:

感谢您提供工作代码。我已经修改了它以获得一些洞察力,后来创建 使用多处理的修改版本。

修改线程版本

所有的修改只是为了得到更多的信息,没有概念上的改变。合而为一 文件mthread.py 并被部分注释。

照常进口:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging

write_samples 得到了一些日志记录:

def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")

begin_io 获得最大持续时间,超过该时间会导致 WARNING 日志条目:

def begin_io(maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while True:
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")

dummy_thread 被修改为正确停止并且如果花费太长时间也会发出警告:

def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")

最后我们称之为。随意修改日志级别,WARNING 显示的次数过多, INFODEBUG 提供更多信息。

if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)

    pill2kill = threading.Event()
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
    t.start()
    try:
        begin_io(500)
    finally:
        pill2kill.set()
        t.join()

运行代码我得到你描述的结果:

2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133

从值可以清楚地看出,虽然begin_io 非常繁忙且延迟(可能在数据期间 正在写入磁盘),dummy_thread 也会延迟几乎相同的时间。

多处理版本 - 运行良好

我已经修改了代码以在多个进程中运行,从那时起,它真的没有阻塞 dummy_thread.

2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744

多处理的代码在这里:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging


def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")


def begin_io(pill2kill, maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while not pill2kill.wait(0):
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug( "IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")


def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info( "Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")


if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)
    pill2kill = multiprocessing.Event()
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,))
    dp.start()
    try:
        p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,))
        p.start()
        time.sleep(100)
    finally:
        pill2kill.set()
        dp.join()
        p.join()

结论

将数据写入 HDF5 文件确实会阻塞其他线程,并且需要多处理版本。

如果您希望 dummy_thread 做一些实际工作(例如收集数据以存储),并且您希望 将数据从这里发送到 HDF5 序列化器,您将不得不进行某种消息传递 - 要么使用 multiprocessing.QueuePipe 或可能使用 ZeroMQ(例如 PUSH - PULL 套接字 一对)。使用 ZeroMQ,您甚至可以在另一台计算机上保存数据。

编辑/警告:提供的代码有时可能无法保存数据,我做了它来测量性能并且没有使其防水。在处理过程中按 Ctrl-C 时,有时我会收到损坏的文件。我认为这个问题超出了这个问题的范围(问题应通过小心停止正在运行的进程来解决)。

【讨论】:

  • 感谢您添加更多有用的日志记录,您确实需要使用我的代码搜索输出 :) 认为我们得出了相同的结论——需要使用多处理。
  • @user3870920 您的问题帮助我更好地理解 GIL。感谢那。请注意,我的日志打印时间仅超过 500 毫秒,因此看不到较短的(通常约为 10 毫秒)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-03-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多