【发布时间】:2020-04-14 14:51:06
【问题描述】:
用例
我的数据是作为数据帧写入的,我想检查两个具有完全相同架构的数据帧是否相等。具体来说,要检查每个 id 值是否来自第一个和第二个数据帧的记录相同。换句话说,假设每个数据帧的每个 id 都有一条记录,我希望将数据帧 1 和数据帧 2 的行之间的每个 id 的差异并列。
我的假设是我需要实现一个新的数据帧(即通过连接操作),以便使用 Spark 大规模执行此操作。到目前为止,我的假设是否正确?
这是目前为止的代码:
val postsFromDF1: Dataset[Post] = ... // dataframe read as a Dataset of Scala Objects
val postsFromDF2: Dataset[Post] = ... // dataframe read as a Dataset of Scala Objects
val joined: DataFrame = postsFromDF1.as("df1").join(postsFromDF2.as("df2"), usingColumn = "id")
现在我想列出那些值不相同的 id 匹配对象之间的所有差异(当然,它们所连接的共享 id 字段除外)。因为其中一些值本身就是对象集合——对我来说,使用 scala 对象的对象树似乎比在此连接后切换到列名级别的工作更具可读性或本能。评论到现在?这是使用 Spark 的好方法吗?
我的最后一个问题
我如何才能为连接的每一行返回一个对象表示对(每个原始数据帧对象一个对象),同时在比较对象时仍然享受 Spark 的并行性?
这样的对象表示:
case class PostPair(post: Post, otherPost: Post, id: String)
我尝试了什么
我尝试锤击这个实验性代码,但它在运行时失败;可能 Encoders.product 隐含的描述性不够。
case class PostPair(post: Post, otherPost: Post, id: String)
implicit val encoder = Encoders.product[PostPair]
val joined: Dataset[PostPair] =
postsFromDF1.as("df1")
.join(postsFromDF2.as("df2"), usingColumn = "id")
.as[PostPair]
其他信息
以下是我如何从每个数据帧中分离地完成案例类的集合:
case class PostsParquetReader(spark: SparkSession) {
/** default method applied when the object is called */
def apply(path: String) = {
val df = spark.read.parquet(path)
toCaseClass(spark, df)
}
/** applies the secret sauce for coercing to a case class that is implemented by spark's flatMap */
private def toCaseClass(spark : SparkSession, idf : DataFrame) = {
import spark.implicits._
idf.as[Post].flatMap(record => {
Iterator[Post](record)
})
}
}
我觉得在 join 之后使用相同的对象强制方法可能会很麻烦,或者这种对象强制方法可能在 Spark 并行/分布式执行方面有其缺点。
另一方面,通过对象记录进行(编码)比较和显示差异就像数据是简单的 Scala 对象树似乎是最易读和最灵活的方法 - 因为它可以Scala 集合 API 的标准杠杆作用。
【问题讨论】:
-
谢谢。在第一次和第二次阅读中,我主要从中学到的是 Spark 和面向对象的范式在 Spark 的设计使用方式上存在阻抗不匹配。你会同意吗?我将详细介绍那里描述的所有内容。
-
我不会将对该问题的任何答案视为直接(或全面)的答案。我的问题也更加明确,涉及到对象的工作是否与 Spark 的并发和分布架构配合得很好,希望对其特定用例提供全面的答案。
-
您想要的结果数据框的输出/模式到底是什么?我觉得你可以使用
except函数来达到目标,但也许你想生成一个特定的新数据框 -
@BinziCao 我想用这样的东西作为结果类型:
case class PostPair(post: Post, otherPost: Post, id: String)
标签: scala dataframe apache-spark apache-spark-sql