【Question title】: Distribute the value of a Spark DataFrame row proportionately to other rows
【Posted】: 2020-10-02 17:16:10
【Question description】:

When the team column contains a Common value, I have to split that value proportionally among the teams that took part in the same sale (id_sales).

+---------------+----------------+----------------+
|id_sales       |team            |price           |
+---------------+----------------+----------------+
|101            |Data Engineering|             200|
|102            |       Front-End|             300|
|103            |  Infrastructure|             100|
|103            |        Software|             200|
|103            |          Commum|             800|
|104            |    Data Science|             500|
+---------------+----------------+----------------+ 

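For example, in the table above my Common value is in id_sales = 103, so I have to work out how much of the Common value each team gets, based on its own price:
- Infrastructure: 100
- Software: 200

So Infrastructure gets 1/3 * 800 and Software gets 2/3 * 800, each added to its own price. In the end my table would look like this: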

+---------------+----------------+----------------+
|id_sales       |team            |price           |
+---------------+----------------+----------------+
|101            |Data Engineering|             200|
|102            |       Front-End|             300|
|103            |  Infrastructure|          366,66|
|103            |        Software|          733,33|
|104            |    Data Science|             500|
+---------------+----------------+----------------+ 
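Written as a formula, the rule in the example is the following. The notation is introduced here only for illustration: p_i is a team's own price, S is the set of non-Common teams in the same id_sales, and C is that sale's Common price. Because the weights p_i / Σ p_j sum to 1, the shares add up to C and the sale's total (1100 for id_sales = 103) is unchanged.

```latex
p'_i = p_i + \frac{p_i}{\sum_{j \in S} p_j}\,C
\qquad
\begin{aligned}
p'_{\text{Infrastructure}} &= 100 + \tfrac{100}{300}\cdot 800 \approx 366.67\\
p'_{\text{Software}} &= 200 + \tfrac{200}{300}\cdot 800 \approx 733.33
\end{aligned}
```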

Can anyone give me some ideas or hints? Hints can be in Python or Scala (Spark 2.4).

Code to create this table:

PySpark

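# Assumes an existing SparkSession named `spark` (as in the pyspark shell).
# Prices are kept as strings, as in the original example.
spark_df = spark.createDataFrame(
    [
        ("101", "Data Engineering", "200"),
        ("102", "Front-End", "300"),
        ("103", "Infrastructure", "100"),
        ("103", "Software", "200"),
        ("103", "Commum", "800"),
        ("104", "Data Science", "500"),
    ],
    ["id_sales", "team", "price"],
)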

Spark Scala

val spark_df = Seq(
  ("101", "Data Engineering", "200"),
  ("102", "Front-End", "300"),
  ("103", "Infrastructure", "100"),
  ("103", "Software", "200"),
  ("103", "Commum", "800"),
  ("104", "Data Science", "500")
).toDF("id_sales", "team", "price")

Thanks :)

【Question discussion】:

    Tags: python scala dataframe apache-spark


    【Solution 1】:

    Try this:

    scala> val df = Seq(
         |   ("101", "Data Engineering", "200"),
         |   ("102", "Front-End", "300"),
         |   ("103", "Infrastructure", "100"),
         |   ("103", "Software", "200"),
         |   ("103", "Common", "800"),
         |   ("104", "Data Science", "500")
         | ).toDF("id_sales", "team", "price")
    df: org.apache.spark.sql.DataFrame = [id_sales: string, team: string ... 1 more field]
    
    scala> df.show
    +--------+----------------+-----+
    |id_sales|            team|price|
    +--------+----------------+-----+
    |     101|Data Engineering|  200|
    |     102|       Front-End|  300|
    |     103|  Infrastructure|  100|
    |     103|        Software|  200|
    |     103|          Common|  800|
    |     104|    Data Science|  500|
    +--------+----------------+-----+
    
    
    scala> val commonDF = df.filter("team='Common'")
    commonDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_sales: string, team: string ... 1 more field]
    
    scala> import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.expressions.Window
    
    scala> val ww = Window.partitionBy("id_sales")
    ww: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@43324745
    
    scala> val finalDF = df.as("main").filter("team<>'Common'").withColumn("weight",col("price")/sum("price").over(ww)).join(commonDF.as("common"), Seq("id_sales"),"left").withColumn("updated_price",when(col("common.price").isNull,df("price")).otherwise(df("price")+col("weight")*col("common.price"))).select($"id_sales",$"main.team",$"updated_price".as("price"))
    finalDF: org.apache.spark.sql.DataFrame = [id_sales: string, team: string ... 1 more field]
    
    scala> finalDF.show
    +--------+----------------+------------------+
    |id_sales|            team|             price|
    +--------+----------------+------------------+
    |     101|Data Engineering|               200|
    |     104|    Data Science|               500|
    |     102|       Front-End|               300|
    |     103|        Software| 733.3333333333333|
    |     103|  Infrastructure|366.66666666666663|
    +--------+----------------+------------------+
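
The one-liner above does three things: it attaches to every non-Common row the per-sale total of the non-Common prices (the window sum), turns that into a weight, and then left-joins the Common row so that each team receives weight * Common. A Spark-free sketch of the same steps in plain Python may make them easier to follow. It is an illustration only, not the answer's code: `distribute_common` and `ROWS` are hypothetical names, prices are numbers rather than strings, and it assumes at most one Common row per sale and a non-zero total for the other teams.

```python
from collections import defaultdict

# Sample rows from the answer: (id_sales, team, price).
# Prices are numeric here for simplicity (the Spark example uses strings).
ROWS = [
    ("101", "Data Engineering", 200),
    ("102", "Front-End", 300),
    ("103", "Infrastructure", 100),
    ("103", "Software", 200),
    ("103", "Common", 800),
    ("104", "Data Science", 500),
]


def distribute_common(rows, common_team="Common"):
    """Spread each sale's Common price over its other teams, weighted by price.

    Assumes at most one Common row per sale and a non-zero total
    for the remaining teams of a sale that has a Common row.
    """
    # Step 1: split off the Common rows (the answer's commonDF).
    common = {sale: price for sale, team, price in rows if team == common_team}
    teams = [(sale, team, price) for sale, team, price in rows if team != common_team]

    # Step 2: per-sale total of the non-Common prices, i.e. what
    # sum("price").over(Window.partitionBy("id_sales")) attaches to each row.
    totals = defaultdict(float)
    for sale, _, price in teams:
        totals[sale] += price

    # Step 3: weight = price / total; add weight * Common only when the sale
    # has a Common row (the left join plus when/otherwise in the answer).
    result = []
    for sale, team, price in teams:
        weight = price / totals[sale]
        extra = weight * common[sale] if sale in common else 0.0
        result.append((sale, team, price + extra))
    return result


for row in distribute_common(ROWS):
    print(row)
```

A window fits step 2 because it adds the per-sale total to every row while keeping one row per team; a `groupBy("id_sales").agg(sum("price"))` would collapse the rows to one per sale and need an extra join to bring the total back.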
    

    【Discussion】:

    • It worked!!!! Thank you so much!!!! Just wondering: why did you use Window? I've never used it. Does it perform better?