【问题标题】:Scala Spark loop goes through without any error, but does not produce an outputScala Spark 循环通过没有任何错误,但不产生输出
【发布时间】:2017-11-08 20:49:40
【问题描述】:

我在 HDFS 中有一个文件,其中包含各种其他文件的路径。这是名为 file1 的文件:

path/of/HDFS/fileA
path/of/HDFS/fileB
path/of/HDFS/fileC
.
.
.

我在 Scala Spark 中使用如下的 for 循环来读取上述文件的每一行并在另一个函数中处理它:

val lines=Source.fromFile("path/to/file1.txt").getLines.toList

for(i<-lines){
i.toString()
val firstLines=sc.hadoopFile(i,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}
}

当我运行上述循环时,它会运行而不会返回任何错误,并且我会在新行中得到 Scala 提示:scala>

但是,当我尝试查看应该存储在 firstLines 中的几行输出时,它不起作用:

scala> firstLines
<console>:38: error: not found: value firstLines
          firstLine
          ^

上面的循环没有产生输出,但是运行没有任何错误的问题是什么?

其他信息 函数 hadoopFile 接受一个字符串路径名作为它的第一个参数。这就是为什么我试图将 file1 的每一行(每一行是一个路径名)作为第一个参数 i 中的字符串传递。 flatMap 功能正在获取已传递给 hadoopFile 的文件的第一行并将其单独存储并转储所有其他行。因此,所需的输出(firstLines)应该是通过路径名(i)传递给 hadoopFile 的所有文件的第一行。

我尝试只为单个文件运行该函数,没有循环,这会产生输出:

val firstLines=sc.hadoopFile("path/of/HDFS/fileA",classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

scala> firstLines.take(3)
res27: Array[String] = Array(<?xml version="1.0" encoding="utf-8"?>)

fileA 是一个 XML 文件,因此您可以看到该文件的第一行结果。所以我知道该函数工作正常,这只是我无法弄清楚的循环问题。请帮忙。

【问题讨论】:

    标签: scala hadoop apache-spark hdfs


    【解决方案1】:

    变量firstLines 是在for 循环体中定义的,因此其范围仅限于该循环。这意味着您无法访问循环外的变量,这就是 Scala 编译器告诉您error: not found: value firstLines 的原因。

    根据您的描述,我了解到您想收集lines 中列出的每个文件的第一行。

    这里的 every 可以在 Scala 中翻译成不同的结构。我们可以使用您编写的for 循环之类的东西,甚至更好地采用函数式方法并使用应用于文件列表的map 函数。在下面的代码中,我将您在描述中使用的代码放入 map 中,这将创建一个 HadoopRDD 并将 flatMap 与您的函数一起应用以检索文件的第一行。

    然后我们获得RDD[String] 的行列表。在这个阶段,请注意我们还没有开始做任何实际的工作。要触发对 RDD 的评估并收集结果,我们需要对列表中的每个 RDD 调用 collect 方法。

    // Renamed "lines" to "files" as it is more explicit.  
    val fileNames = Source.fromFile("path/to/file1.txt").getLines.toList
    
    val firstLinesRDDs = fileNames.map(sc.hadoopFile(_,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
      case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
    })
    
    // firstLinesRDDs is a list of RDD[String]. Based on this code, each RDD
    // should consist in a single String value. We collect them using RDD#collect:
    val firstLines = firstLinesRDDs.map(_.collect)
    

    但是,这种方法存在一个缺陷,使我们无法从 Spark 可以提供的任何优势中受益。

    当我们将map 中的操作应用到filenames 时,我们没有使用 RDD,因此文件名在驱动程序(托管 Spark 会话的进程)上按顺序处理,而不是可并行化的一部分火花作业。这相当于执行您在第二个代码块中编写的内容,一次一个文件名。

    为了解决这个问题,我们能做些什么?使用 Spark 时要记住的一件好事是尝试在我们的代码中尽早推送 RDD 的声明。为什么?因为这允许 Spark 并行化和优化我们想做的工作。您的示例可能是该概念的教科书说明,尽管此处需要操作文件增加了额外的复杂性。

    在我们目前的例子中,我们可以从hadoopFile 接受逗号分隔的输入文件这一事实中受益。因此,我们不是为每个文件按顺序创建 RDD,而是为所有文件创建一个 RDD:

    val firstLinesRDD = sc.hadoopFile(fileNames.mkString(","), classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
      case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
    }
    

    我们用一个collect 检索我们的第一行:

    val firstLines = firstLinesRDD.collect
    

    【讨论】:

    • 解释的很清楚很详细!我现在明白为什么 for 循环不起作用了。并使用您的解决方案,我能够实现结果。非常感谢!
    • 我很高兴它有帮助!谢谢
    猜你喜欢
    • 2011-08-27
    • 2018-06-04
    • 1970-01-01
    • 1970-01-01
    • 2019-10-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-23
    相关资源
    最近更新 更多