【问题标题】:Apache Spark - Adding a incremental Id in based on a column valueApache Spark - 根据列值添加增量 ID
【发布时间】:2020-05-21 14:56:54
【问题描述】:

我想根据列值创建一个增量 ID。

例如,如果我有下表

-----------------------
| id |   value    |
-----------------------
| 3  |    a       |
| 2  |    a       | 
| 1  |    b       |
| 4  |    b       |
| 5  |    c       |
-----------------------

我想创建一个带有随机或增量标识符的新列,该标识符对于列值是唯一的,如下所示:

-----------------------------------------------
| id |   value    |    new_id    |
-----------------------------------------------
| 3  |    a       |     1        | 
| 2  |    a       |     1        |
| 1  |    b       |     2        |
| 4  |    b       |     2        |
| 5  |    c       |     3        |
-----------------------------------------------

除了使用 distinct 并稍后加入之外,还有其他选择吗?

谢谢!

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    您可以在 Window 中使用 dense_rank() 并按值排序,但是这会将所有数据移动到单个分区,因此对于大型 DataSet 的性能会很差。

    val window =   Window.orderBy($"value")
    
    df.withColumn("new_id", dense_rank.over(window))
    

    编辑 - 似乎使用虚拟分区可确保将数据打乱到 spark.sql.shuffle.partitions 分区中

    val window =   Window.partitionBy(lit(0)).orderBy($"value")
    

    【讨论】:

      【解决方案2】:

      一种直接的方法是获取列值的哈希值。这应该是一个无冲突的散列,并且会阻止对整个数据集的扫描。

      使用斯卡拉,

      val sparkSession = ???
      import sparkSession.implicits._
      import org.apache.spark.sql.functions._
      
      val df = ???
      val dfModified = df.withColumn("new_id", hash(col("value")).cast("string"))
      

      注意,如果你有一组固定的已知值,那么你应该事先创建一个value -> id 映射并使用广播连接或UDF 来放置新的ID。如果你不使用 scala,方法是一样的,你只需要使用不同的散列技术。

      希望这会有所帮助,干杯。

      【讨论】:

      • 这符合要求,但 UDF 很慢,如果可能最好避免使用。
      • 不,它们不是,它们基本上是map 操作。
      • 这篇文章只是提到了一个众所周知的事实,即 UDF 大多无法优化。在这种情况下没关系,因为没有复杂的转换,所以有一个string => map 转换。尽管如此,我还是更新了使用内置 hash 函数的答案,该函数再次使用相同的实现。
      猜你喜欢
      • 2016-11-17
      • 1970-01-01
      • 2022-11-18
      • 2020-08-23
      • 1970-01-01
      • 1970-01-01
      • 2016-12-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多