【问题标题】:How to do this transformation in SQL/Spark/GraphFrames如何在 SQL/Spark/GraphFrames 中进行这种转换
【发布时间】:2020-03-20 21:30:22
【问题描述】:

我有一个包含以下两列的表格:

Device-Id    Account-Id
d1           a1   
d2           a1
d1           a2
d2           a3
d3           a4
d3           a5 
d4           a6
d1           a4

Device-Id 是安装我的应用的设备的唯一 ID,Account-Id 是用户帐户的 ID。一个用户可以拥有多个设备,并且可以在同一设备上创建多个帐户(例如,d1 设备设置了 a1、a2 和 a3 帐户)。

我想找到唯一的实际用户(应该在生成的表中表示为具有一些唯一 UUID 的新列)并且我正在寻找的转换生成下表:

Unique-User-Id    Devices-Used    Accounts-Used
uuid1             [d1, d2, d3]    [a1, a2, a3, a4, a5]   
uuid2             [d4]            [a6]

上面生成的表格背后的想法是,实际用户 uuid1 在他们的设备 d1 和 d2 上设置了一个帐户 a1,这实质上意味着这两个设备都属于 uuid 1 并且所有其他帐户都设置在这些设备上d1 和 d2 设备也映射到同一个用户 uuid1。同样,d1 也有一个账户 a4,它也在 d3 上设置,所以 d3 也是 uuid1 的设备,它上面的每个账户都应该映射到 uuid1。

如何在 SQL/Spark/GraphFrames(由 DataBricks 提供)中实现上述转换,其中 Device-Ids 和 Account-Ids 都可以是数百万

【问题讨论】:

  • 查找哪个设备和哪个账号属于哪个用户的逻辑是什么?
  • 你的 spark 版本是什么,2.4+ 或更低?每个设备只能有一个所有者(user_id),这是逻辑吗?
  • @pissall 上下文是一个帐户(例如 a1)将只属于一个用户(但反过来不正确,一个用户可能有多个帐户),现在因为可以配置同一个帐户在多个设备上,所有这些设备以及在这些设备上设置的所有帐户都将映射到单个用户(注意:用户!= 帐户)。组合两个设备背后的逻辑是它们应该共享一个在其上设置的公共帐户。
  • @jxc 2.4+;不,实际用户将拥有许多设备(Android、iOS、平板电脑等),并且会在其上注册许多帐户。然而,一个帐户只会指向一个用户,我需要找到唯一的用户。示例:Ben 有两个设备 d1 和 d2,并在这两个设备上分别设置了 a1、a2 和 a2、a3。现在,a1 将对应一个唯一用户,我们称之为 uuid1;我需要一些转换来确定由于 a1 在 d1 上,所以 d1 上的所有帐户也映射到 uuid1,并且由于 a2,作为 uuid1 的帐户也在 d2 上,d2 上的每个帐户也应该映射到 uuid1。
  • @AmanGill,所以我实际上是对的。正如您提到的all accounts on d1 also maps to uuid1,同一设备上的帐户应属于同一用户(或 uuid),这就是您将 d2 链接到 d1 的方式。

标签: sql apache-spark pyspark spark-graphx graphframes


【解决方案1】:

我对这个解决方案并不感到自豪,因为我认为可能有一个更有效的解决方案,但无论如何我都会把它留在这里。希望对你有帮助

import org.apache.spark.sql.functions._

val flatten_distinct = (array_distinct _) compose (flatten _)

val df = Seq(
  ("d1","a1"),  
  ("d2","a1"),
  ("d1","a2"),
  ("d2","a3"),
  ("d3","a4"),
  ("d3","a5"),
  ("d4","a6")
).toDF("d_id","u_id")


val userDevices = df
  .groupBy("u_id")
  .agg(collect_list("d_id").alias("d_id_list"))

//+----+---------+
//|u_id|d_id_list|
//+----+---------+
//|  a5|     [d3]|
//|  a3|     [d2]|
//|  a4|     [d3]|
//|  a2|     [d1]|
//|  a1| [d1, d2]|
//|  a6|     [d4]|
//+----+---------+


val accountsByDevice = df
  .groupBy("d_id")
  .agg(collect_list("u_id").alias("u_id_list"))

//+----+---------+
//|d_id|u_id_list|
//+----+---------+
//|  d2| [a3, a1]|
//|  d3| [a4, a5]|
//|  d1| [a1, a2]|
//|  d4|     [a6]|
//+----+---------+


val ungroupedDf = userDevices
  .join(accountsByDevice, expr("array_contains(d_id_list,d_id)"))
  .groupBy("d_id_list")
  .agg(collect_set("u_id_list") as "set")
  .select(col("d_id_list") as "d_id", flatten_distinct(col("set")) as "u_id")
  .select(explode(col("d_id")) as "d_id", col("u_id"), size(col("u_id")) as "size")

//+----+------------+----+
//|d_id|        u_id|size|
//+----+------------+----+
//|  d2|    [a1, a3]|   2|
//|  d1|[a1, a3, a2]|   3|
//|  d2|[a1, a3, a2]|   3|
//|  d3|    [a4, a5]|   2|
//|  d1|    [a1, a2]|   2|
//|  d4|        [a6]|   1|
//+----+------------+----+


val finalDf = ungroupedDf
  .join(ungroupedDf.groupBy("d_id").agg(max(col("size")) as "size"), Seq("size","d_id"))
  .groupBy("u_id")
  .agg(collect_set("d_id") as "d_id")
  .withColumn("unique_id", monotonically_increasing_id())

//+------------+--------+-------------+
//|        u_id|    d_id|    unique_id|
//+------------+--------+-------------+
//|[a1, a2, a3]|[d1, d2]|1228360646656|
//|    [a4, a5]|    [d3]|1297080123392|
//|        [a6]|    [d4]|1520418422784|
//+------------+--------+-------------+

【讨论】:

  • 我也想到了一些非常相似的东西,我也不为此感到自豪。我会再等几天看看是否有其他问题出现,如果没有,我会将其标记为解决方案。
【解决方案2】:

你可以试试GraphFrame.connectedComponents,给所有Device-IDs加上前缀,这样就可以在后处理步骤中从Account-IDs中拆分出来:

from graphframes import GraphFrame
from pyspark.sql.functions import collect_set, expr

df = spark.createDataFrame([
         ("d1","a1"), ("d2","a1"), ("d1","a2"), ("d1","a4"),
         ("d2","a3"), ("d3","a4"), ("d3","a5"), ("d4","a6")  
], ["Device-Id","Account-Id"])

# set checkpoint which is required for Graphframe
spark.sparkContext.setCheckpointDir("/tmp/111")

# for testing purpose, set a small shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 2)

# set up edges and vertices, add an underscore as prefix of Device-ID
edges = df.withColumn('Device-Id', expr('concat("_", `Device-Id`)')).toDF('src', 'dst')
vertices = edges.selectExpr('src as id').distinct().union(edges.select('dst').distinct())

# set up the graph
g = GraphFrame(vertices, edges)

# compute the connected components and group resultset by component
# and collect corresponding ids using collect_set(id)
df1 = g.connectedComponents().groupby('component').agg(collect_set('id').alias('ids'))
df1.show(truncate=False)
+------------+-----------------------------------+
|component   |ids                                |
+------------+-----------------------------------+
|309237645312|[a6, _d4]                          |
|85899345920 |[_d1, a4, a1, _d3, a3, a5, a2, _d2]|
+------------+-----------------------------------+

# split the ids based on the prefix we predefined when creating edges.
df1.selectExpr(
      'transform(filter(ids, x -> left(x,1) = "_"), y -> substr(y,2)) AS `Devices-Used`'
    , 'filter(ids, x -> left(x,1) != "_") AS `Accounts-Used`'
    , 'component AS `Unique-User-Id`'
).show()
+------------+--------------------+--------------+
|Devices-Used|       Accounts-Used|Unique-User-Id|
+------------+--------------------+--------------+
|[d1, d3, d2]|[a4, a1, a3, a5, a2]|   85899345920|
|        [d4]|                [a6]|  309237645312|
+------------+--------------------+--------------+

编辑: 上述方法在创建不必要的大边/顶点列表时效率较低,使用自连接创建边列表应该是更好的选择(受此 post 启发):

edges = df.alias('d1').join(df.alias('d2'), ["Account-Id"]) \
    .filter("d1.`Device-Id` > d2.`Device-Id`") \
    .toDF("account", "src", "dst")
+-------+---+---+
|account|src|dst|
+-------+---+---+
|     a1| d2| d1|
|     a4| d3| d1|
+-------+---+---+

vertices = df.selectExpr('`Device-Id` as id', "`Account-Id` as acct_id")
g = GraphFrame(vertices, edges)

df1 = g.connectedComponents() \
    .groupby('component') \
    .agg(
       collect_set('id').alias('Device-Ids'),
       collect_set('acct_id').alias('Account-Ids')
     )
+---------+------------+--------------------+
|component|  Device-Ids|         Account-Ids|
+---------+------------+--------------------+
|        0|[d1, d2, d3]|[a4, a1, a3, a5, a2]|
|        1|        [d4]|                [a6]|
+---------+------------+--------------------+

【讨论】:

  • 这看起来很有趣,但是在您附加的最后一个表中(包括 (d1, a4)),前两行应该合并为 1 即。 ((a1, a2, a3, a4, a5) (d1, d2, d3))
  • @AmanGill 这就是我认为您的示例过于简单的原因,我建议您在现有示例中添加一个更一般的案例("d1", "a4") 作为旁注,这样人们就不会感到困惑。
  • @AmanGill,作为旁注,两个答案都有相同的问题。当人们对真正的问题没有全面了解时,如果你坚持简单的例子,我怀疑你是否能得到任何有用的答案。
  • 编辑了问题。感谢您的建议。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-02-23
  • 2016-09-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-20
相关资源
最近更新 更多