【问题标题】:How to process new files in HDFS directory once their writing has eventually finished?写入完成后如何处理 HDFS 目录中的新文件?
【发布时间】:2017-11-06 15:25:06
【问题描述】:

在我的场景中,我将 CSV 文件不断上传到 HDFS。

上传新文件后,我想使用 Spark SQL 处理新文件(例如,计算文件中字段的最大值,将文件转换为 parquet)。即我在每个输入文件和转换/处理的输出文件之间有一个一对一的映射。

我正在评估 Spark Streaming 以侦听 HDFS 目录,然后使用 Spark 处理“流文件”。

但是,为了处理整个文件,我需要知道“文件流”何时完成。我想将转换应用于整个文件,以保留文件之间的端到端一对一映射。

如何转换整个文件而不是它的微批次?

据我所知,Spark Streaming 只能将转换应用于批处理(DStreams 映射到 RDDs),而不能一次应用于整个文件(当其有限流完成时)。

正确吗?如果是这样,我应该为我的方案考虑什么替代方案?

【问题讨论】:

  • 一个文件在被 Spark Streaming 拾取之前被完全写入 HDFS,所以我不明白这个问题
  • @cricket_007 你能澄清一下你的意思吗?

标签: apache-spark hadoop hdfs spark-structured-streaming


【解决方案1】:

您可以使用 DFSInotifyEventInputStream 观察 Hadoop 目录,然后在创建文件时以编程方式执行 Spark 作业。

看到这个帖子: HDFS file watcher

【讨论】:

  • Spark Streaming 可以观看文件夹。那个类是没有必要的
  • 如何在 Spark Streaming 中逐个文件处理?如果同时写入两个文件会怎样?
  • 逐个文件是什么意思? Spark Streaming 会提取所有自动移动到目标目录的文件,如文档中所述(在其他答案中复制),因此两个文件被视为两个单独的记录
  • 阅读问题。用户想为一个源文件创建一个目标文件。
  • 好的,再一次,您的答案没有利用 Spark Streaming 的内置功能,而是建议使用普通的 HDFS 库
【解决方案2】:

第一次尝试我可能误解了你的问题...

据我所知,Spark Streaming 只能将转换应用于批处理(DStreams 映射到 RDD),而不能一次应用于整个文件(当其有限流完成时)。

对吗?

没有。这正确。

Spark Streaming 将立即对整个文件应用转换,就像在 Spark Streaming 的批处理间隔过去时写入 HDFS 一样。

Spark Streaming 将获取文件的当前内容并开始处理它。


上传新文件后,我需要使用 Spark/SparkSQL 处理新文件

几乎使用 Spark 是不可能的,因为它的架构从“上传”和 Spark 处理它的那一刻起需要一些时间。

您应该考虑使用全新的闪亮Structured Streaming 或(即将过时的)Spark Streaming

两种解决方案都支持监视新文件的目录并在新文件上传后触发 Spark 作业(这正是您的用例)。

引用结构化流的Input Sources

在 Spark 2.0 中,有一些内置的源代码。

  • 文件源 - 将写入目录中的文件作为数据流读取。支持的文件格式为文本、csv、json、parquet。请参阅 DataStreamReader 接口的文档以获取最新列表以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现。

另请参阅 Spark Streaming 的 Basic Sources

除了套接字之外,StreamingContext API 还提供了从作为输入源的文件创建 DStream 的方法。

文件流:为了从与 HDFS API 兼容的任何文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以将 DStream 创建为: p>

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 将监视目录 dataDirectory 并处理在该目录中创建的任何文件(不支持写入嵌套目录中的文件)。

考虑到您的要求,有一个警告:

我需要知道“文件流”何时完成。

不要对 Spark 执行此操作。

再次引用 Spark Streaming 的 Basic Sources

  • 必须在 dataDirectory 中创建文件,方法是自动将它们移动或重命名到数据目录中。

  • 文件一经移动,不得更改。所以如果文件被连续追加,新数据将不会被读取。

结束...您应该在文件完成并准备好使用 Spark 处理时将文件移动到 Spark 监视的目录。这超出了 Spark 的范围。

【讨论】:

  • 感谢您的回答,顺便说一句,我需要说明我问题的关键点。如何转换整个文件而不是它的微批次?这就是我写 [quote] 我需要知道“文件流”何时完成的原因。我需要将转换应用于整个文件,以保留文件之间的端到端一对一映射。
  • @Andrea 您需要弄清楚是什么决定了整个文件。 HDFS 不识别“文件流”。写入其中的任何文件的每个“部分”都将被识别为整个文件
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-20
  • 2014-03-19
  • 2019-06-15
相关资源
最近更新 更多