【问题标题】:How to update HDFS files after every trigger?每次触发后如何更新 HDFS 文件?
【发布时间】:2019-01-12 15:48:56
【问题描述】:

我正在尝试将处理后的数据写入 HDFS 位置。经过长时间的试验和错误方法后,我正在将数据写入 HDFS 位置,但现在的问题是每当我将新文件添加到我的目录(我指向 readStream 的位置)时,旧文件在 HDFS 位置得到处理和更新

假设我已经开始流式传输并且我在我的目录中添加了 file-1.csv...没有写入 HDFS 位置..然后我添加了 file-2.csv..仍然没有写入 HDFS ,接下来我添加了file-3.csv..这次file-1.csv的处理文件正在写入HDFS...

File 1 - no prcess
File 2 - no process
File 3 - process and written file 1 data to HDFS
file4 - process and written file 2 data to HDFS 

我不确定为什么会发生这种情况,即使它是结构化流媒体

有人可以解决这个问题吗?

我的输出命令如下:

FetRepo
  .writeStream
  .outputMode("append")
  .partitionBy("data_dt")
  .format("csv")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("path", "hdfs://ffff/apps/hive/warehouse/area.db/fin_repo/")
  .start

【问题讨论】:

  • HDFS 在这里发挥作用吗?如果您使用常规文件系统,这是否有效?这是什么 Spark 版本?您如何运行 Spark 应用程序?
  • 数据最终会被写入 - 还是不会?
  • @Jack Laskowski 是的,HDFS 发挥作用.. 我在 HDFS 之上创建了 HIVE 表。 spark 2.3.0 .. 我已经通过 spark-shell
  • @thebluephantom 数据最终写入,但正如我所说,每当我添加新文件时它不会写入数据.. 总是有 2 个或文件滞后...当我摄取文件 3 时,文件 1 得到处理和作为分区插入HDFS
  • 我观察到的也是这样。我不会担心的。

标签: apache-spark hdfs spark-structured-streaming


【解决方案1】:

问问自己“我多久添加一次文件?”以及这与 Trigger.ProcessingTime("10 seconds")?使用该配置,您不应期望在 10 秒内发生任何事情。

要注意的另一件事是,您使用 outputMode("append") 只会输出自上次触发以来添加的聚合(组)的行。

来自Basic Concepts

追加模式 - 只有自上次触发后追加到结果表中的新行才会写入外部存储。

请注意(引用同一文档):“这仅适用于结果表中的现有行预计不会更改的查询。”

【讨论】:

  • 是的......它也不会在 30 秒后发生......我的数据也是新鲜的,我插入数据的 hdfs 位置也很干净......我可以在每次之后看到这种插入模式我的 readStream 目录中添加了 3 个文件 .... 是否有关于 spark 结构化流上的文件数量的任何设置/限制?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-06
  • 2020-04-19
  • 1970-01-01
相关资源
最近更新 更多