【问题标题】:Reading an hdf5 file only after it has completely finished acquiring data只有在完全完成数据采集后才读取 hdf5 文件
【发布时间】:2021-09-13 21:53:07
【问题描述】:

数据将保存到 hdf5 文件中,但一个文件的保存总共需要大约 30 秒。一旦数据完成保存在一个 hdf5 文件中,该文件将立即使用,直到下一个 hdf5 文件完成,该过程将继续如此。有没有一种简单的方法来检查 hd5 文件是否已完成加载,然后才能使用它? hdf5 文件大约 10-20MB 并且都将保存在同一个文件夹中。当然,我或许可以将某个计时器设置为 30 秒以上,但我有兴趣保持时间尽可能短,这意味着我需要准确知道每个 hdf5 文件何时完成数据采集。

我有几个想法:

  1. 测量文件大小从一个时间点到另一个时间点的差异。如果没有变化,则假定文件已加载完毕。
  2. 我对 hdf5 文件了解不多,但也许每个 hdf5 文件的末尾都有一些东西,而且只是在末尾。如果是这种情况,我可以继续检查最后一个组件的值是否存在。如果存在,则该文件必须完成。

有什么想法吗?如果有任何帮助,我将不胜感激。

编辑: 我对 on_created 内部 hdf5 部分的想法:

class CustomHandler(FileSystemEventHandler):    

    def __init__(self, callback: Callable):
        self.callback = callback

        # Store callback to be called on every on_created event

    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        #print(f"Event type: {event.event_type}\nAt: {event.src_path}\n")

        # check if it's File creation, not Directory creation
        if isinstance(event, FileCreatedEvent):
            file = pathlib.Path(event.src_path)

            #print(f"Processing file {file.name}\n")

            # call callback
            #self.callback(file)

            wait = 3
            max_wait = 30
            waited = 0

            while True:
                try:
                    h5py.File(self.callback(file), 'r')
                    return self.callback(file)

                except FileNotFoundError:
                    print('Error: HDF5 File not found')
                    return None

                except OSError:
                    if waited < max_wait:
                        print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
                        time.sleep(wait)
                        waited += wait
                    else:
                        print(f'waited too long= {waited} secs')
                        return None

【问题讨论】:

  • hd5 文件是如何写入的?通过外部程序、您可以控制的程序还是同一个程序?
  • 另外,每个文件是由不同的程序/进程编写的吗?还是同一个程序?
  • 这是一个写入hdf5文件的内部程序,但是这个程序和我要使用的程序之间应该没有通信。内部程序完成写入文件后,只能使用 hdf5 文件。每个文件都是由同一个程序编写的。

标签: python hdf5


【解决方案1】:

根据您的 cmets 和我们的讨论,最简单的实现可能是“等待”文件但不返回 h5py 文件对象的函数。这样您仍然可以使用标准的上下文管理器:(例如,with h5py.File() as h5f:)并避免在主程序中关闭文件。

我将修改后的函数发布为新答案(重命名为h5_wait)以避免混淆(我的第一个答案具有原始函数h5_open_wait)。此函数类似,但返回 True/False 标志而不是 h5py 文件对象。它通过调用h5py.File() 检查文件状态,然后在退出函数之前关闭。它还使用sys.argv 来获取HDF5 文件名(如sys.argv[1])。

查看下面的新代码:

import h5py
import sys
import time

def h5_wait(h5file):
    
    wait = 3
    max_wait = 30
    waited = 0

    while True:
        try:
            h5f = h5py.File(h5file,'r')
            break
                
        except FileNotFoundError:
            print('\nError: HDF5 File not found\n')
            return False
        
        except OSError:   
            if waited < max_wait:
                print(f'Warning: HDF5 File locked, sleeping {wait} seconds...')
                time.sleep(wait) 
                waited += wait  
            else:
                print(f'\nWaited too long= {waited} secs, exiting...\n')
                return False

    h5f.close()
    return True

####################

if len(sys.argv) != 2:
    sys.exit('Include HDF5 file name on command line.')
h5file = sys.argv[1]         

h5stat = h5_wait(h5file)
if h5stat is False:
    sys.exit('Error: HDF5 File not available')
    
with h5py.File(h5file) as h5f:
    # do something with the file      
    start = time.time()
    for ds, obj in h5f.items():
        print(f'ds name={ds}; shape={obj.shape}')
      
    print(f'\nTime to read {len(list(h5f.keys()))} datasets = {time.time()-start:.2f} secs')  

【讨论】:

  • sys.argv 如何检索新文件?文件将每 30 秒进来一次,所以我看不到 sys.argv 如何检查新填充的 h5 文件。
  • 它没有。 :-) 它们的值在程序启动时传递。当你的“看门狗程序”找到新文件时,它是如何将新文件的名称传递给下一个程序的?
【解决方案2】:

您想要的是“文件锁定”。好消息:在 HDF5 库构建中启用(默认情况下)。而且,更好的是,它在 h5py 包中启用!因此,如果您尝试打开一个已打开以供其他程序写入的文件,您将遇到异常。我们可以利用这个例外来发挥我们的优势。挑战在于将文件锁定异常与其他潜在的文件打开异常(如文件不存在)区分开来。

坦率地说,我更喜欢 Python 的 with/as: 上下文管理器来打开文件。但是,它以相同的方式处理所有异常(不打开并退出)。因此,我们需要一种方法来以不同的方式处理不同的异常。我怀疑自定义文件上下文管理器是最 Pythonic 的方法。但是,这超出了我的专业范围。

相反,我写了一个你用文件名调用的函数它使用try/except:while 循环中打开文件。将发生以下三件事之一:

  1. 如果打开文件,则返回 h5py 文件对象。
  2. 如果文件不存在,它会立即返回None
  3. 如果它被锁定,它会休眠,然后重试。如果超过时限打不开,则返回None。

使用这个函数的时候记得使用.close()方法!

代码于 2021 年 9 月 9 日更新,使用 argparse 模块将 HDF5 文件名作为必需的命令行参数传递。
以下更新代码:

import h5py
import argparse
import sys 
import time

def h5_open_wait(h5file):
    
    wait = 3
    max_wait = 30
    waited = 0

    while True:
        try:
            h5f = h5py.File(h5file,'r')
            return h5f
                
        except FileNotFoundError:
            print('Error: HDF5 File not found')
            return None
        
        except OSError:   
            if waited < max_wait:
                print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
                time.sleep(wait) 
                waited += wait  
            else:
                print(f'waited too long= {waited} secs')
                return None

def get_job_options():

# Note that HDF5 file name is only parameter and is required; 
 
    parser = argparse.ArgumentParser(description='Check HDF5 file is available to open.')
    parser.add_argument('hdf5', help='HDF5 filename (Required)' )

    if len(sys.argv)==1:
    # display help message when no args are passed.
        parser.print_help()
        sys.exit('Error: No HDF5 file name specified; exiting.')

    args = parser.parse_args()
    
    HDF5_FILE = args.hdf5
    #print ('HDF5 file = %s' % args.hdf5)

    return (HDF5_FILE)

####################

h5file  = get_job_options()

start = time.time()

h5f = h5_open_wait(h5file)
if h5f is None:
    sys.exit('Error: HDF5 File not opened')
    
# do something with the file      
for ds, obj in h5f.items():
    print(f'ds name={ds}; shape={obj.shape}')

h5f.close()     
print(f'\nTime to read all datasets = {time.time()-start:.2f} secs')  

为了测试,我编写了一个简单的程序,从一个大数组中创建 800 个数据集。 (下面的代码。)要测试,首先启动它,然后运行上面的代码,看看它是如何等待的。根据您的系统速度调整上方的max_wait 和下方的a0cnt

创建上述示例文件的代码:

start = time.time()
a0 = 1000
cnt = 800
arr = np.random.random(a0*a0).reshape(a0,a0)
with h5py.File('SO_69067142.h5','w') as h5f:
    for dcnt in range(cnt):
        h5f.create_dataset(f'ds_{dcnt:03}',data=arr)

print(f'Time to create {cnt} datasets={time.time()-start:.2f}')   

【讨论】:

  • 感谢您的回答!我还需要该函数以某种方式检查/选择文件夹中的下一个最新文件,以便检查 A.hdf5 是否完成写入是一个平稳的连续过程。当是时,文件被返回并在其他地方使用。然后程序会检查文件夹中是否有更新的文件。如果它找到了一个较新的文件,B.hdf5,那么进程继续查看它是否完成了 B.hdf5 的写入。但它应该只检查是否添加了新文件而不检查旧文件。你知道这样做的方法吗?这是我的问题的一部分,我也无法解决。
  • 这个过程是一个完全不同的问题(而且更复杂)。 :-) 如果我理解,您可能有:A.hdf5B.hdf5C.hdf5D.hdf5 等。如果它们在程序启动时都存在,那就相对简单了。创建文件名字典并对其进行迭代,直到所有文件名都已处理完毕。但是,如果程序启动时它们不存在,您将需要另一个循环来检查新文件并将它们添加到字典中进行处理。这并不难,但很棘手。另外,您如何知道何时完成(何时停止检查新文件)?
  • 是的,新文件将随时间填充,并将一个一个填充。这意味着如果当前正在检查 F.hdf5 是否已完成,则只有 G.hdf5 将在 F.hdf5 之后的时间窗口内填充,而不会另外填充 H.hdf5 I.hdf5 J.hdf5 等。有字典的问题在所有这些文件中,其中可能会有 10000+ hdf5 文件。我不认为这对优化有好处,因为它必须验证新文件是否已经在字典中。
  • 理论上它应该永远运行。这只是程序的一小部分。一旦主程序启动,它应该开始搜索。当主程序结束时,这个搜索程序也将结束。
  • 您可能需要重新考虑您的方法。使用基于进程/任务状态的过程,而不是基于文件的过程。启动第一个进程(创建 HDF5 文件),然后在第一个进程完成后,启动第二个进程。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-23
  • 2015-04-17
  • 1970-01-01
  • 1970-01-01
  • 2012-02-12
  • 2016-09-06
相关资源
最近更新 更多