【问题标题】:Working with scala objects following a Spark join在 Spark 连接之后使用 scala 对象
【发布时间】:2020-04-14 14:51:06
【问题描述】:

用例

我的数据是作为数据帧写入的,我想检查两个具有完全相同架构的数据帧是否相等。具体来说,要检查每个 id 值是否来自第一个和第二个数据帧的记录相同。换句话说,假设每个数据帧的每个 id 都有一条记录,我希望将数据帧 1 和数据帧 2 的行之间的每个 id 的差异并列。

我的假设是我需要实现一个新的数据帧(即通过连接操作),以便使用 Spark 大规模执行此操作。到目前为止,我的假设是否正确?

这是目前为止的代码:

  val postsFromDF1: Dataset[Post] = ... // dataframe read as a Dataset of Scala Objects
  val postsFromDF2: Dataset[Post] = ... // dataframe read as a Dataset of Scala Objects

  val joined: DataFrame = postsFromDF1.as("df1").join(postsFromDF2.as("df2"), usingColumn = "id")

现在我想列出那些值​​不相同的 id 匹配对象之间的所有差异(当然,它们所连接的共享 id 字段除外)。因为其中一些值本身就是对象集合——对我来说,使用 scala 对象的对象树似乎比在此连接后切换到列名级别的工作更具可读性或本能。评论到现在?这是使用 Spark 的好方法吗?

我的最后一个问题

我如何才能为连接的每一行返回一个对象表示对(每个原始数据帧对象一个对象),同时在比较对象时仍然享受 Spark 的并行性?

这样的对象表示:

case class PostPair(post: Post, otherPost: Post, id: String)

我尝试了什么

我尝试锤击这个实验性代码,但它在运行时失败;可能 Encoders.product 隐含的描述性不够。

  case class PostPair(post: Post, otherPost: Post, id: String)
  implicit val encoder = Encoders.product[PostPair]

  val joined: Dataset[PostPair] =
    postsFromDF1.as("df1")
      .join(postsFromDF2.as("df2"), usingColumn = "id")
      .as[PostPair]

其他信息

以下是我如何从每个数据帧中分离地完成案例类的集合:

case class PostsParquetReader(spark: SparkSession) {
  /** default method applied when the object is called */
  def apply(path: String) = {
    val df = spark.read.parquet(path)
    toCaseClass(spark, df)
  }

  /** applies the secret sauce for coercing to a case class that is implemented by spark's flatMap */
  private def toCaseClass(spark : SparkSession, idf : DataFrame)  = {
    import spark.implicits._
    idf.as[Post].flatMap(record => {
      Iterator[Post](record)
    })
  }
}

我觉得在 join 之后使用相同的对象强制方法可能会很麻烦,或者这种对象强制方法可能在 Spark 并行/分布式执行方面有其缺点。

另一方面,通过对象记录进行(编码)比较和显示差异就像数据是简单的 Scala 对象树似乎是最易读和最灵活的方法 - 因为它可以Scala 集合 API 的标准杠杆作用。

【问题讨论】:

  • 这能回答你的问题吗? Perform a typed join in Scala with Spark Datasets
  • 谢谢。在第一次和第二次阅读中,我主要从中学到的是 Spark 和面向对象的范式在 Spark 的设计使用方式上存在阻抗不匹配。你会同意吗?我将详细介绍那里描述的所有内容。
  • 我不会将对该问题的任何答案视为直接(或全面)的答案。我的问题也更加明确,涉及到对象的工作是否与 Spark 的并发和分布架构配合得很好,希望对其特定用例提供全面的答案。
  • 您想要的结果数据框的输出/模式到底是什么?我觉得你可以使用except 函数来达到目标​​,但也许你想生成一个特定的新数据框
  • @BinziCao 我想用这样的东西作为结果类型:case class PostPair(post: Post, otherPost: Post, id: String)

标签: scala dataframe apache-spark apache-spark-sql


【解决方案1】:

@Binzi 的解决方案可以工作,但需要一些改进。

@matanster 您的方法很好,您可以使用 DataSet API 而不是 DataFrame API。 DataSet API 由 Scala 案例类支持,使复杂的操作更容易。与 DataFrame 相比,可扩展性相同,但性能略逊一筹。对于复杂的操作,我总是更喜欢 DataSet。

您可以直接从原始数据创建数据集,无需自己编写 toCaseClass(spark, df)。案例类架构必须与您的数据架构匹配。

    case class post ()//define all properties

    val spark : SparkSession = SparkSession.builder
      .appName("name")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._
    val postsFromDF1: Dataset[Post] = spark.read.parquet(path).as[post]
    val postsFromDF2: Dataset[Post] = spark.read.parquet(path).as[post]

    val joinedDs = postsFromDF1.joinWith(postsFromDF2)

joinedDs 是一个元组(post,post)。然后你可以在这个元组上应用逻辑并在其中发布对象(如你所想)。

性能

Datset 需要将整个数据解码成对象后才能进行操作。从本质上讲,它不能从列式存储中受益。但是在 Dataframe 中,您只能读取几列并对其进行操作,因为 parquet 是柱状的,因此可以节省大量时间来避免读取所有这些列。除此之外,我没有任何其他性能差异。可扩展性完全相同。

【讨论】:

  • 天哪,谢谢。很高兴知道这一切,多么完整的答案。
【解决方案2】:

这似乎是你想要的:

  import org.apache.spark.sql.functions._
  case class Post(id:Int, name: String, age: Long)
  case class PostPair(post: Post, otherPost: Post, id: String)

  val tom = Post(1,"Tom",37)
  val sam = Post(2, "Sam",40)
  val sam2 = Post(2, "Sam",41)

  val postsFromDF1 = List(tom, sam).toDS
  val postsFromDF2 = List(tom, sam2).toDS

  val columns  = struct(postsFromDF1.columns.map(col(_)):_*)

  val result = postsFromDF1.except(postsFromDF2).
    select(
      columns.alias("post"),
      col("id")
    ).
    join(
      postsFromDF2.select(
        columns.alias("otherPost"),
        col("id")
      ),
      "id"
    ).as[PostPair]


  result.show()

+---+------------+------------+
| id|        post|   otherPost|
+---+------------+------------+
|  2|[2, Sam, 40]|[2, Sam, 41]|
+---+------------+------------+

【讨论】:

  • my answer相比,您认为可扩展性有什么根本区别吗?
  • @matanster 根据你原来的要求,1.使用except会过滤掉两者之间不匹配的记录,所以没有进一步的fitler。 2. filter 代码受任何模式更新的影响,而except 处理任何模式更新。 3.通过except生成的数据框会比joined小。你可以缓存它
【解决方案3】:

我发现joinWith 只是为客户端代码保留了对象语义,而不需要任何额外的用户代码。这个 Spark API 函数只负责无缝地启用原始对象类型的使用,至少在简单连接的情况下是这样。在我的代码示例中,它只产生一个Dataset[(Post, Post)]

val joined =
  postsFromDF1.joinWith(
    postsFromDF2,
    postsFromDF1.col("id") === postsFromDataframe2.col("id"))

结果是一个二元组的集合,可以相应地使用,例如

joined.filter( pair => pair._1.someField != pair._2.someField )

任何关于可扩展性和内存消耗的 cmets欢迎使用其他解决方案。

【讨论】:

  • 我经常使用数据集,并且对可扩展性没有任何问题。与数据框相比,性能可能会稍低,但我将始终使用数据集
猜你喜欢
  • 2017-04-10
  • 1970-01-01
  • 2020-09-01
  • 1970-01-01
  • 1970-01-01
  • 2021-03-22
  • 2019-01-27
  • 1970-01-01
  • 2019-02-12
相关资源
最近更新 更多