【问题标题】:value toDF is not a member of org.apache.spark.rdd.RDD[(K, V)]值 toDF 不是 org.apache.spark.rdd.RDD[(K, V)] 的成员
【发布时间】:2020-01-20 06:08:31
【问题描述】:

注意:以下问题的答案中给出的建议无效 value toDF is not a member of org.apache.spark.rdd.RDD value toDF is not a member of org.apache.spark.rdd.RDD[Weather]

我正在尝试编写一个通用函数,它只保留给定数据集中每个键的前 k 个值:

下面是代码:

def topKReduceByKey[K:ClassTag,V:Ordering](ds: Dataset[(K, V)], k: Int): Dataset[(K, V)] = {
    import sqlContext.implicits._
    ds
      .rdd
      .map(tuple => (tuple._1, Seq(tuple._2)))
      .reduceByKey((x, y) => (x ++ y).sorted(Ordering[V].reverse).take(k))
      .flatMap(tuple => tuple._2.map(v => (tuple._1, v)))
      .toDF("key", "value")
      .as[(K, V)]
  }

在运行时,我收到以下错误消息:

Error:(43, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(K, V)]
possible cause: maybe a semicolon is missing before `value toDF'?
      .toDF("key", "value")

谁能帮我理解这里出了什么问题?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    有多种方法可以做到这一点(分组、分区、迭代分区),但只有在您喜欢自定义分区时才应该使用 RDD,对于其他任何情况,您都应该使用 Dataframe 或 Datasets。

    我将提供一个 Python 版本,使用 Dataframes。对于带有数据集的 Scala 来说,这应该是一个很好的开始示例(API 相同)。

    def topKByColumn(df, group_column, ordering_column, k):
        window = Window.partitionBy(df[group_column]).orderBy(df[ordering_column].desc())
        top_k = df.withColumn('rank', row_number().over(window))
        top_k = top_per[top_k.rank <= k]
        return top_k
    

    【讨论】:

    • 我知道这种方法,但在我的情况下,按键分区不是一个好主意,因为某些键可能更偏斜。我发现按键减少是更优化的方法性能明智,如果有一些我会很高兴可以在转换为 df 的最后一步提出错误的解决方案。
    • 在继续使用这个解决方案之前,我只想强调一个事实,即当最终结果与被归约的类型相同时,应该使用 reduceByKey。在您的情况下,最终结果是一个列表。您应该使用 combineByKey。您也正在使用排序。这会将整个列表放入 reducers 内存中,因此您最终可能会在该 reducer 上耗尽内存(如果您的数据有偏差,那么这种情况的机会很大)。
    猜你喜欢
    • 1970-01-01
    • 2016-02-15
    • 1970-01-01
    • 1970-01-01
    • 2023-03-18
    • 1970-01-01
    • 1970-01-01
    • 2017-11-19
    相关资源
    最近更新 更多