【问题标题】:Finding all pairs of users that have a certain amount of common values查找具有一定数量共同值的所有用户对
【发布时间】:2019-04-01 22:44:22
【问题描述】:

我是 spark 新手,我正在尝试查找有关已转换为两个单独 DataFrame 的几个数据列表的具体信息。

这两个DataFrame是:

Users:                item_Details:
user_id | item_id     item_id | item_name
-----------------     ----------------------
  1     | 123           123   |  phone
  2     | 223           223   |  game
  3     | 423           423   |  foo
  2     | 1223          1223  |  bar
  1     | 3213          3213  | foobar

我需要找到所有拥有超过 50 个共同项目的用户对,并按项目数排序。不能有重复,这意味着应该只有一组 userId 1 和 userId 2。

结果需要如下所示:

user_id1 | user_id2 | count_of_items | list_of_items
-------------------------------------------------------------
    1    |     2    |       51       |  phone,foo,bar,foobar

【问题讨论】:

    标签: scala dataframe hive apache-spark-sql hiveql


    【解决方案1】:

    这是一种方法:

    1. 通过自加入为每个不同的用户对组装 item pairs
    2. 使用 UDF 从 item pairs 生成 common items
    3. 按特定的常见项目计数过滤结果数据集

    如下图:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Row
    
    val users = Seq(
      (1, 123), (1, 223), (1, 423),
      (2, 123), (2, 423), (2, 1223), (2, 3213),
      (3, 223), (3, 423), (3, 1223), (3, 3213),
      (4, 123), (4, 1223), (4, 3213)
    ).toDF("user_id", "item_id")
    
    val item_details = Seq(
      (123, "phone"), (223, "game"), (423, "foo"), (1223, "bar"), (3213, "foobar")
    )toDF("item_id", "item_name")
    
    val commonItems = udf( (itemPairs: Seq[Row]) =>
      itemPairs.collect{ case Row(a: Int, b: Int) if a == b => a }
    )
    
    val commonLimit = 2  // Replace this with any specific common item count
    
    val user_common_items =
      users.as("u1").join(users.as("u2"), $"u1.user_id" < $"u2.user_id").
      groupBy($"u1.user_id", $"u2.user_id").agg(
        collect_set(
          struct($"u1.item_id".as("ui1"), $"u2.item_id".as("ui2"))
        ).as("item_pairs")).
      withColumn("common_items", commonItems($"item_pairs")).
      drop("item_pairs").
      where(size($"common_items") > commonLimit)
    
    user_common_items.show(false)
    // +-------+-------+-----------------+
    // |user_id|user_id|common_items     |
    // +-------+-------+-----------------+
    // |2      |3      |[423, 3213, 1223]|
    // |2      |4      |[3213, 123, 1223]|
    // +-------+-------+-----------------+
    

    如果需要通用项目名称而不是项目ID,您可以在上述步骤中加入item_details以聚合项目名称;或者,您可以分解现有的 common item ids 并通过用户对加入 item_details 以及 collect_list 聚合:

    user_common_items.
      withColumn("item_id", explode($"common_items")).
      join(item_details, Seq("item_id")).
      groupBy($"u1.user_id", $"u2.user_id").agg(collect_list($"item_name").as("common_items")).
      withColumn("item_count", size($"common_items")).
      show
    // +-------+-------+--------------------+----------+
    // |user_id|user_id|        common_items|item_count|
    // +-------+-------+--------------------+----------+
    // |      2|      3|  [foo, foobar, bar]|         3|
    // |      2|      4|[foobar, phone, bar]|         3|
    // +-------+-------+--------------------+----------+
    

    【讨论】:

    • 效果很好。一个问题。我将如何添加一个列,其中包含两个用户拥有的 common_items 总数以及其他三部分信息?
    • @AntarianCoder,您可以为生成的 common-items 列表的size 添加一列,如扩展答案所示。
    • 我还有一个问题,那就是为什么使用左外连接而不是内连接?
    • 它旨在涵盖users中的项目ID不在item_details中的情况,但由于collect_list会自动删除空值,因此使用left几乎没有什么好处加入。我已将其删除。
    【解决方案2】:

    另一种解决方案,不使用 UDF。由于我们需要公共项目,因此可以在 joinExprs 本身中给出匹配。看看这个

    val users = Seq(
      (1, 123), (1, 223), (1, 423),
      (2, 123), (2, 423), (2, 1223), (2, 3213),
      (3, 223), (3, 423), (3, 1223), (3, 3213),
      (4, 123), (4, 1223), (4, 3213)
    ).toDF("user_id", "item_id")
    
    val items = Seq(
      (123, "phone"), (223, "game"), (423, "foo"), (1223, "bar"), (3213, "foobar")
    )toDF("item_id", "item_name")
    
    val common_items =
      users.as("t1").join(users.as("t2"),$"t1.user_id" < $"t2.user_id" and $"t1.item_id" === $"t2.item_id" )
          .join(items.as("it"),$"t1.item_id"===$"it.item_id","inner")
          .groupBy($"t1.user_id",$"t2.user_id")
          .agg(collect_set('item_name).as("items"))
          .filter(size('items)>2) // change here for count
          .withColumn("size",size('items))
    
    common_items.show(false)
    

    结果

    +-------+-------+--------------------+----+
    |user_id|user_id|items               |size|
    +-------+-------+--------------------+----+
    |2      |3      |[bar, foo, foobar]  |3   |
    |2      |4      |[bar, foobar, phone]|3   |
    +-------+-------+--------------------+----+
    

    【讨论】:

    • 谢谢!这与我开始思考的思路一致,但我被 agg 子句卡住了,我无法做到恰到好处。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-26
    • 1970-01-01
    • 1970-01-01
    • 2014-03-30
    • 2021-05-21
    • 1970-01-01
    相关资源
    最近更新 更多