【问题标题】:Spark Streaming: StreamingContext doesn't read data filesSpark Streaming:StreamingContext 不读取数据文件
【发布时间】:2015-03-14 06:38:46
【问题描述】:

我是 Spark Streaming 的新手,我正在尝试使用 Spark-shell 开始使用它。 假设我在 spark-1.2.0-bin-hadoop2.4 的根目录中有一个名为“dataTest”的目录。

我想在 shell 中测试的简单代码是(在输入 $.\bin\spark-shell 之后):

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(2))
val data = ssc.textFileStream("dataTest")
println("Nb lines is equal to= "+data.count())
data.foreachRDD { (rdd, time) => println(rdd.count()) }
ssc.start()
ssc.awaitTermination()

然后,我复制了目录“dataTest”中的一些文件(并且我尝试重命名该目录中的一些现有文件)。

但不幸的是,我没有得到我想要的(即我没有得到任何输出,所以 ssc.textFileStream 似乎无法正常工作),只是一些类似的东西:

15/01/15 19:32:46 INFO JobScheduler: Added jobs for time 1421346766000 ms
15/01/15 19:32:46 INFO JobScheduler: Starting job streaming job 1421346766000 ms
.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO SparkContext: Starting job: foreachRDD at <console>:20
15/01/15 19:32:46 INFO DAGScheduler: Job 69 finished: foreachRDD at <console>:20
, took 0,000021 s
0
15/01/15 19:32:46 INFO JobScheduler: Finished job streaming job 1421346766000 ms
.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO MappedRDD: Removing RDD 137 from persistence list
15/01/15 19:32:46 INFO JobScheduler: Total delay: 0,005 s for time 1421346766000
ms (execution: 0,002 s)
15/01/15 19:32:46 INFO BlockManager: Removing RDD 137
15/01/15 19:32:46 INFO UnionRDD: Removing RDD 78 from persistence list
15/01/15 19:32:46 INFO BlockManager: Removing RDD 78
15/01/15 19:32:46 INFO FileInputDStream: Cleared 1 old files that were older tha
n 1421346706000 ms: 1421346704000 ms
15/01/15 19:32:46 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

【问题讨论】:

    标签: scala spark-streaming


    【解决方案1】:

    您是否尝试将文本文件从另一个目录移动到被监视的目录中?为了使文件流正常工作,您可以自动将文件放入受监视的目录中,这样只要文件在列表中可见,Spark 就可以读取文件中的所有数据(如果您正在复制,则可能不是这种情况)文件到目录中)。

    这在programming guide 的基本来源小节中有详细记录

    【讨论】:

    • 我尝试移动、重命名文本文件,但遗憾的是我没有得到任何输出!对你有用吗?
    • 我还必须确保格式正确 - 我犯了将文本文件保存为 .txt.gz 格式的错误,这不起作用。一旦我将它保存为纯文本,它就可以工作了
    【解决方案2】:

    使用命令行复制文件/文档或将文件/文档另存为目录为我工作。 当您通常复制(通过 IDE)时,这不会影响修改日期作为流上下文监控修改日期。

    【讨论】:

      【解决方案3】:

      我也遇到过同样的问题,对我来说,解决方案是 流式传输正在运行,我编辑并保存了我想用作输入流的文件。然后我直接将输入文件移动到流目录,同时流仍然打开。

      【讨论】:

        【解决方案4】:

        以下代码对我有用

        class StreamingData extends Serializable {
          def main(args: Array[String]) {
            val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
            //val sc = new SparkContext(conf)
            val ssc = new StreamingContext(conf, Seconds(1))
            val input = ssc.textFileStream("file:///C:/Users/M1026352/Desktop/Spark/StreamInput")
            val lines = input.flatMap(_.split(" "))
            val words = lines.map(word => (word, 1))
            val counts = words.reduceByKey(_ + _)
            counts.print()
            ssc.start()
            ssc.awaitTermination()
          }
        
        }
        

        只需要保留 Unix 格式的文本文件 如果您在记事本++中打开文件,请转到 设置->首选项->新建文档->Unix/OSX 然后更改文件名以让 Scala 选择它。 https://stackoverflow.com/a/41495776/5196927

        参考上面的链接。

        【讨论】:

          【解决方案5】:

          我认为这通常应该有效,但是问题可能是建议将 Spark Streaming 作为独立应用程序而不是在 spark-shell 中执行。

          我将它作为一个独立的应用程序运行(在其他流数据上)并且它工作正常。

          data.count() 为您提供 DStream 的每个 RDD 中有多少元素,这与您在 foreachRDD() 中计数的元素相同。

          【讨论】:

          • 你测试过什么样的流数据?它是否适用于您光盘上存在的文本文件?感谢回复
          • 我之前用 twitter 流测试过。今天,我使用 textFileStream 测试了一个文件,就像您在示例中一样。我无法使用我的 macbook 上的 spark-shell 让它工作。我决定尝试使用 nfs 文件系统,以防 macbook 文件系统与 hdfs 不兼容。我仍然与本地大师一起使用 spark-shell。我将批量大小从 2 增加到 10,以便更容易看到喷出的日志行。只要我从同一文件系统中的另一个目录(不是“cp”)中“mv”文件,该代码就可以在 nfs 上运行。来自同一目录的 mv 不起作用。
          【解决方案6】:

          我正在做几乎相同的事情(作为 Windows 8 笔记本电脑上的独立应用程序),对我来说这很好,但是我将“dataTest”文件夹作为“bin”的子文件夹。也许试试看?

          【讨论】:

          • 最好在评论区发表这类回复
          【解决方案7】:

          我只是在 shell 中使用你的代码,它工作正常。当我将一些文件放入目录(HDFS)时,我得到了这样的输出日志:

          15/07/23 10:46:36 INFO dstream.FileInputDStream: Finding new files took 9 ms
          15/07/23 10:46:36 INFO dstream.FileInputDStream: New files at time 1437619596000 ms:
          hdfs://master:9000/user/jared/input/hadoop-env.sh
          15/07/23 10:46:36 INFO storage.MemoryStore: ensureFreeSpace(235504) called with curMem=0, maxMem=280248975
          ......
          15/07/23 10:46:36 INFO input.FileInputFormat: Total input paths to process : 1
          15/07/23 10:46:37 INFO rdd.NewHadoopRDD: Input split: hdfs://master:9000/user/jared/input/hadoop-env.sh:0+4387
          15/07/23 10:46:42 INFO dstream.FileInputDStream: Finding new files took 107 ms
          15/07/23 10:46:42 INFO dstream.FileInputDStream: New files at time 1437619598000 ms:
          
          15/07/23 10:46:42 INFO scheduler.JobScheduler: Added jobs for time 1437619598000 ms
          15/07/23 10:46:42 INFO dstream.FileInputDStream: Finding new files took 23 ms
          15/07/23 10:46:42 INFO dstream.FileInputDStream: New files at time 1437619600000 ms:
          
          15/07/23 10:46:42 INFO scheduler.JobScheduler: Added jobs for time 1437619600000 ms
          15/07/23 10:46:43 INFO dstream.FileInputDStream: Finding new files took 42 ms
          15/07/23 10:46:43 INFO dstream.FileInputDStream: New files at time 1437619602000 ms:
          15/07/23 10:46:43 INFO scheduler.JobScheduler: Added jobs for time 1437619602000 ms
          15/07/23 10:46:43 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1830 bytes result sent to driver
          15/07/23 10:46:43 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6098 ms on localhost (1/1)
          15/07/23 10:46:43 INFO scheduler.DAGScheduler: ResultStage 0 (foreachRDD at <console>:29) finished in 6.178 s
          15/07/23 10:46:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
          15/07/23 10:46:43 INFO scheduler.DAGScheduler: Job 66 finished: foreachRDD at <console>:29, took 6.647137 s
          101
          

          【讨论】:

            猜你喜欢
            • 2018-02-14
            • 2019-05-11
            • 2021-02-05
            • 2017-07-14
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2021-04-17
            • 1970-01-01
            相关资源
            最近更新 更多