【问题标题】:save updates into dataframe and reuse the saved dataframe in spark scala将更新保存到数据框中并在 spark scala 中重用保存的数据框
【发布时间】:2018-07-19 06:08:06
【问题描述】:

我得到多个传入文件,我必须将每个传入文件与源文件进行比较,然后将旧行合并并替换为新行,如果源文件中存在任何额外的行,则追加。后记我必须使用更新的源文件并与另一个传入的文件进行比较,更新它,这样这个过程就会继续。

到目前为止,我已经为每个文件创建了数据框,并使用 join 进行了比较和合并。我想保存在源文件中完成的所有更新,并再次使用更新后的源文件来比较和更新传入的文件。

          val merge = df.union(dfSource.join(df, Seq( "EmployeeID" ),joinType= "left_anti").orderBy("EmployeeID") )

          merge.write.mode ("append").format("text").insertInto("dfSource")
              merge.show()

我尝试过这种方式,但它不会更新我的 dfSource 数据框。有人可以帮忙吗?

谢谢

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    这种方式是不可能的。需要使用表格,然后作为流程的最后一部分保存到文件中。

    建议您按以下方式调整您的方法 - 这允许并行加载,但我真的怀疑并没有真正的好处。

    1. 按交付顺序加载所有文件,加载的每条记录都带有时间戳或文件序列号中的某些排序顺序以及记录类型。例如。例如,位置为 2 的文件 X 会使用 seqnum = 2 加载记录。如果在 SPARK 域内执行所有操作,您可以对正在处理的文件使用 DF 方法并附加到 Impala / Hive KUDU 表。

    2. 对于同一文件中的记录,如果同一文件中可以存在相同的键,则应用 monotonically_increasing_id() 以获取文件中的排序。见DataFrame-ified zipWithIndex。或者 zipWithIndex 通过 RDD 通过转换并返回到 DF。

    3. 然后发出一个 select 语句来获取具有最大值时间戳的键值,每个键的 seq_num。例如。如果在当前运行 3 个记录中,例如,对于 key=1,只需要处理一个 - 大概是具有最高值的那个。

    4. 另存为新文件。

    5. 相应地处理这个新文件。

    6. 或者:

      绕过第 3 步,按 asc 顺序读取并相应地处理数据。

    评论: 通常,我使用 LOAD 将此类数据加载到 HIVE / IMPALA,并通过从文件名中提取时间戳来设置分区键。需要一些 LINUX 脚本/处理。这是风格问题,不应成为真正的大数据瓶颈。

    这是一个带有模拟输入的 sn-p,它说明了如何完成某些方面以允许针对 UPSerts 的键进行 MAX 选择。操作,DEL,ALT,无论您需要添加什么。尽管我认为您实际上可以根据我所看到的情况自己做到这一点:

    import org.apache.spark.sql.functions._
    import spark.implicits._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._ 
    
    def dfSchema(columnNames: List[String]): StructType =
      StructType(
        Seq(
          StructField(name = "key", dataType = StringType, nullable = false),
          StructField(name = "file", dataType = StringType, nullable = false),
          StructField(name = "ts", dataType = StringType, nullable = false),
          StructField(name = "val", dataType = StringType, nullable = false),
          StructField(name = "seq_val", dataType = LongType, nullable = false)      
        )
      )
    
    val newSchema = dfSchema(List("key", "file", "ts", "val", "seq_val"))
    
    val df1 = Seq(
       ("A","F1", "ts1","1"),
       ("B","F1", "ts1","10"),
       ("A","F1", "ts2","2"),
       ("C","F2", "ts3","8"),
       ("A","F2", "ts3","3"),
       ("A","F0", "ts0","0")  
     ).toDF("key", "file", "ts","val")
    
    val rddWithId = df1.sort($"key", $"ts".asc).rdd.zipWithIndex
    val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    
    dfZippedWithId.show
    

    返回:

    +---+----+---+---+-------+
    |key|file| ts|val|seq_val|
    +---+----+---+---+-------+
    |  A|  F0|ts0|  0|      0|
    |  A|  F1|ts1|  1|      1|
    |  A|  F1|ts2|  2|      2|
    |  A|  F2|ts3|  3|      3|
    |  B|  F1|ts1| 10|      4|
    |  C|  F2|ts3|  8|      5|
    +---+----+---+---+-------+
    

    准备好进行后续处理。

    【讨论】:

    • 请查看电子邮件。
    • 请查看电子邮件或聊天
    • PC 崩溃,稍后将重新输入响应
    猜你喜欢
    • 1970-01-01
    • 2021-09-29
    • 2017-05-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多