[Posted]: 2020-05-20 14:11:42
[Question]:
Hello, I am trying to test the following function, which takes a Dataset[Row] as its parameter:
import org.apache.spark.sql.{Dataset, Row, SparkSession}

def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
  import spark.implicits._
  dsSubsidiaries.as[SubsidiariesImpactedStage]
    // Group by subsidiary and keep only the record with the latest event_timestamp
    .groupByKey(_.subsidiary_uuid)
    .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
    // Drop the grouping key, keeping the reduced record
    .map(_._2)
    .select(
      $"subsidiary_uuid",
      $"subsidiary_id",
      $"company_uuid"
    )
    .as[SubsidiariesImpacted]
}
I am trying to create a Dataset to pass into this function, but I am not sure how to convert the Dataset I created into the Dataset[Row] it expects.
import java.sql.Timestamp

val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")

// toDS() requires spark.implicits._ to be in scope
val subsidiaries: Dataset[SubsidiariesImpactedStage] = Seq(
  SubsidiariesImpactedStage(ts1, "active", "sub_uuid1", 32L, "comp_uuid1"),
  SubsidiariesImpactedStage(ts2, "inactive", "sub_uuid1", 32L, "comp_uuid1"),
  SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
).toDS()
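One possible way to get from the typed Dataset to a Dataset[Row] is `toDF()`, since `DataFrame` is a type alias for `Dataset[Row]` in Spark's Scala API. The sketch below is a minimal illustration under stated assumptions, not a definitive test setup: the two case classes are not shown in the question, so their shapes are guessed from the constructor calls and the selected columns (the field name `status` in particular is hypothetical), and the `ToRowDatasetSketch` object, its `run` helper, and the local SparkSession exist only for this example.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Assumed shapes: the question does not show these case classes.
// The field name `status` is hypothetical; the other names come from the function body.
case class SubsidiariesImpactedStage(
    event_timestamp: Timestamp,
    status: String,
    subsidiary_uuid: String,
    subsidiary_id: Long,
    company_uuid: String)

case class SubsidiariesImpacted(
    subsidiary_uuid: String,
    subsidiary_id: Long,
    company_uuid: String)

object ToRowDatasetSketch {

  // Function under test, copied from the question.
  def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
    import spark.implicits._
    dsSubsidiaries.as[SubsidiariesImpactedStage]
      .groupByKey(_.subsidiary_uuid)
      .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
      .map(_._2)
      .select($"subsidiary_uuid", $"subsidiary_id", $"company_uuid")
      .as[SubsidiariesImpacted]
  }

  // Builds the sample data, converts it to Dataset[Row], and runs the function.
  def run(): Seq[SubsidiariesImpacted] = {
    // Local session for illustration only; a real test suite would likely share one.
    implicit val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("to-row-dataset-sketch")
      .getOrCreate()
    import spark.implicits._
    try {
      val typed: Dataset[SubsidiariesImpactedStage] = Seq(
        SubsidiariesImpactedStage(Timestamp.valueOf("2019-08-01 00:00:00"), "active", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(Timestamp.valueOf("2019-09-20 00:00:00"), "inactive", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(Timestamp.valueOf("2019-11-27 00:00:00"), "active", "sub_uuid1", 5L, "latest_comp_uuid1")
      ).toDS()

      // DataFrame is an alias for Dataset[Row], so toDF() yields the required type.
      val rows: Dataset[Row] = typed.toDF()

      getReducedSubsidiaries(rows).collect().toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In the sample data all three records share the key `sub_uuid1` and the third has the latest timestamp, so the reduction should leave a single record with `subsidiary_id` 5 and `company_uuid` `latest_comp_uuid1`.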
[Discussion]:
Tags: scala apache-spark apache-spark-sql dataset apache-spark-dataset