【发布时间】:2016-09-05 10:15:19
【问题描述】:
在纯 Python 中执行 ETL 任务,我想为所考虑的每个原始输入文件收集错误指标和元数据(错误指标是根据文件数据部分中提供的错误代码计算的,而元数据是存储在标题中)。下面是整个过程的伪代码:
import pandas as pd
import dask
from dask import delayed
from dask import dataframe as dd
META_DATA = {} # shared resource
ERRORS = {} # shared resource
def read_file(file_name):
global META_DATA, ERRORS
# step 1: process headers
headers = read_header(file_name)
errors = {}
data_bfr = []
# step 2: process data section
for line in data_section:
content_id, data = parse_line(line)
if contains_errors(data):
errors[content_id] = get_error_code(data)
else:
data_bfr.append(content_id, data)
# ---- Part relevant for question 1 ----
# step 3: acquire lock for shared resource and write metadata
with lock.acquire():
write_metadata(file_name, headers) # stores metadata in META_DATA[file_name]
write_errors(file_name, errors) # stores error metrics in ERRORS[file_name]
return pd.DataFrame(data=data_bfr,...)
with set_options(get=dask.multiprocessing.get):
df = dd.from_delayed([delayed(read_file)(file_name) \
for file_name in os.listdir(wd)])
# ---- Part relevant for question 2 ----
df.to_hdf('data.hdf', '/data', 'w', complevel=9, \
complib='blosc',..., metadata=(META_DATA, ERRORS))
对于每个输入文件read_file 返回一个pd.DataFrame,进一步将相关元数据和错误指标写入共享资源。我正在使用dask 的多处理调度程序从延迟的read_file 操作列表中计算dask.dataframe。
-
问题 1:每个
read_file-操作都写入共享资源META_DATA和ERRORS。我必须做些什么来实施适用于dask.multiprocessing.get的正确锁定策略?从with locket.lock_file('.lock'):-context 中将元数据和错误信息写入集合是否足够?multiprocessing.RLock有效吗?我必须做些什么来初始化锁以使用dask?更根本的是,如何在dask中将META_DATA和ERRORS声明为共享资源? -
问题 2:如果可能,我想用元数据和错误指标对 HDF 数据进行注释。从question on "Collecting attributes from dask dataframe providers" 得知
dask目前不支持将元数据添加到数据帧,但是否可以将信息写入HDF?如果是,这种情况下如何处理对共享资源的访问?
【问题讨论】:
标签: python pandas locking metadata dask