【问题标题】:How to map a dask Series with a large dict如何用大字典映射 dask 系列
【发布时间】:2018-06-01 07:56:58
【问题描述】:

我正在尝试找出使用大型映射映射 dask 系列的最佳方法。直截了当的series.map(large_mapping) 问题UserWarning: Large object of size <X> MB detected in task graph 并建议使用client.scatterclient.submit,但后者并不能解决问题,实际上速度要慢得多。在client.scatter 中尝试broadcast=True 也无济于事。

import argparse
import distributed
import dask.dataframe as dd

import numpy as np
import pandas as pd


def compute(s_size, m_size, npartitions, scatter, broadcast, missing_percent=0.1, seed=1):
    np.random.seed(seed)
    mapping = dict(zip(np.arange(m_size), np.random.random(size=m_size)))
    ps = pd.Series(np.random.randint((1 + missing_percent) * m_size, size=s_size))
    ds = dd.from_pandas(ps, npartitions=npartitions)
    if scatter:
        mapping_futures = client.scatter(mapping, broadcast=broadcast)
        future = client.submit(ds.map, mapping_futures)
        return future.result()
    else:
        return ds.map(mapping)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', default=200000, type=int, help='series size')
    parser.add_argument('-m', default=50000, type=int, help='mapping size')
    parser.add_argument('-p', default=10, type=int, help='partitions number')
    parser.add_argument('--scatter', action='store_true', help='Scatter mapping')
    parser.add_argument('--broadcast', action='store_true', help='Broadcast mapping')
    args = parser.parse_args()

    client = distributed.Client()
    ds = compute(args.s, args.m, args.p, args.scatter, args.broadcast)
    print(ds.compute().describe())

【问题讨论】:

    标签: python dask dask-distributed


    【解决方案1】:

    你的问题来了

    In [4]: mapping = dict(zip(np.arange(50000), np.random.random(size=50000)))
    
    In [5]: import pickle
    
    In [6]: %time len(pickle.dumps(mapping))
    CPU times: user 2.24 s, sys: 18.6 ms, total: 2.26 s
    Wall time: 2.25 s
    Out[6]: 6268809
    

    所以mapping 很大且未分区 - 在这种情况下,分散操作是给您带来问题的操作。

    考虑替代方案

    def make_mapping():
        return dict(zip(np.arange(50000), np.random.random(size=50000)))
    
    mapping = client.submit(make_mapping)  # ships the function, not the data
                                           # and requires no serialisation
    future = client.submit(ds.map, mapping)
    

    这不会显示警告。但是,在这里使用字典来做映射对我来说似乎很奇怪,一系列直列数组似乎更好地编码了数据的性质。

    【讨论】:

    • 谢谢,我最终做了类似的事情。发布的示例代码仅用于说明,在实际用例中,映射计算在每个工作人员中重新计算更加复杂和昂贵。所以我只计算一次,腌制它,然后让工人解开它。我发现的一个额外优化是使用distributed.worker.thread_state 来缓存同一进程中所有多线程工作人员的未腌制映射。
    猜你喜欢
    • 2017-02-22
    • 2019-09-28
    • 2017-03-06
    • 2020-09-25
    • 1970-01-01
    • 2021-12-27
    • 2015-06-08
    • 1970-01-01
    • 2020-03-15
    相关资源
    最近更新 更多