【问题标题】:Spark get all rows with same values in array in columnSpark获取列中数组中具有相同值的所有行
【发布时间】:2020-11-05 19:00:13
【问题描述】:

我有一个包含 idhashes 列的 Spark 数据框,其中 hashes 列包含长度为 n 的整数值的 Seq。示例:

+----+--------------------+
+  id|              hashes|
+----+--------------------+
|0   |     [1, 2, 3, 4, 5]|
|1   |     [1, 5, 3, 7, 9]|
|2   |     [9, 3, 6, 8, 0]|
+-------------------------+

我想获得一个数据框,其中包含hashes 中的数组至少在一个位置匹配的所有行。更正式地说,我想要一个带有额外列 matches 的数据框,对于每一行 r 包含 Seqids 行,其中 hashes[r][i] == hashes[k][i]k 是至少一个值的任何其他行i.

对于我的示例数据,结果将是:

+---+---------------+-------+
|id |hashes         |matches|
+---+---------------+-------+
|0  |[1, 2, 3, 4, 5]|[1]    |
|1  |[1, 5, 3, 7, 9]|[0]    |
|2  |[9, 3, 6, 8, 0]|[]     |
+---+---------------+-------+

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    在 Spark 3 中,以下代码在行之间比较数组,只保留两个数组在同一位置共享至少一个元素的行。 df 是您的输入数据框:

        df.join(
          df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
          exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
        )
          .groupBy("id")
          .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matched"))
          .withColumn("matched", filter(col("matched"), x => x.notEqual(col("id"))))
    

    详细说明

    首先,我们执行自动交叉连接,根据您在两个哈希数组上的相同位置至少有一个元素的条件进行过滤。

    为了构建条件,我们压缩了两个哈希数组,一个来自第一个数据帧,一个用于第二个连接的数据帧,即第一个重命名列的数据帧。通过压缩,我们得到一个{"hashes":x, "hashes2":y} 的数组,接下来我们只需要检查这个数组中是否存在x = y 的元素。完整的条件写成如下:

    exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
    

    然后,我们将按id 列聚合以收集所有保留的行的id2,即符合您条件的行

    为了保持“哈希”列,对于具有相同“id”的两行,“哈希”列相等,我们为每个“id”获取第一次出现的“哈希”。我们使用collect_list收集所有“id2”:

    .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
    

    最后,我们从“匹配”列中过滤掉当前行的id

    .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
    

    如果您需要“id”按顺序排列,可以添加orderBy 子句:

    .orderBy("id")
    

    运行

    使用包含以下值的数据框 df

    +---+---------------+
    |id |hashes         |
    +---+---------------+
    |0  |[1, 2, 3, 4, 5]|
    |1  |[1, 5, 3, 7, 9]|
    |2  |[9, 3, 6, 8, 0]|
    +---+---------------+
    

    你会得到以下输出:

    +---+---------------+-------+
    |id |hashes         |matches|
    +---+---------------+-------+
    |0  |[1, 2, 3, 4, 5]|[1]    |
    |1  |[1, 5, 3, 7, 9]|[0]    |
    |2  |[9, 3, 6, 8, 0]|[]     |
    +---+---------------+-------+
    

    限制

    join 是笛卡尔积,非常昂贵。虽然条件过滤了结果,但它会导致在大数据集上进行大量的计算/洗牌,并且可能性能很差。

    如果你使用的是3.0之前版本的Spark,你必须将一些build-in spark functions替换为user-defined functions

    【讨论】:

    • 您好,我是 spark 新手,我正在使用
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-27
    • 1970-01-01
    • 2017-11-07
    • 1970-01-01
    • 1970-01-01
    • 2020-06-19
    相关资源
    最近更新 更多