【问题标题】:Spark Streaming: HDFS火花流:HDFS
【发布时间】:2015-05-15 08:29:01
【问题描述】:
  1. 我无法让我的 Spark 作业从 HDFS 流式传输“旧”文件。

如果我的 Spark 作业由于某种原因(例如演示、部署)而关闭,但写入/移动到 HDFS 目录的过程是连续的,我可能会在启动 Spark Streaming 作业后跳过这些文件。

    val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )

输出 --> 本批次记录数:0

  1. Spark Streaming 有没有办法将“读取”文件移动到不同的文件夹?还是我们必须手动编程?所以它会避免读取已经“读取”的文件。

  2. Spark Streaming 是否与在 CRON 中运行 spark 作业 (sc.textFile) 相同?

【问题讨论】:

    标签: hadoop apache-spark hdfs spark-streaming


    【解决方案1】:

    正如 Dean 提到的,textFileStream 使用默认设置,即仅使用新文件。

      def textFileStream(directory: String): DStream[String] = {
        fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
      }
    

    所以,它所做的只是调用fileStream 的这个变体

    def fileStream[
        K: ClassTag,
        V: ClassTag,
        F <: NewInputFormat[K, V]: ClassTag
      ] (directory: String): InputDStream[(K, V)] = {
        new FileInputDStream[K, V, F](this, directory)
      }
    

    而且,查看FileInputDStream 类,我们会发现它确实可以查找现有文件,但默认为仅新文件:

    newFilesOnly: Boolean = true,
    

    所以,回到StreamingContext 代码,我们可以看到有和我们可以通过直接调用fileStream 方法来使用重载:

    def fileStream[
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag] 
    (directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
      new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    }
    

    所以,TL;DR;是

    ssc.fileStream[LongWritable, Text, TextInputFormat]
        (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
    

    【讨论】:

    • 嗨贾斯汀,谢谢。但我只能读取在运行 Spark 作业之前最后 5 秒放在那里的旧文件。有我需要更新的设置吗?这是日志的一部分,我不确定它是否与它有关:INFO dstream.FileInputDStream: Slide time = 10000 ms 这是我的新代码:val hdfsDStream = ssc.fileStream[LongWritable, Text, TextInputFormat ]("hdfs://sandbox.hortonworks.com/user/root/logs", (t: Path) => true, newFilesOnly=false).map(_._2.toString)
    • @sophie 我相当肯定这是一个错误。然而,深入研究它......看起来一个天真的修复会破坏很多。要点是他们正在做一个max,他们应该在哪里做一个min....但我认为如果你只修复那部分文件可能会一遍又一遍地处理......我会提交错误这个周末的某个时候并发布它。
    • 嗨贾斯汀,你知道这是否已经解决了吗?我仍然无法让它工作。
    • 我仍然怀疑这正是这个问题,但issues.apache.org/jira/browse/SPARK-6061
    【解决方案2】:

    您是否希望 Spark 读取目录中已有的文件?如果是这样,这是一个常见的误解,让我大吃一惊。 textFileStream 监视 new 文件出现的目录,然后读取它们。它会在您启动时忽略目录中已经存在的文件或已经读取的文件。

    基本原理是您将有一些进程将文件写入 HDFS,然后您会希望 Spark 读取它们。请注意,这些文件大多以原子方式出现,例如,它们被慢慢写入其他地方,然后移动到监视目录。这是因为 HDFS 不能同时正确处理读取和写入文件。

    【讨论】:

      【解决方案3】:
      val filterF = new Function[Path, Boolean] {
          def apply(x: Path): Boolean = {
            println("looking if "+x+" to be consider or not")
            val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
             else{ false }
            return flag
          }
      }
      

      此过滤器功能用于确定每条路径是否实际上是您首选的路径。因此应根据您的要求自定义应用内的功能。

      val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}
      

      现在您必须将 filestream 函数的第三个变量设置为 false,这是为了确保不仅是新文件,而且还要考虑流目录中的旧现有文件。

      【讨论】:

      • 函数filterF中没有定义list
      猜你喜欢
      • 2016-10-12
      • 1970-01-01
      • 2018-08-20
      • 2017-04-27
      • 2019-08-13
      • 2019-04-02
      • 2016-02-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多