【问题标题】:Merging multiple RDDs合并多个 RDD
【发布时间】:2018-07-31 09:26:13
【问题描述】:

我正在开发一个 Apache-Spark 项目。我有一个亚马逊产品评论数据集。每个元素都有诸如 userId、productId、score、乐于助人之类的字段——我认为与我的问题并不真正相关。

首先我必须创建一个 RDD,其中包含相对于特定 productId 的元组;尤其是最终的帮助不仅仅是用户在该评论中获得的帮助,还包括其他用户的平均帮助。

然后我想计算所有产品对每个用户的平均最终有用性。计算相对于单个产品的结果的函数是pageRankOneMovie。我虽然解决方案是在 productId 的集合上使用 flatMap,就像这样

 val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)

但是我遇到了错误 SPARK-5063,因为通过在 flatMap 中调用 pageRankOneMovie 我正在嵌套转换。

我研究了一些关于广播变量和累加器的知识,我认为我可以构建一些有用的东西;但是我想知道是否有针对我的问题的特定解决方案,因为它对我来说看起来非常简单:我需要以编程方式创建一系列 RDD,然后将它们合并在一起。

作为参考,这是我正在尝试运行的程序(编译正常,出现 5063 运行时错误):

object PageRank {

def pageRankOneMovie(movies : RDD[Movie], productId : String) : RDD[(String, Double)] = {
    val helpfulness = userHelpfulness(movies)
                .filter { case (_,value) => !value.isEmpty }
                .mapValues { _.get}

    val average = helpfulnessByScore(movies, productId)

    val reviews = movies.filter(_.productId == productId).map( mov => (mov.userId, mov.score))
    val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }

    reviewHelpfulness.join(average).map {
        case (score, ((id, help), averageHelpfulness)) =>
            (id, if (help < averageHelpfulness) (help+averageHelpfulness)/2 else help)
    }
}

def compute(movies: RDD[Movie], context: SparkContext) : RDD[(String, Double)] = {
    val moviesProductId = movies.map(_.productId).distinct

    val userHelpfulnessRankings = moviesProductId.flatMap(pageRankOneMovie(movies, _).collect.toList)

    val average = userHelpfulnessRankings
                                .aggregateByKey((0.0,0)) ((acc, value) => (acc._1+value, acc._2+1),
                                                            (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

        average.map { case (userId, acc) => (userId, acc._1/acc._2) }
    }
}

我使用的数据集来自https://snap.stanford.edu/data/web-Movies.html

【问题讨论】:

    标签: scala apache-spark dataset rdd


    【解决方案1】:

    好的,看来这个问题没有通用的解决方案。显然只有两种方法可以解决这种情况:

    1. collect 要么导致 for 循环,然后从那里继续工作,要么
    2. 在单个转换序列中一起计算所有结果。

    由于第一个解决方案需要从工人到司机收集大量数据,因此我选择了第二个想法。

    基本上,我没有从一开始就隔离单个 productId,而是使用 (score, productId) 元组作为键,在进行过程中跟踪多部电影。最终函数如下。

     def pageRankAllMovies(movies : RDD[Movie]) = {
        // Helpfulness media degli utenti
        // (userId, helpfulness (tra 0 e 1))
        val helpfulness = userHelpfulness(movies)
                    .filter { case (_,value) => !value.isEmpty }
                    .mapValues { _.get}
    
        // Helpfulness media delle review per film in base allo score assegnato
        // ((score, productId), helpfulness) per un singolo productId
        val average = helpfulnessByScore(movies)
    
        val reviews = movies.map( mov => (mov.userId, (mov.score, mov.productId)))
        val reviewHelpfulness = reviews.join(helpfulness).map { case (id, (score, help)) => (score, (id, help)) }
    
        // Per ogni "gruppo" di review di uno stesso film che assegnano lo stesso score tiro su
        // la helpfulness degli utenti in base alla media del film
        val globalUserHelpfulness = reviewHelpfulness.join(average).map {
            case (score, ((id, help), averageHelpfulness)) =>
                (id, if (help < averageHelpfulness) (help+averageHelpfulness)/2 else help)
        }
    
        // Se consideriamo piu' di un film alla fine ci sono piu' valori di helpfulness
        // per ogni utente. Si fa la media
        globalUserHelpfulness.aggregateByKey((0.0,0)) ((acc, value) => (acc._1+value, acc._2+1), (acc1,acc2) => (acc1._1 + acc2._1, acc1._2+ acc2._2))
            .map { case (userId, help) => (userId, help._1/help._2) }
    }
    

    tl;dr: 要么collect 循环中的所有结果,要么在一个转换序列中管理所有计算。

    【讨论】:

      猜你喜欢
      • 2016-06-30
      • 1970-01-01
      • 2023-03-18
      • 1970-01-01
      • 2018-12-31
      • 1970-01-01
      • 1970-01-01
      • 2015-10-18
      • 1970-01-01
      相关资源
      最近更新 更多