【问题标题】:Scala Spark function with generic Dataset[T] argument and also returns Dataset[T]?具有通用 Dataset[T] 参数并返回 Dataset[T] 的 Scala Spark 函数?
【发布时间】:2021-02-19 03:36:22
【问题描述】:

我了解 Spark 能够将 Dataframe 转换为需要编码器的某个类的 Dataset[T]。但是,我通常可以使用编码器在 main 方法中进行处理,并像这样调用.as[MyClass]

val df = spark.read.parquet("something")
val myDS = df.as[MyClass]

只要为MyClass 定义了编码器,此方法就可以工作

我想创建一个这样的函数

def hello[T](inputDataSet: Dataset[T])(implicit spark: SparkSession): Dataset[T] = {

    val replacedDataFrame = inputDataSet
      // do some transformation as Dataframe
      .as[T]

    replacedDataFrame

}

我还返回 Dataset[T] 。但是,当我尝试投射数据框 .as[T] 时,它会抱怨“未找到隐含”。我只是在想,因为当我传入Dataset[T] 时它能够理解我在做什么,它应该能够理解相反的意思,但我猜不是。有什么办法吗?

示例用例:

// function to replace a column with values from another DataSet
def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession): Dataset[T] = {

    val replacedDataFrame = inputDS
      .join(broadcast(joinable), "col1") // exists in "joinableDS" and "inputDS"
      .withColumnRenamed("col1", "to-drop")
      .withColumnRenamed("col2", "col1") // "col2" exists only in "joinableDS"
      .drop("to-drop") 
      .as[T]

    replacedDataFrame

}

请注意,这不是我唯一的用例。但这里的问题是——我传入了一个Dataset[T],在对其进行了一些操作之后,我也想将返回指定为Dataset[T]。一旦我执行join,它会将Dataset 转换为Dataframe,并且它会忘记定义为T 的类。

【问题讨论】:

  • 不清楚你想要什么。你能显示一些输入和输出吗?将手段从 T 转换为非 T 不是吗?
  • 嘿,我更新了更多上下文!
  • 但这不是它的工作原理
  • 你能澄清一下吗?
  • 我现在看到的更多。但变换意味着数据集 T 往往不再成立

标签: scala apache-spark


【解决方案1】:

试试这个,我很难解释,但它解决了您收到的错误消息:

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.Encoders

case class T(name: String, age: Long)
case class K(name: String, age2: Long)

val dt = Seq(T("Andy", 32), T("John", 33), T("Bob", 33)).toDS()
dt.show()

val dk = Seq(K("Andy", 32), K("John", 133), K("Bob", 245)).toDS()
dk.show()

implicit val sqlContext: SparkSession = spark

def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
//def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K]) : DataFrame = {
    val replacedDataFrame = inputDS
      .join(broadcast(joinableDS), "name")  
      .withColumnRenamed("age", "to-drop")
      .withColumnRenamed("age2", "age")  
      .drop("to-drop") 
      .as[T]
  
    replacedDataFrame
}

val ds = swapColumnValue(dt,dk) 
ds.show(false)

返回:

+----+---+
|name|age|
+----+---+
|Andy| 32|
|John| 33|
| Bob| 33|
+----+---+

+----+----+
|name|age2|
+----+----+
|Andy|  32|
|John| 133|
| Bob| 245|
+----+----+

+----+---+
|name|age|
+----+---+
|Andy|32 |
|John|133|
|Bob |245|
+----+---+

ds 是 T 类型的数据集。

【讨论】:

  • mmm 有趣.. 我不认为像处理 SparkSession 对象那样隐式传入编码器。我认为这是一个很好的答案,感谢您抽出宝贵的时间!
  • 这很难 yakka,因为我知道 scala 真的来自 spark,而不是纯粹的 scala。这是个好问题。
猜你喜欢
  • 2018-03-03
  • 2020-06-05
  • 2018-01-25
  • 1970-01-01
  • 2020-05-23
  • 2017-06-10
  • 2021-12-23
  • 2020-10-15
  • 2016-11-30
相关资源
最近更新 更多