【发布时间】:2019-04-02 16:50:42
【问题描述】:
一些测试数据,有两列:第一个二进制(在本例中使用字母数字字节),第二个整数:
from pyspark.sql.types import *
from pyspark.sql import functions as F
df = spark.createDataFrame([
(bytearray(b'0001'), 1),
(bytearray(b'0001'), 1),
(bytearray(b'0001'), 2),
(bytearray(b'0002'), 2)
],
schema=StructType([
StructField("bin", BinaryType()),
StructField("number", IntegerType())
]))
使用 collect_set 按整数列分组然后删除重复项不起作用,因为字节数组不支持散列。因此:
(
df
.groupBy('number')
.agg(F.collect_set("bin").alias('bin_array'))
.show()
)
+------+------------+
|number| bin_array|
+------+------------+
| 1|[0001, 0001]|
| 2|[0001, 0002]|
+------+------------+
一个 hacky 选项是将二进制数组嵌入到结构中,然后再将它们全部解包,但我怀疑这会导致大量分配并且非常昂贵(虽然实际上并没有对其进行分析):
def unstruct_array(input):
return [x.bin for x in input]
unstruct_array_udf = F.udf(unstruct_array, ArrayType(BinaryType()))
(
df
.withColumn("bin", F.struct("bin"))
.groupBy('number')
.agg(F.collect_set("bin").alias('bin_array'))
.withColumn('bin_array', unstruct_array_udf('bin_array'))
.show()
)
+------+------------+
|number| bin_array|
+------+------------+
| 1| [0001]|
| 2|[0001, 0002]|
+------+------------+
如果我围绕二进制类型和 Spark 尝试了很多 Google 搜索词,有各种答案说如果你需要散列,你应该包装数组。建议包括自定义包装器或通过调用创建 Scala WrappedArray 的 Scala 的 toSeq。例如:
ReduceByKey with a byte array as the key
How to use byte array as key in RDD?
因此,选项包括:
- 映射底层 RDD 以使二进制字段成为 WrappedArray。不知道如何在 Python 中做到这一点?
- 为数组创建一个 Python 包装器,然后以某种方式在 Python 中散列底层 Java 数组?虽然不确定这比使用结构有什么优势?
- 我可以包装在一个结构中,然后从不打开包装,这样处理效率会更高一些,但可能会使 parquet 文件更大,并且在所有下游任务中解析成本更高
【问题讨论】:
标签: python apache-spark pyspark