【问题标题】:Spark - Iterating through all rows in dataframe comparing multiple columns for each row against anotherSpark - 遍历数据框中的所有行,将每一行的多列与另一行进行比较
【发布时间】:2017-07-31 15:20:09
【问题描述】:
| match_id | player_id | team | win |
|    0     |      1    |   A  |  A  |
|    0     |      2    |   A  |  A  |
|    0     |      3    |   B  |  A  |
|    0     |      4    |   B  |  A  |
|    1     |      1    |   A  |  B  |
|    1     |      4    |   A  |  B  |
|    1     |      8    |   B  |  B  |
|    1     |      9    |   B  |  B  |
|    2     |      8    |   A  |  A  |
|    2     |      4    |   A  |  A  |
|    2     |      3    |   B  |  A  |
|    2     |      2    |   B  |  A  |

我有一个如上所示的数据框。

我需要创建一个映射(键,值)对,以便为每个

(k=>(player_id_1, player_id_2), v=> 1 ), 如果 player_id_1 在比赛中战胜 player_id_2

(k=>(player_id_1, player_id_2), v=> 0 ),如果 player_id_1 在比赛中输给 player_id_2

因此,我将不得不遍历整个数据框,根据其他 3 列将每个玩家 id 与另一个进行比较。

我计划如下实现。

  1. 按 match_id 分组

  2. 在每个组中为一个 player_id 检查其他 player_id 的以下内容

    一个。如果 match_id 相同而团队不同 那么

         if team =  win
           (k=>(player_id_1, player_id_2), v=> 0 )
         else team != win
           (k=>(player_id_1, player_id_2), v=> 1 )
    

例如,在按匹配分区后考虑匹配 1。 player_id 1 需要与 player_id 2,3 和 4 进行比较。 迭代时,将跳过 player_id 2 的记录,因为团队相同 对于 player_id 3,因为团队不同,将比较团队和胜利。 由于 player_id 1 在 A 队,而 player_id 3 在 B 队,A 队获胜,形成的键值将是

((1,3),1)

我对如何在命令式编程中实现这一点有一个很好的想法,但我对 scala 和函数式编程真的很陌生,并且不知道如何在遍历字段的每一行时创建一个(键,值) 通过检查其他字段来配对。

我已尽力解释问题。如果我的问题的任何部分不清楚,请告诉我。我很乐意解释一下。谢谢你。

PS:我使用的是 Spark 1.6

【问题讨论】:

  • 您生成((1,3),1) 的情况也会生成((2,4),1) 还是您只想完全跳过第二个玩家ID?
  • 这只是一个内部或自连接。
  • @philantrovert 是的,我也需要 ((2,4),1)。这是对所有 player_id 进行的。
  • @RamGhadiyaram 谢谢。我使用了 rogue-one 的解决方案,使用了提到的内部连接和条件并让它工作。

标签: scala apache-spark spark-dataframe


【解决方案1】:

这可以使用 DataFrame API 来实现,如下所示..

Dataframe API 版本

val df = Seq((0,1,"A","A"),(0,2,"A","A"),(0,3,"B","A"),(0,4,"B","A"),(1,1,"A","B"),(1,4,"A","B"),(1,8,"B","B"),(1,9,"B","B"),(2,8,"A","A"),(2,4,"A","A"),(2,3,"B","A"),(2,2,"B","A")
).toDF("match_id", "player_id", "team", "win")

val result = df.alias("left")
       .join(df.alias("right"), $"left.match_id" === $"right.match_id" && not($"right.team" === $"left.team"))
       .select($"left.player_id", $"right.player_id", when($"left.team" === $"left.win", 1).otherwise(0).alias("flag"))

scala> result.collect().map(x => (x.getInt(0),x.getInt(1)) -> x.getInt(2)).toMap
res4: scala.collection.immutable.Map[(Int, Int),Int] = Map((1,8) -> 0, (3,4) -> 0, (3,1) -> 0, (9,1) -> 1, (4,1) -> 0, (8,1) -> 1, (2,8) -> 0, (8,3) -> 1, (1,9) -> 0, (1,4) -> 1, (8,2) -> 1, (4,9) -> 0, (3,2) -> 0, (1,3) -> 1, (4,8) -> 0, (4,2) -> 1, (2,4) -> 1, (8,4) -> 1, (2,3) -> 1, (4,3) -> 1, (9,4) -> 1, (3,8) -> 0)

SPARK SQL 版本

df.registerTempTable("data_table")

val result = sqlContext.sql("""
SELECT DISTINCT t0.player_id, t1.player_id, CASE WHEN t0.team == t0.win THEN 1 ELSE 0 END AS flag FROM data_table t0
INNER JOIN data_table t1
ON t0.match_id = t1.match_id
AND t0.team != t1.team
""")

【讨论】:

  • 感谢@rogue-one 提供这两种解决方案。也许我的思维方式过于复杂,而您使解决方案看起来更简洁、更容易。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-05-29
  • 1970-01-01
  • 1970-01-01
  • 2020-08-13
  • 2020-08-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多