【问题标题】:pyspark: what is the best way to select n distinct IDs from a datasetpyspark:从数据集中选择 n 个不同 ID 的最佳方法是什么
【发布时间】:2020-07-18 12:52:52
【问题描述】:

pyspark中有一个DataFrame,数据如下:

    id manager score
    A     x     3
    A     y     1
    B     a     2
    B     b     5
    C     f     2
    D     f     6

我期望结果数据集中恰好有 n 个 ID。

例如。如果我说需要 3 个 ID,那么生成的数据集将如下所示:

id manager score
 A   x      3
 A   y      1
 B   a      2
 B   b      5
 C   f      2

所以这个数据集有 5 行,但正如预期的那样正好有 3 个 ID。 如果我执行 df.limit(3) 它将仅是前 3 条记录,因为 ID 重复,我将获得少于 3 个 ID,即在这种情况下只有 2 个。如何进行?谢谢

【问题讨论】:

    标签: python apache-spark pyspark pyspark-dataframes


    【解决方案1】:

    您可以使用 spark sql 查询来执行此操作。

    只需更改子查询中的limit 子句值以选择不同id 的数量。

    df=spark.createDataFrame([("A", "x", "3"), ("A", "y", "1"), ("B", "a", "2"), ("B", "b", "5"), ("C", "v", "2"), ("D", "f", "6")], ["id", "manager", "score"])
    
    +---+-------+-----+
    | id|manager|score|
    +---+-------+-----+
    |  A|      x|    3|
    |  A|      y|    1|
    |  B|      a|    2|
    |  B|      b|    5|
    |  C|      v|    2|
    |  D|      f|    6|
    +---+-------+-----+
    
    df.createOrReplaceTempView("employee")
    
    sql("""select * from employee e1 
         inner join (  
            select distinct id as uni_id from employee order by uni_id limit 3) e2 
         on e1.id=e2.uni_id""").drop("uni_id").show() 
    
    +---+-------+-----+
    | id|manager|score|
    +---+-------+-----+
    |  A|      x|    3|
    |  A|      y|    1|
    |  B|      a|    2|
    |  B|      b|    5|
    |  C|      v|    2|
    +---+-------+-----+
    

    【讨论】:

    • 至于结果输出,这个答案很完美,谢谢!但是,我将不得不使用 pyspark 代码。该逻辑帮助我在 pyspark 中对其进行编码。谢谢。
    • @Daenerys,如果这个答案对您有帮助,请随时投票!
    • @suresiva,您可以使用where id in (select distinct id ...limit 3) 避免加入。请在下面查看我的答案。
    【解决方案2】:

    您也可以尝试StringIndexer,它将id列中的每个字符串分解,然后根据限制分解filter

    import pyspark.sql.functions as F
    from pyspark.ml.feature import StringIndexer
    
    n = 3 #change as per limit
    idx = StringIndexer(inputCol="id",outputCol="id_num")
    idx.fit(df).transform(df).filter(F.col("id_num")<n).drop("id_num").show()
    

    +---+-------+-----+
    | id|manager|score|
    +---+-------+-----+
    |  A|      x|    3|
    |  A|      y|    1|
    |  B|      a|    2|
    |  B|      b|    5|
    |  C|      f|    2|
    +---+-------+-----+
    

    【讨论】:

    • 我的 n 是 1000。我试过这个,最后当我检查结果时,df.select("id").distinct().count() 我应该得到的结果是 1000,但得到了 54k。不知道为什么逻辑对我不起作用。另外,我觉得对于这个要求来说这是一个昂贵的操作。
    • 奇怪,你保存到新的 df 了吗? df1 = idx.fit(df).transform(df).filter(F.col("id_num")&lt;n).drop("id_num") 然后做 df1.select("id").distinct().count() ? @丹妮莉丝
    • 是的,你完全正确,它对我有用!早些时候,我检查了一个缓存的结果,所以给了我一个不同的计数,我的错。谢谢!!但是,现在,我更喜欢在不同的 id 列表中使用内部连接。非常感谢,在这里学到了一些新东西。
    【解决方案3】:

    这是使用 'colllect_set' 函数和一些 Python 操作的简单方法:

    idLimit=3 #define your limit
    
    id_lst=(sourceDF  #collect a list of distinct ids
            .select(collect_set('id'))
            .collect()[0][0]
           )
    
    id_lst.sort() #sort the ids alphabatically
    
    id_lst_limited=id_lst[:idLimit] #limit the list as per your defined limit
    
    targetDF=(sourceDF #filter the source df using your limited list
              .filter("id in ({0})".format(str(id_lst_limited)[1:-1]))
             )
    

    【讨论】:

    • 随时搭档!如果对您有帮助,请随时支持我的回答!
    【解决方案4】:

    您可以通过 where id in (select distinct id ...limit 3) 避免 join,如下所示-

     val df = Seq(("A", "x", "3"), ("A", "y", "1"), ("B", "a", "2"), ("B", "b", "5"), ("C", "v", "2"), ("D", "f", "6"))
        .toDF("id", "manager", "score")
        df.show(false)
    
        /**
          * +---+-------+-----+
          * |id |manager|score|
          * +---+-------+-----+
          * |A  |x      |3    |
          * |A  |y      |1    |
          * |B  |a      |2    |
          * |B  |b      |5    |
          * |C  |v      |2    |
          * |D  |f      |6    |
          * +---+-------+-----+
          */
    
        df.createOrReplaceTempView("employee")
        spark.sql("select * from employee where id in (select distinct id from employee order by id limit 3)")
          .show(false)
    
        /**
          * +---+-------+-----+
          * |id |manager|score|
          * +---+-------+-----+
          * |A  |x      |3    |
          * |A  |y      |1    |
          * |B  |a      |2    |
          * |B  |b      |5    |
          * |C  |v      |2    |
          * +---+-------+-----+
          */
    

    【讨论】:

    • @Daenerys 看看这个。
    【解决方案5】:

    我注意到上面的答案之一是基于 Spark SQL。

    这是另一种基于 Spark SQL 的方法,但带有 WINDOW 子句 -

    sql("select id, manager, score from (select e1.id, e1.manager, e1.score, dense_rank() over (order by e1.id) as rrank from employee e1) where rrank <= 3").show()
    

    完整代码-

    df=spark.createDataFrame([("A", "x", "3"), ("A", "y", "1"), ("B", "a", "2"), ("B", "b", "5"), ("C", "v", "2"), ("D", "f", "6")], ["id", "manager", "score"])
    
    +---+-------+-----+
    | id|manager|score|
    +---+-------+-----+
    |  A|      x|    3|
    |  A|      y|    1|
    |  B|      a|    2|
    |  B|      b|    5|
    |  C|      v|    2|
    |  D|      f|    6|
    +---+-------+-----+
    
    df.createOrReplaceTempView("employee")
    
    
    sql("select id, manager, score from (select e1.id, e1.manager, e1.score, dense_rank() over (order by e1.id) as rrank from employee e1) where rrank <= 3").show()
    
    +---+-------+-----+
    | id|manager|score|
    +---+-------+-----+
    |  A|      x|    3|
    |  A|      y|    1|
    |  B|      a|    2|
    |  B|      b|    5|
    |  C|      v|    2|
    +---+-------+-----+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-09-02
      • 1970-01-01
      • 2011-01-23
      • 2022-08-13
      • 2020-12-05
      • 1970-01-01
      • 2010-09-26
      相关资源
      最近更新 更多