【发布时间】:2018-07-31 09:26:13
【问题描述】:
我正在开发一个 Apache-Spark 项目。我有一个亚马逊产品评论数据集。每个元素都有诸如 userId、productId、score、乐于助人之类的字段——我认为与我的问题并不真正相关。
首先我必须创建一个 RDD,其中包含相对于特定 productId 的元组;尤其是最终的帮助不仅仅是用户在该评论中获得的帮助,还包括其他用户的平均帮助。
然后我想计算所有产品对每个用户的平均最终有用性。计算相对于单个产品的结果的函数是pageRankOneMovie。我虽然解决方案是在 productId 的集合上使用 flatMap,就像这样
val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)
但是我遇到了错误 SPARK-5063,因为通过在 flatMap 中调用 pageRankOneMovie 我正在嵌套转换。
我研究了一些关于广播变量和累加器的知识,我认为我可以构建一些有用的东西;但是我想知道是否有针对我的问题的特定解决方案,因为它对我来说看起来非常简单:我需要以编程方式创建一系列 RDD,然后将它们合并在一起。
作为参考,这是我正在尝试运行的程序(编译正常,出现 5063 运行时错误):
object PageRank {
def pageRankOneMovie(movies : RDD[Movie], productId : String) : RDD[(String, Double)] = {
val helpfulness = userHelpfulness(movies)
.filter { case (_,value) => !value.isEmpty }
.mapValues { _.get}
val average = helpfulnessByScore(movies, productId)
val reviews = movies.filter(_.productId == productId).map( mov => (mov.userId, mov.score))
val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }
reviewHelpfulness.join(average).map {
case (score, ((id, help), averageHelpfulness)) =>
(id, if (help < averageHelpfulness) (help+averageHelpfulness)/2 else help)
}
}
def compute(movies: RDD[Movie], context: SparkContext) : RDD[(String, Double)] = {
val moviesProductId = movies.map(_.productId).distinct
val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)
val average = userHelpfulnessRankings
.aggregateByKey((0.0,0)) ((acc, value) => (acc._1+value, acc._2+1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
average.map { case (userId, acc) => (userId, acc._1/acc._2) }
}
}
【问题讨论】:
标签: scala apache-spark dataset rdd