【问题标题】:RDD sort after grouping and summing分组求和后的RDD排序
【发布时间】:2016-05-04 17:35:11
【问题描述】:

我正在尝试对一些 yelp 数据进行一些分析。数据结构如下:

>>> yelp_df.printSchema()
root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: string (nullable = true)
 |-- open: boolean (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- state: string (nullable = true)

我想统计每个州的记录,包括 10 条或更多评论的整体,这些评论当前处于打开状态,并找到计数第三高的州。首先我做了

>>> revDF = yelp_df.filter(yelp_df.review_count > 9)
>>> openDF = revDF.filter(revDF.open == True)
>>> openDF.groupBy("state").agg({"review_count":"sum"}).collect()

这给了这个

[Row(state=u'MN', SUM(review_count#16)=3470), Row(state=u'GA', SUM(review_count#16)=5764), Row(state=u'TX', SUM(review_count#16)=1778), Row(state=u'AZ', SUM(review_count#16)=72214), Row(state=u'NY', SUM(review_count#16)=4081), Row(state=u'OR', SUM(review_count#16)=2125), Row(state=u'ID', SUM(review_count#16)=429), Row(state=u'CA', SUM(review_count#16)=1876), Row(state=u'CO', SUM(review_count#16)=6720), Row(state=u'WA', SUM(review_count#16)=525), Row(state=u'LA', SUM(review_count#16)=8394)]

现在将其存储到 summedDF 后,

summedDF.sort(summedDF.state.desc()).collect()

按状态排序就好了,但是(不出所料)

summedDF.sort(summedDF.SUM(review_count#16).desc()).collect()

不起作用。 实际上,它甚至没有运行。我有正确数量的括号,但它没有执行,而是转到 ... 之前的下一行,等待新的输入。

我该如何进行这种排序,不执行是怎么回事? #16 怎么了?

【问题讨论】:

    标签: apache-spark pyspark pyspark-sql


    【解决方案1】:

    编辑:为 pyspark 添加了版本。

    我建议您将代码重构为:

    val finalDF = yelp_df
      .where(col("review_count") > 9 && col("open") === true)
      .groupBy("state")
      .agg(sum("review_count").as("sum_column"))
      .sort(col("sum_column").desc)
    

    也许我们可以适应 pyspark:

    from pyspark.sql.functions import *
    finalDF = yelp_df \
        .where((col("review_count") > 9) & (col("open") == True)) \
        .groupBy("state") \
        .agg(col("state"), sum(col("review_count")).alias("sum_column")) \
        .sort(col("sum_column").desc())
    

    现在回答你的问题:

    不执行是怎么回事? #16 是怎么回事?

    简而言之,您尝试使用 summedDF.SUM(review_count#16) 引用该列没有成功。

    sort 函数使用Column 对象(可以通过调用col("name") 创建),或者直接使用列的名称。但是,当您进行聚合时,您没有为表示总和的新列选择名称,因此以后引用它有点困难。为了解决这个问题,我在第四行使用了.as("sum_column")

    【讨论】:

    • 抱歉,这是 pyspark,而不仅仅是 spark。因此,>>> 因此,“val”以及此处的其他命令无法识别
    • 对不起,我没有注意到这个细节。但是,逻辑应该是相同的。我相信 python 也可以使用相同的功能。
    • @hedgedandlevered 我尝试为 pyspark 插入一个改编版本,请让我知道它是否有效。抱歉,我不太习惯 pyspark。
    • 我正在像这样pyspark --packages com.databricks:spark-csv_2.11:1.4.0 那样从命令行启动我的会话。那是pyspark,对吧?我没有让你的代码执行。它再次执行...,就像它不完整一样。我正在尝试finalDF = yelp_df.where("review_count > 9 AND open = true").groupBy("state").agg(F.sum(col("review_count").alias("sum_column")).sort(col("sum_column").desc)
    • 您是否在通话之间插入换行符?如果是这样,您必须以 \ 结束该行
    猜你喜欢
    • 1970-01-01
    • 2016-06-03
    • 1970-01-01
    • 2020-02-21
    • 1970-01-01
    • 2019-03-18
    • 1970-01-01
    • 2015-12-26
    • 1970-01-01
    相关资源
    最近更新 更多