【问题标题】:Writing xarray multiindex data in chunks以块的形式写入 xarray 多索引数据
【发布时间】:2020-09-15 17:08:33
【问题描述】:

我正在尝试有效地重组大型多维数据集。假设随着时间的推移,我有许多遥感图像,其中有许多波段,坐标 x y 表示像素位置,时间表示图像采集时间,波段表示收集的不同数据。

在我的用例中,假设 xarray 坐标长度大致为 x (3000)、y (3000)、时间 (10),带 (40) 个浮点数据。所以 100GB 以上的数据。

我一直在尝试从 this example 工作,但我无法将其翻译成这种情况。

小数据集示例

注意:实际数据比这个例子大很多。

import numpy as np
import dask.array as da
import xarray as xr

nrows = 100
ncols = 200
row_chunks = 50
col_chunks = 50

data = da.random.random(size=(1, nrows, ncols), chunks=(1, row_chunks, col_chunks))

def create_band(data, x, y, band_name):

    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    
    return x, y

x, y = create_coords(data, 1000, 2000, 30, 30)

src = []

for time in ['t1', 't2', 't3']:

    src_t = xr.concat([create_band(data, x, y, band) for band in ['blue', 'green', 'red', 'nir']], dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})
    
    src.append(src_t)

src = xr.concat(src, dim='time')

print(src)


<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (time: 3, band: 4, y: 100, x: 200)>
dask.array<concatenate, shape=(3, 4, 100, 200), dtype=float64, chunksize=(1, 1, 50, 50), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 1.015e+03 1.045e+03 1.075e+03 ... 6.985e+03 7.015e+03
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * y        (y) float64 1.985e+03 1.955e+03 1.924e+03 ... -984.7 -1.015e+03
  * time     (time) object 't1' 't2' 't3'

重组 - 堆叠和转置

我需要存储以下输出:

print(src.stack(sample=('y','x','time')).T)

<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (sample: 60000, band: 4)>
dask.array<transpose, shape=(60000, 4), dtype=float64, chunksize=(3600, 1), chunktype=numpy.ndarray>
Coordinates:
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * sample   (sample) MultiIndex
  - y        (sample) float64 1.985e+03 1.985e+03 ... -1.015e+03 -1.015e+03
  - x        (sample) float64 1.015e+03 1.015e+03 ... 7.015e+03 7.015e+03
  - time     (sample) object 't1' 't2' 't3' 't1' 't2' ... 't3' 't1' 't2' 't3'

我希望使用 dask 和 xarray 将结果分块写入磁盘,open_mfdataset 可以访问。 parquet 似乎是个不错的选择,但我不知道如何分块编写(src 太大而无法存储在内存中)。

@dask.delayed
def stacker(data):
   return data.stack(sample=('y','x','time')).T.to_pandas() 

stacker(src).to_parquet('out_*.parquet')

def stack_write(data):
   data.stack(sample=('y','x','time')).T.to_pandas().to_parquet('out_*.parquet')
   return None

stack_write(src)

在这一点上,我只是希望有一些好的想法。谢谢!

【问题讨论】:

  • src在“堆叠和转置”操作之前是否存储在内存中?
  • @Rivers 不,不是。应该由 dask 分块懒惰地完成。
  • 好的,我明白了。您是否已经成功完成了这个“堆栈和转置”操作?还是因为src太大而无法存储在内存中而无法成功完成?
  • @Rivers 没有太大而无法完成,因此存储块。
  • 谢谢。我问是因为它会对其余代码产生很大的影响(我们将无法以相同的方式进行)。你能显示你加载数据的代码行吗? (接下来几天我会很忙,所以下周我可以回答)

标签: python arrays dask parquet python-xarray


【解决方案1】:

我有一个解决方案 (https://github.com/pydata/xarray/issues/1077#issuecomment-644803374) 用于将多索引数据集写入文件。

您必须手动将数据集“编码”为可编写为 netCDF 的形式。然后在你读回来的时候“解码”。

import numpy as np
import pandas as pd
import xarray as xr


def encode_multiindex(ds, idxname):
    encoded = ds.reset_index(idxname)
    coords = dict(zip(ds.indexes[idxname].names, ds.indexes[idxname].levels))
    for coord in coords:
        encoded[coord] = coords[coord].values
    shape = [encoded.sizes[coord] for coord in coords]
    encoded[idxname] = np.ravel_multi_index(ds.indexes[idxname].codes, shape)
    encoded[idxname].attrs["compress"] = " ".join(ds.indexes[idxname].names)
    return encoded


def decode_to_multiindex(encoded, idxname):
    names = encoded[idxname].attrs["compress"].split(" ")
    shape = [encoded.sizes[dim] for dim in names]
    indices = np.unravel_index(encoded.landpoint.values, shape)
    arrays = [encoded[dim].values[index] for dim, index in zip(names, indices)]
    mindex = pd.MultiIndex.from_arrays(arrays)

    decoded = xr.Dataset({}, {idxname: mindex})
    for varname in encoded.data_vars:
        if idxname in encoded[varname].dims:
            decoded[varname] = (idxname, encoded[varname].values)
    return decoded

【讨论】:

  • 感谢您的帮助。对分块写入 hdf 有什么想法吗?
  • 除了“不要这样做”之外别无他法。并行的 HDF 很混乱。 Zarr 是一个更好的选择,它适用于并行写入和 xarray/dask。然后,您可以将 zarr 转换为 HDF。
【解决方案2】:

目前这不是解决方案,而是您的代码的一个版本,经过修改以便其他人想尝试解决此问题时可以轻松重现:

问题在于 stack 操作 (concatenated.stack(sample=('y','x','time'))。 这一步,内存不断增加,进程为killed

concatenated 对象是“后台支持”xarray.DataArray。所以我们可以期待 stack 操作由 Dask 懒惰地完成。那么,为什么进程killed在这一步呢?

这里发生的事情的两种可能性:

  • stack这个操作其实是Dask偷懒做的,但是因为数据非常庞大,即使是Dask最低需要的内存也太多了

  • stack 操作不是 Dask 支持的


import numpy as np
import dask.array as da
import xarray as xr
from numpy.random import RandomState

nrows = 20000
ncols = 20000
row_chunks = 500
col_chunks = 500


# Create a reproducible random numpy array
prng = RandomState(1234567890)
numpy_array = prng.rand(1, nrows, ncols)

data = da.from_array(numpy_array, chunks=(1, row_chunks, col_chunks))


def create_band(data, x, y, band_name):

    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    
    return x, y


x, y = create_coords(data, 1000, 2000, 30, 30)

bands = ['blue', 'green', 'red', 'nir']
times = ['t1', 't2', 't3']
bands_list = [create_band(data, x, y, band) for band in bands]

src = []

for time in times:

    src_t = xr.concat(bands_list, dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})

    src.append(src_t)


concatenated = xr.concat(src, dim='time')
print(concatenated)
# computed = concatenated.compute() # "computed" is ~35.8GB

stacked = concatenated.stack(sample=('y','x','time'))

transposed = stacked.T

可以尝试更改nrowsncols 的值以改变concatenated 的大小。为了性能,我们也可以/应该改变chunks

注意:我什至试过这个

concatenated.to_netcdf("concatenated.nc")
concatenated = xr.open_dataarray("concatenated.nc", chunks=10)

这是为了确保它是 Dask 支持的 DataArray 并且也能够调整块。 我为chunks 尝试了不同的值/s:但总是内存不足。

【讨论】:

    猜你喜欢
    • 2018-04-07
    • 2021-12-15
    • 2014-01-04
    • 2016-07-12
    • 2014-07-23
    • 2017-06-24
    • 1970-01-01
    • 2020-01-24
    • 2021-04-23
    相关资源
    最近更新 更多