【Question title】:How to create a Dataset[Row], or convert a typed Dataset to one
【Posted】:2020-05-20 14:11:42
【Question】:

Hi, I'm trying to test the following function, which takes a Dataset[Row] as its parameter:

    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
      import spark.implicits._
      dsSubsidiaries.as[SubsidiariesImpactedStage]
        // one group per subsidiary_uuid
        .groupByKey(_.subsidiary_uuid)
        // keep the record with the latest event_timestamp in each group
        .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
        // drop the grouping key and keep the reduced record
        .map(_._2)
        .select(
          $"subsidiary_uuid",
          $"subsidiary_id",
          $"company_uuid"
        )
        .as[SubsidiariesImpacted]
    }

I'm trying to create a Dataset to pass into this function, but I'm not sure how to convert the Dataset I created below into the Dataset[Row] the function expects.


    import java.sql.Timestamp
    import org.apache.spark.sql.Dataset
    import spark.implicits._ // needed for .toDS(); assumes a SparkSession named spark is in scope

    val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
    val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
    val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")
    val subsidiaries: Dataset[SubsidiariesImpactedStage] = Seq(
      SubsidiariesImpactedStage(ts1, "active", "sub_uuid1", 32L, "comp_uuid1"),
      SubsidiariesImpactedStage(ts2, "inactive", "sub_uuid1", 32L, "comp_uuid1"),
      SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
    ).toDS()
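To know what a test of getReducedSubsidiaries should assert, the reduction can first be mirrored on plain Scala collections, without Spark. This is a minimal sketch under assumptions: StageRow is a hypothetical stand-in for SubsidiariesImpactedStage, the other field names come from the columns the function uses, and the name of the second field (status) is a guess, since the question never shows the case class.

```scala
import java.sql.Timestamp

// Hypothetical stand-in for SubsidiariesImpactedStage; "status" is an assumed field name.
case class StageRow(
    event_timestamp: Timestamp,
    status: String,
    subsidiary_uuid: String,
    subsidiary_id: Long,
    company_uuid: String)

// Same rule as the reduceGroups call: for each subsidiary_uuid keep the record
// with the latest event_timestamp (on a tie the left-hand record wins).
def latestPerSubsidiary(rows: Seq[StageRow]): Map[String, StageRow] =
  rows.groupBy(_.subsidiary_uuid).map { case (uuid, group) =>
    uuid -> group.reduce((a, b) =>
      if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
  }

val rows = Seq(
  StageRow(Timestamp.valueOf("2019-08-01 00:00:00"), "active", "sub_uuid1", 32L, "comp_uuid1"),
  StageRow(Timestamp.valueOf("2019-09-20 00:00:00"), "inactive", "sub_uuid1", 32L, "comp_uuid1"),
  StageRow(Timestamp.valueOf("2019-11-27 00:00:00"), "active", "sub_uuid1", 5L, "latest_comp_uuid1")
)

// One entry per subsidiary_uuid, holding the most recent record.
val expected = latestPerSubsidiary(rows)
```

With the question's three rows, only the 2019-11-27 record should survive, so the function's output should contain a single row for sub_uuid1 with subsidiary_id 5 and company_uuid latest_comp_uuid1.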

【Question discussion】:

    Tags: scala apache-spark apache-spark-sql dataset apache-spark-dataset


    【Answer 1】:

    Since Spark 2.0, DataFrame is simply a type alias for Dataset[Row].

    So, before passing the data to the function, convert the Seq with toDF() instead of toDS().

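    import org.apache.spark.sql.{Dataset, Row}
    import spark.implicits._ // provides .toDF() on a local Seq

    case class Person(name: String, age: Int)
    def fun(d: Dataset[Row]): Unit = d.show()
    // A DataFrame is a Dataset[Row], so it can be passed directly
    fun(Seq(Person("a", 1)).toDF())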

    【Discussion】:

      【Answer 2】:

      You can call Dataset.toDF() on the typed Dataset:

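      import java.sql.Timestamp
      import org.apache.spark.sql.{DataFrame, Dataset}
      import spark.implicits._ // provides .toDS() on a local Seq

      // Placeholder field names; the question's function expects event_timestamp, subsidiary_uuid, etc.
      case class SubsidiariesImpactedStage(t: Timestamp, a: String, b: String, c: Long, d: String)

      val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
      val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
      val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")

      val subsidiaries: Dataset[SubsidiariesImpactedStage] = Seq(
        SubsidiariesImpactedStage(ts1, "active", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(ts2, "inactive", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
      ).toDS()

      // toDF() keeps the columns but drops the static element type: DataFrame == Dataset[Row]
      val df: DataFrame = subsidiaries.toDF()
      println(df.getClass)
      df.show()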
      

      Result: the DataFrame's runtime class is org.apache.spark.sql.Dataset, i.e. it is a Dataset[Row]:

      class org.apache.spark.sql.Dataset
      +-------------------+--------+---------+---+-----------------+
      |                  t|       a|        b|  c|                d|
      +-------------------+--------+---------+---+-----------------+
      |2019-08-01 00:00:00|  active|sub_uuid1| 32|       comp_uuid1|
      |2019-09-20 00:00:00|inactive|sub_uuid1| 32|       comp_uuid1|
      |2019-11-27 00:00:00|  active|sub_uuid1|  5|latest_comp_uuid1|
      +-------------------+--------+---------+---+-----------------+
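Putting the two answers together, the typed Dataset from the question can be handed to getReducedSubsidiaries after calling toDF(). This is an end-to-end local sketch under stated assumptions: the case class field names are inferred from the columns the function uses, the second field's name (status) is a guess, ReducedSubsidiariesCheck and run are hypothetical names, and spark-sql must be on the classpath.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Hypothetical case classes: field names follow the columns used in
// getReducedSubsidiaries; "status" is an assumed name for the second field.
case class SubsidiariesImpactedStage(
    event_timestamp: Timestamp,
    status: String,
    subsidiary_uuid: String,
    subsidiary_id: Long,
    company_uuid: String)

case class SubsidiariesImpacted(subsidiary_uuid: String, subsidiary_id: Long, company_uuid: String)

object ReducedSubsidiariesCheck {
  // The function from the question, unchanged apart from formatting.
  def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
    import spark.implicits._
    dsSubsidiaries.as[SubsidiariesImpactedStage]
      .groupByKey(_.subsidiary_uuid)
      .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
      .map(_._2)
      .select($"subsidiary_uuid", $"subsidiary_id", $"company_uuid")
      .as[SubsidiariesImpacted]
  }

  // Builds the typed test data, converts it with toDF(), and runs the function on a local session.
  def run(): Array[SubsidiariesImpacted] = {
    implicit val spark: SparkSession =
      SparkSession.builder().master("local[1]").appName("toDF-example").getOrCreate()
    import spark.implicits._
    try {
      val subsidiaries: Dataset[SubsidiariesImpactedStage] = Seq(
        SubsidiariesImpactedStage(Timestamp.valueOf("2019-08-01 00:00:00"), "active", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(Timestamp.valueOf("2019-09-20 00:00:00"), "inactive", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(Timestamp.valueOf("2019-11-27 00:00:00"), "active", "sub_uuid1", 5L, "latest_comp_uuid1")
      ).toDS()

      // toDF() drops the static element type; the result is a DataFrame, i.e. a Dataset[Row]
      val rows: DataFrame = subsidiaries.toDF()
      // collect() runs the job before the session is stopped in the finally block
      getReducedSubsidiaries(rows).collect()
    } finally {
      spark.stop()
    }
  }
}
```

Because .as[SubsidiariesImpactedStage] maps columns to case-class fields by name, the DataFrame passed in must keep the question's column names (event_timestamp, subsidiary_uuid, and so on); toDF() with no arguments preserves them.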
      

      【Discussion】:
