【问题标题】:Locking in dask.multiprocessing.get and adding metadata to HDF锁定 dask.multiprocessing.get 并将元数据添加到 HDF
【发布时间】:2016-09-05 10:15:19
【问题描述】:

在纯 Python 中执行 ETL 任务,我想为所考虑的每个原始输入文件收集错误指标和元数据(错误指标是根据文件数据部分中提供的错误代码计算的,而元数据是存储在标题中)。下面是整个过程的伪代码:

import pandas as pd
import dask
from dask import delayed
from dask import dataframe as dd

META_DATA = {}  # shared resource
ERRORS = {}  # shared resource

def read_file(file_name):
    global META_DATA, ERRORS

    # step 1: process headers
    headers = read_header(file_name)
    errors = {}
    data_bfr = []

    # step 2: process data section
    for line in data_section:
        content_id, data = parse_line(line)
        if contains_errors(data):
            errors[content_id] = get_error_code(data)
        else:
            data_bfr.append(content_id, data)

    # ---- Part relevant for question 1 ----
    # step 3: acquire lock for shared resource and write metadata
    with lock.acquire():
        write_metadata(file_name, headers)  # stores metadata in META_DATA[file_name]
        write_errors(file_name, errors)  # stores error metrics in ERRORS[file_name]

    return pd.DataFrame(data=data_bfr,...)

with set_options(get=dask.multiprocessing.get):
    df = dd.from_delayed([delayed(read_file)(file_name) \
                          for file_name in os.listdir(wd)])

    # ---- Part relevant for question 2 ----
    df.to_hdf('data.hdf', '/data', 'w', complevel=9, \
        complib='blosc',..., metadata=(META_DATA, ERRORS))

对于每个输入文件read_file 返回一个pd.DataFrame,进一步将相关元数据和错误指标写入共享资源。我正在使用dask 的多处理调度程序从延迟的read_file 操作列表中计算dask.dataframe

  • 问题 1:每个read_file-操作都写入共享资源META_DATAERRORS。我必须做些什么来实施适用于dask.multiprocessing.get 的正确锁定策略?从with locket.lock_file('.lock'):-context 中将元数据和错误信息写入集合是否足够? multiprocessing.RLock 有效吗?我必须做些什么来初始化锁以使用dask?更根本的是,如何在dask 中将META_DATAERRORS 声明为共享资源?
  • 问题 2:如果可能,我想用元数据和错误指标对 HDF 数据进行注释。从question on "Collecting attributes from dask dataframe providers" 得知dask 目前不支持将元数据添加到数据帧,但是否可以将信息写入HDF?如果是,这种情况下如何处理对共享资源的访问?

【问题讨论】:

    标签: python pandas locking metadata dask


    【解决方案1】:

    不要依赖全局变量

    Dask 最适合使用 pure functions

    特别是,您的案例是 Python 中的一个限制,因为它(正确地)不会在进程之间共享全局数据。相反,我建议您从函数中显式返回数据:

    def read_file(file_name):
        ...
        return df, metadata, errors
    
    values = [delayed(read_file)(fn) for fn in filenames]
    dfs      = [v[0] for v in values]
    metadata = [v[1] for v in values]
    errors   = [v[2] for v in values]
    
    df = dd.from_delayed(dfs)
    
    import toolz
    metadata = delayed(toolz.merge)(metadata)
    errors = delayed(toolz.merge)(errors)
    

    【讨论】:

    • 在这种情况下,zip 抱怨参数 #1 不可迭代:TypeError: zip argument #1 must support iteration。你的食谱一般有效吗?我的read_file 返回一个熊猫DataFrame、一个dict 和一个list
    • 确实,我的例子有缺陷。现已修复
    猜你喜欢
    • 1970-01-01
    • 2018-12-01
    • 1970-01-01
    • 2013-09-01
    • 1970-01-01
    • 2016-10-09
    • 1970-01-01
    • 2013-12-12
    • 1970-01-01
    相关资源
    最近更新 更多