【问题标题】:How to do pairwise word-count in Scala using DataFrame如何使用 DataFrame 在 Scala 中进行成对字数统计
【发布时间】:2019-01-21 14:54:41
【问题描述】:

我有一个这样的数据框(df):

tweets
------
rain rain go away
train on the way

我的预期输出(任何格式都可以

((rain,rain),1) ((rain,go),2) (rain,away),2) ((go,away),1)
((train,on),1) ((train,the),1) ((thain,way),1) ((on,the),1) ((on,way),1) ((the,way),1)

我将 df 转换为 RDD 以应用 map 和 reduceByKey 方法,但无法获得预期的结果。我可以轻松地将Array[((String, String), Int)] 结果转换为 df 但我需要帮助才能首先生成正确的结果。

2018 年 8 月 18 日更新

其实我最终的结果应该是这样一个DataFrame:

word1    word2    count
-----    -----    -----
rain     rain       1
rain     go         2
rain     away       2
go       away       1
train    on         1
train    the        1
train    way        1
on       the        1
on       way        1
the      way        1

你能帮忙吗?可以注意到,所有答案都适用于小数据集,但当我将其应用于大量数据时会失败。

【问题讨论】:

    标签: scala apache-spark-sql rdd word-count


    【解决方案1】:

    您可以将rdd 应用于DataFrame,将使用zipWithIndex 的拆分字符串索引到一个数组中,应用combinations(2) 来组装单词对组合,并将RDD 中的每一行按实际单词分组以进行计数生成的 Map 值的大小:

    val df = Seq(
      "rain rain go away",
      "train on the way"
    ).toDF("tweets")
    
    val rdd = df.
      rdd.map(_.getString(0)).
      map( _.split("\\s+").zipWithIndex.combinations(2).toList ).
      map( _.groupBy(a => (a(0)._1, a(1)._1)).mapValues(_.size).toList )
    
    rdd.collect
    // res1: Array[List[((String, String), Int)]] = Array(
    //   List(((rain,rain),1), ((go,away),1), ((rain,go),2), ((rain,away),2)),
    //   List(((the,way),1), ((on,the),1), ((on,way),1), ((train,way),1), ((train,the),1), ((train,on),1))
    // )
    

    请注意,单词的“索引”步骤是在生成组合之前区分相同的单词,例如,配对组合中出现的两次 ("rain", "go") 不会塌陷为一。

    生成单词对组合的另一种方法是通过for-comprehension,如 cmets 部分中所建议的那样:

    val rdd = df.
      rdd.map(_.getString(0)).
      map{ row => 
        val words = row.split("\\s+")
        val sz = words.size
        for(i <- 0 until sz; j <- i + 1 until sz) yield (words(i), words(j))
      }.
      map( _.groupBy(identity).mapValues(_.size).toList )
    

    【讨论】:

    • @Abu Shoeb,我认为我将您的问题误解为计算整个 RDD 的连续单词对。重新阅读您的预期输出后,您似乎想要每行所有单词对的组合。请看我修改后的答案。
    • zipWithIndex 的有趣想法,但基本上使 combinations 有点没用。分裂后我会诉诸诸如for(i &lt;- 0 until words.length; j &lt;- i + 1 until words.length) yield((words(i), words(j)))之类的无聊东西。
    • @Victor Moroz,我也喜欢使用for-comprehension 的方法,它可能会稍微冗长一些,但确实会为以下groupBy 生成更简单的数据集。谢谢。
    • @Abu Shoeb,要将 RDD 展平为输入 Array[T] 而不是 Array[List[T]],您可以简单地将最后一个 map 替换为 flatMap。即flatMap( _.groupBy(identity).mapValues(_.size).toList )
    • 不幸的是,我无法重现上述问题。如果map 有效,但将其切换为flatMap 无效,则可以尝试在末尾链接flatMap(identity)。如果这没有帮助,我建议您发布一个单独的问题,其中仅包含导致问题的部分代码以及可以重现它的最小示例数据集。
    【解决方案2】:

    您可以使用sorted 函数到count 相同的groups,但顺序不同,如下所示

    //df
    
    +--------------------+
    |              tweets|
    +--------------------+
    |rain rain go away go|
    |    train on the way|
    +--------------------+
    
    //Solution Approach
    
     import org.apache.spark.mllib.rdd.RDDFunctions._
    
     df.rdd.flatMap(_.getString(0).split(" ")).sliding(2).map(_.sorted).map(arr=>((arr(0),arr(1))->1)).reduceByKey(_+_).collect
    
    //Output: 
    
    res13: Array[((String, String), Int)] = Array(((rain,rain),1), ((on,train),1), ((on,the),1), ((go,rain),1), ((go,train),1), ((away,go),2), ((the,way),1))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-06-23
      • 2019-08-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多