【问题标题】:Spark DataFrame filtering: retain element belonging to a listSpark DataFrame过滤:保留属于列表的元素
【发布时间】:2016-02-22 20:06:06
【问题描述】:

我在 Zeppelin 笔记本上使用带有 Scala 的 Spark 1.5.1。

  • 我有一个 DataFrame,其中有一列名为 userID 的 Long 类型。
  • 我总共有大约 400 万行和 200,000 个唯一用户 ID。
  • 我还有一个包含 50,000 个用户 ID 的列表要排除。
  • 我可以轻松构建要保留的用户 ID 列表。

删除属于要排除的用户的所有行的最佳方法是什么?

问同样问题的另一种方法是:保留属于用户的行的最佳方法是什么?

我看到this post 并应用了它的解决方案(参见下面的代码),但执行速度很慢,因为我知道我在本地机器上运行 SPARK 1.5.1,我有 16GB 的良好 RAM 内存和初始DataFrame 适合内存。

这是我正在应用的代码:

import org.apache.spark.sql.functions.lit
val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))

在上面的代码中:

  • initialDataFrame 有 3885068 行,每行有 5 列,其中一列称为 userID,它包含 Long 值。
  • listOfUsersToKeep 是一个 Array[Long],它包含 150,000 个 Long 用户 ID。

我想知道是否有比我使用的更有效的解决方案。

谢谢

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql apache-zeppelin


    【解决方案1】:

    您可以使用join:

    val usersToKeep = sc.parallelize(
      listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")
    
    val finalDataFrame = usersToKeep
      .join(initialDataFrame, $"userID" === $"userID_")
      .drop("userID_")
    

    或广播变量和 UDF:

    import org.apache.spark.sql.functions.udf
    
    val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
    val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
    val finalDataFrame = initialDataFrame.where(checkUser($"userID"))
    

    应该也可以广播一个DataFrame:

    import org.apache.spark.sql.functions.broadcast
    
    initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
    

    【讨论】:

    • 我喜欢广播变量的方法!没想到!
    猜你喜欢
    • 2020-03-22
    • 1970-01-01
    • 1970-01-01
    • 2015-12-13
    • 1970-01-01
    • 2018-04-21
    • 1970-01-01
    • 2023-03-23
    • 1970-01-01
    相关资源
    最近更新 更多