【问题标题】:Spark/PySpark collect_set with a binary column带有二进制列的 Spark/PySpark collect_set
【发布时间】:2019-04-02 16:50:42
【问题描述】:

一些测试数据,有两列:第一个二进制(在本例中使用字母数字字节),第二个整数:

from pyspark.sql.types import *
from pyspark.sql import functions as F

df = spark.createDataFrame([
    (bytearray(b'0001'), 1),
    (bytearray(b'0001'), 1),
    (bytearray(b'0001'), 2),
    (bytearray(b'0002'), 2)
],
schema=StructType([
    StructField("bin", BinaryType()),
    StructField("number", IntegerType())
]))

使用 collect_set 按整数列分组然后删除重复项不起作用,因为字节数组不支持散列。因此:

(
    df
    .groupBy('number')
    .agg(F.collect_set("bin").alias('bin_array'))
    .show()
)

+------+------------+
|number|   bin_array|
+------+------------+
|     1|[0001, 0001]|
|     2|[0001, 0002]|
+------+------------+

一个 hacky 选项是将二进制数组嵌入到结构中,然后再将它们全部解包,但我怀疑这会导致大量分配并且非常昂贵(虽然实际上并没有对其进行分析):

def unstruct_array(input):
    return [x.bin for x in input]

unstruct_array_udf = F.udf(unstruct_array, ArrayType(BinaryType()))

(
    df
    .withColumn("bin", F.struct("bin"))
    .groupBy('number')
    .agg(F.collect_set("bin").alias('bin_array'))
    .withColumn('bin_array', unstruct_array_udf('bin_array'))
    .show()
)

+------+------------+                                                           
|number|   bin_array|
+------+------------+
|     1|      [0001]|
|     2|[0001, 0002]|
+------+------------+

如果我围绕二进制类型和 Spark 尝试了很多 Google 搜索词,有各种答案说如果你需要散列,你应该包装数组。建议包括自定义包装器或通过调用创建 Scala WrappedArray 的 Scala 的 toSeq。例如:

ReduceByKey with a byte array as the key

How to use byte array as key in RDD?

因此,选项包括:

  1. 映射底层 RDD 以使二进制字段成为 WrappedArray。不知道如何在 Python 中做到这一点?
  2. 为数组创建一个 Python 包装器,然后以某种方式在 Python 中散列底层 Java 数组?虽然不确定这比使用结构有什么优势?
  3. 我可以包装在一个结构中,然后从不打开包装,这样处理效率会更高一些,但可能会使 parquet 文件更大,并且在所有下游任务中解析成本更高

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    这是一个可能比包装和展开更有效的技巧。您可以简单地事先调用distinct 方法。

    df.show()
    +-------------+------+
    |          bin|number|
    +-------------+------+
    |[30 30 30 31]|     1|
    |[30 30 30 31]|     1|
    |[30 30 30 31]|     2|
    |[30 30 30 32]|     2|
    +-------------+------+
    
    df.distinct().show()
    +-------------+------+
    |          bin|number|
    +-------------+------+
    |[30 30 30 31]|     1|
    |[30 30 30 31]|     2|
    |[30 30 30 32]|     2|
    +-------------+------+
    

    请注意,我可能没有使用与您相同的 Spark 版本(我的是 2.2.1),因为二进制数组的显示似乎有所不同。

    那么,对于collect_set,它简单地归结为:

    df.distinct().groupBy("number").agg(F.collect_set("bin"))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-10-01
      • 1970-01-01
      • 2021-05-02
      • 1970-01-01
      • 2019-05-06
      • 1970-01-01
      • 2021-12-03
      相关资源
      最近更新 更多