【Question title】: Distinct word count per line
【Posted】: 2020-01-13 03:12:46
【Question】:

This is slightly different from the usual word count program: I am trying to get the count of each distinct word, per line.

Input

Line number one has six words
Line number two has two words

Expected output

line1 => (Line,1),(number,1),(one,1),(has,1),(six,1),(words,1)
line2 => (Line,1),(number,1),(two,2),(has,1),(words,1)

Can anyone guide me?

    Tags: scala apache-spark


    【Answer 1】:

    Using the built-in DataFrame functions explode, split and collect_set together with groupBy:

    // input data (assumes spark-shell, where `spark` is already in scope)
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df = Seq("Line number one has six words", "Line number two has has two words").toDF("input")

    // in the REPL, type :paste first so the multi-line chain is read as one expression (ctrl-D to finish)
    df.withColumn("words", explode(split($"input", "\\s+")))     // split on whitespace, one row per word
      .groupBy("input", "words")                                 // group by both columns
      .count()
      .withColumn("line_word_count", struct($"words", $"count")) // pack (word, count) into a struct
      .groupBy("input")                                          // group back by the input line
      .agg(collect_set("line_word_count").alias("line_word_count"))
      .show(false)
    

    Result:

    +---------------------------------+------------------------------------------------------------------+
    |input                            |line_word_count                                                   |
    +---------------------------------+------------------------------------------------------------------+
    |Line number one has six words    |[[one, 1], [has, 1], [six, 1], [number, 1], [words, 1], [Line, 1]]|
    |Line number two has has two words|[[has, 2], [two, 2], [words, 1], [number, 1], [Line, 1]]          |
    +---------------------------------+------------------------------------------------------------------+
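To see what each step of the DataFrame pipeline above produces, here is a rough model of it on plain Scala collections (no Spark required). The object and function names (`PipelineModel`, `explodeWords`, `countPairs`, `collectPerLine`) are hypothetical and only mirror the Spark steps; they are not Spark APIs.

```scala
// Plain-Scala model of the DataFrame pipeline above (hypothetical names, no Spark).
object PipelineModel {
  // split + explode: one (line, word) pair per word occurrence
  def explodeWords(lines: Seq[String]): Seq[(String, String)] =
    lines.flatMap(line => line.split("\\s+").toSeq.map(word => (line, word)))

  // groupBy("input", "words").count(): number of rows for each (line, word) pair
  def countPairs(pairs: Seq[(String, String)]): Map[(String, String), Int] =
    pairs.groupBy(identity).map { case (pair, occurrences) => pair -> occurrences.size }

  // groupBy("input").agg(collect_set(struct(words, count))): gather the (word, count) pairs per line
  def collectPerLine(counts: Map[(String, String), Int]): Map[String, Set[(String, Int)]] =
    counts.toSeq
      .groupBy { case ((line, _), _) => line }
      .map { case (line, rows) => line -> rows.map { case ((_, word), n) => (word, n) }.toSet }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Line number one has six words", "Line number two has has two words")
    collectPerLine(countPairs(explodeWords(lines))).foreach { case (line, wordCounts) =>
      println(s"$line -> $wordCounts")
    }
  }
}
```

As with collect_set, each line ends up with a set of (word, count) pairs, so their order is not preserved.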
    

    If you need line numbers, use the concat and monotonically_increasing_id functions:

    // label each row "line<N>", then run the same pipeline grouped by that label
    df.withColumn("line", concat(lit("line"), monotonically_increasing_id() + 1))
      .withColumn("words", explode(split($"input", "\\s+")))
      .groupBy("input", "words", "line")
      .count()
      .withColumn("line_word_count", struct($"words", $"count"))
      .groupBy("line")
      .agg(collect_set("line_word_count").alias("line_word_count"))
      .show(false)
    

    Result:

    +-----+------------------------------------------------------------------+
    |line |line_word_count                                                   |
    +-----+------------------------------------------------------------------+
    |line1|[[one, 1], [has, 1], [six, 1], [words, 1], [number, 1], [Line, 1]]|
    |line2|[[has, 2], [two, 2], [number, 1], [words, 1], [Line, 1]]          |
    +-----+------------------------------------------------------------------+
    

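    Note: monotonically_increasing_id() only guarantees unique, increasing IDs; they are consecutive only within a partition. For a larger dataset spread over several partitions, bring the data into a single partition first (e.g. with .repartition(1)) so that the labels are consecutive, keeping in mind that all rows are then processed by one task.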
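The partition caveat comes from how monotonically_increasing_id() builds its IDs: according to the Spark API docs, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The hypothetical `IncreasingIdModel` below imitates that layout on plain Scala collections (it is not Spark code) to show why, with the two lines in different partitions, the second label would become `line8589934593` instead of `line2`.

```scala
// Imitates the documented ID layout of monotonically_increasing_id() (hypothetical model, no Spark):
// partition ID in the upper 31 bits, record number within the partition in the lower 33 bits.
object IncreasingIdModel {
  def idFor(partitionId: Int, recordIndex: Long): Long =
    (partitionId.toLong << 33) + recordIndex

  // Assign IDs to rows that are already split into partitions.
  def assignIds[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] =
    partitions.zipWithIndex.flatMap { case (rows, pid) =>
      rows.zipWithIndex.map { case (row, i) => (row, idFor(pid, i.toLong)) }
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Line number one has six words", "Line number two has has two words")
    // Two partitions, one line each: IDs 0 and 8589934592 (= 1L << 33)
    println(assignIds(Seq(Seq(lines(0)), Seq(lines(1)))).map(_._2))
    // A single partition: IDs 0 and 1
    println(assignIds(Seq(lines)).map(_._2))
  }
}
```

Moving everything into one partition first is what makes the IDs consecutive in this model, which is the effect the note relies on.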

      【Answer 2】:

      Here is another way, using the RDD API:

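      // split each line into an array column, then count words per Row with Scala collections
      val rdd = df.withColumn("output", split($"input", " ")).rdd.map(row => (
        row.getAs[String](0),           // the original line
        row.getAs[Seq[String]](1)
          .groupBy(identity)            // word -> its occurrences
          .mapValues(_.size)            // word -> count, as a lazy view (not serializable in Scala 2.11/2.12)
          .map(identity)                // materialize the view into a regular Map
      ))

      val dfCount = spark.createDataFrame(rdd).toDF("input", "output")
      dfCount.show(false)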
      

      I'm not a big fan of UDFs, but this works as well:

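      import org.apache.spark.sql.functions.{split, udf}

      // word -> number of occurrences; map { ... } builds a strict Map[String, Int]
      val mapCount: Seq[String] => Map[String, Int] =
        _.groupBy(identity).map { case (word, occurrences) => word -> occurrences.size }
      val countWordsUdf = udf(mapCount)

      df.withColumn("output", countWordsUdf(split($"input", " "))).show(false)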
      

      Gives:

      +---------------------------------+------------------------------------------------------------------+
      |input                            |output                                                            |
      +---------------------------------+------------------------------------------------------------------+
      |Line number one has six words    |[number -> 1, Line -> 1, has -> 1, six -> 1, words -> 1, one -> 1]|
      |Line number two has has two words|[number -> 1, two -> 2, Line -> 1, has -> 2, words -> 1]          |
      +---------------------------------+------------------------------------------------------------------+
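Both variants in this answer do the per-line counting with ordinary Scala collection code. The same logic on a local `Seq` (no Spark) also reproduces the exact `lineN => (word,count),...` format from the question, keeping words in first-occurrence order. `PerLineFormat`, `wordCounts` and `format` are hypothetical names for this sketch. On an RDD, `RDD.zipWithIndex` assigns consecutive indices (ordered by partition, then by position within the partition), so it could stand in for the local `zipWithIndex` used here; that is only a suggestion, not part of the original answer.

```scala
// Plain-Scala sketch (local collection, no Spark) of the question's "lineN => (word,count),..." output.
object PerLineFormat {
  // Count each distinct word in one line, keeping first-occurrence order.
  def wordCounts(line: String): Seq[(String, Int)] = {
    val words = line.split("\\s+").toSeq
    words.distinct.map(w => (w, words.count(_ == w)))
  }

  // Number the lines from 1 and render the (word,count) pairs.
  def format(lines: Seq[String]): Seq[String] =
    lines.zipWithIndex.map { case (line, i) =>
      val pairs = wordCounts(line).map { case (w, n) => s"($w,$n)" }
      s"line${i + 1} => ${pairs.mkString(",")}"
    }

  def main(args: Array[String]): Unit =
    format(Seq("Line number one has six words", "Line number two has two words")).foreach(println)
}
```

Running `main` prints:

    line1 => (Line,1),(number,1),(one,1),(has,1),(six,1),(words,1)
    line2 => (Line,1),(number,1),(two,2),(has,1),(words,1)

`words.count` rescans the line once per distinct word, which is fine for short lines.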
      
