【问题标题】:What is the fastest way to get distinct rows in pyarrow table?在 pyarrow 表中获取不同行的最快方法是什么?
【发布时间】:2021-06-21 13:10:51
【问题描述】:

我正在尝试获取有关我的 pyarrow 表中两列中值的不同组合的信息。

我目前正在做的是:

import pandas as pd
import pyarrow as pa
my_table = pa.Table.from_pandas(
  pd.DataFrame(
    {
      'col1':['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b'], 
      'col2':[1,1,2,2,1,1,2,3],
      'col3':[1,2,3,4,5,6,7,8]
    }
  )
)
a = [i.to_numpy().astype('str') for i in my_table.select(['col1', 'col2']).columns]
unique = np.unique(np.array(a), axis = 1)

返回预期结果:

unique
>array([['a', 'a', 'b', 'b', 'b'],
       ['1', '2', '1', '2', '3']], dtype='<U21')

但是对于较大的表来说这很慢,我希望有更快的方法吗?

另外,我真正想知道的是,当我尝试编写分区数据集时,如何提前知道它将写入哪些目录(即哪些分区在我的表中有一些数据)

编辑:

它可以更快地转换为 pandas 而不是多个 numpy 数组,然后使用drop_duplicates()

my_table.select(['col1', 'col2']).to_pandas().drop_duplicates()

【问题讨论】:

  • 确定列组合的唯一性(可以用箭头术语表示为 StructArray)尚未在 Arrow 中实现。所以短期内,我认为转换为 pandas 并使用 drop_duplicates() 是你最好的选择。
  • 感谢您的回复!是否有任何已知的实施计划?票,时间表?
  • 普通票在issues.apache.org/jira/browse/ARROW-3978,没有具体的时间表,但希望在接下来的几个月里(groupby 的工作正在进行中,这也需要能够在多个列上分组)跨度>

标签: python numpy pyarrow apache-arrow


【解决方案1】:

https://issues.apache.org/jira/browse/ARROW-3978 跟踪对直接编码结构的支持

同时,这里有一个解决方法,它在计算上类似于 pandas 的独特功能,但通过使用 pyarrow 自己的计算内核来避免转换为 pandas 的成本。

import pyarrow as pa
import pyarrow.compute as pc


def _dictionary_and_indices(column):
    assert isinstance(column, pa.ChunkedArray)

    if not isinstance(column.type, pa.DictionaryType):
        column = pc.dictionary_encode(column, null_encoding_behavior='encode')

    dictionary = column.chunk(0).dictionary
    indices = pa.chunked_array([c.indices for c in column.chunks])

    if indices.null_count != 0:
        # We need nulls to be in the dictionary so that indices can be
        # meaningfully multiplied, so we must round trip through decoded
        column = pc.take(dictionary, indices)
        return _dictionary_and_indices(column)

    return dictionary, indices


def unique(table):
    "produce a table containing only the unique rows from the input"
    if table.num_columns == 0:
        return None

    table = table.unify_dictionaries()

    dictionaries = []
    fused_indices = None

    for c in table.columns:
        dictionary, indices = _dictionary_and_indices(c)

        if fused_indices is None:
            fused_indices = indices
        else:
            # pack column's indices into fused_indices
            fused_indices = pc.add(
                pc.multiply(fused_indices, len(dictionary)),
                indices)

        dictionaries.append(dictionary)

    uniques = []

    # pc.unique can now be invoked on the single array of fused_indices
    fused_indices = pc.unique(fused_indices)

    for dictionary in reversed(dictionaries):
        # unpack the column's indices from fused_indices
        quotient = pc.divide(fused_indices, len(dictionary))
        remainder = pc.subtract(fused_indices,
                                pc.multiply(quotient, len(dictionary)))

        # decode this column's uniques
        uniques.insert(0, pc.take(dictionary, remainder))
        fused_indices = quotient

    return pa.Table.from_arrays(uniques, names=table.column_names)


if __name__ == '__main__':
    my_table = pa.table({
        'col1': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b'],
        'col2': [1,   1,   2,   2,   1,   1,   2,   3],
        'col3': [1,   2,   3,   4,   5,   6,   7,   8],
    })

    assert unique(my_table.select(['col1', 'col2'])).equals(pa.table({
        'col1': ['a', 'a', 'b', 'b', 'b'],
        'col2': [1,   2,   1,   2,   3],
    }))

【讨论】:

  • 小记,此代码仅适用于pyarrow开发版(4月发布为pyarrow 4.0)
  • 哈,已经有很多理由期待 4.0 了! :)
  • github.com/TomScheffers/pyarrow_ops/blob/main/pyarrow_ops/… 发现这个库也具有用于 pyarrow 表的 drop_duplicates 函数。我需要记住对这两种方法进行基准测试。
猜你喜欢
  • 2020-01-16
  • 2019-04-08
  • 1970-01-01
  • 2011-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-02-14
相关资源
最近更新 更多