【问题标题】:How to use dataset to groupby如何使用数据集进行分组
【发布时间】:2017-11-08 08:16:15
【问题描述】:

我有一个使用rdd的请求:

val test = Seq(("New York", "Jack"),
    ("Los Angeles", "Tom"),
    ("Chicago", "David"),
    ("Houston", "John"),
    ("Detroit", "Michael"),
    ("Chicago", "Andrew"),
    ("Detroit", "Peter"),
    ("Detroit", "George")
  )
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)

结果是:

(纽约,名单(杰克))

(底特律,名单(迈克尔,彼得,乔治))

(洛杉矶,名单(汤姆))

(休斯顿,列表(约翰))

(芝加哥,列表(大卫,安德鲁))

如何在 spark2.0 中使用数据集?

我有办法使用自定义函数,但是感觉好复杂,有没有简单点的方法?

【问题讨论】:

    标签: apache-spark dataset apache-spark-2.0


    【解决方案1】:

    我建议你先创建一个case class as

    case class Monkey(city: String, firstName: String)
    

    这个case class 应该在主类之外定义。然后你可以使用toDS 函数并使用groupByaggregation 函数调用collect_list 如下

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    
    val test = Seq(("New York", "Jack"),
      ("Los Angeles", "Tom"),
      ("Chicago", "David"),
      ("Houston", "John"),
      ("Detroit", "Michael"),
      ("Chicago", "Andrew"),
      ("Detroit", "Peter"),
      ("Detroit", "George")
    )
    sc.parallelize(test)
      .map(row => Monkey(row._1, row._2))
      .toDS()
      .groupBy("city")
      .agg(collect_list("firstName") as "list")
      .show(false)
    

    你会得到输出为

    +-----------+------------------------+
    |city       |list                    |
    +-----------+------------------------+
    |Los Angeles|[Tom]                   |
    |Detroit    |[Michael, Peter, George]|
    |Chicago    |[David, Andrew]         |
    |Houston    |[John]                  |
    |New York   |[Jack]                  |
    +-----------+------------------------+
    

    您总是可以通过调用.rdd 函数转换回RDD

    【讨论】:

      【解决方案2】:

      要创建数据集,首先在您的类之外定义一个案例类

      case class Employee(city: String, name: String)
      

      然后你可以将列表转换为数据集为

        val spark =
          SparkSession.builder().master("local").appName("test").getOrCreate()
          import spark.implicits._
          val test = Seq(("New York", "Jack"),
          ("Los Angeles", "Tom"),
          ("Chicago", "David"),
          ("Houston", "John"),
          ("Detroit", "Michael"),
          ("Chicago", "Andrew"),
          ("Detroit", "Peter"),
          ("Detroit", "George")
          ).toDF("city", "name")
          val data = test.as[Employee]
      

      或者

          import spark.implicits._
          val test = Seq(("New York", "Jack"),
            ("Los Angeles", "Tom"),
            ("Chicago", "David"),
            ("Houston", "John"),
            ("Detroit", "Michael"),
            ("Chicago", "Andrew"),
            ("Detroit", "Peter"),
            ("Detroit", "George")
          )
      
          val data = test.map(r => Employee(r._1, r._2)).toDS()
      

      现在您可以groupby 并执行任何聚合

      data.groupBy("city").count().show
      
      data.groupBy("city").agg(collect_list("name")).show
      

      希望这会有所帮助!

      【讨论】:

        【解决方案3】:

        首先我会将你的 RDD 变成一个数据集:

        val spark: org.apache.spark.sql.SparkSession = ???
        import spark.implicits._
        
        val testDs = test.toDS()
        

        在这里你得到你的 col 名称 :) 明智地使用它!

        testDs.schema.fields.foreach(x => println(x))
        

        最后你只需要使用一个groupBy:

        testDs.groupBy("City?", "Name?")
        

        RDD-s 并不是我认为的 2.0 版本。 如果您有任何问题,请尽管问。

        【讨论】:

        • testDs.columns 甚至可以更快地获取不带类型的列名(作为Array[String])。
        • 好点!真的
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-03-30
        • 2018-08-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多