【问题标题】:how to monitor multi directories in spark streaming task如何在火花流任务中监控多个目录
【发布时间】:2015-05-13 19:09:31
【问题描述】:

我想在spark streaming中使用fileStream来监控多个hdfs目录,比如:

val list_join_action_stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/root/*/*", check_valid_file(_), false).map(_._2.toString).print

顺便说一句,我下不了三个类的意思:LongWritable、Text、TextInputFormat

但它不起作用......

java.io.FileNotFoundException: File /user/root/*/*
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
    at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
    at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:84)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    来自 Spark 文档:

    def fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean, conf: Configuration)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]
    

    创建一个输入流,用于监控与 Hadoop 兼容的文件系统中的新文件,并使用给定的键值类型和输入格式读取它们。必须通过将文件从同一文件系统中的另一个位置“移动”它们来将文件写入受监视的目录。以 . 开头的文件名被忽略。

    K    Key type for reading HDFS file
    V    Value type for reading HDFS file
    F    Input format for reading HDFS file 
    directory    HDFS directory to monitor for new file
    filter    Function to filter paths to process
    newFilesOnly Should process only new files and ignore existing files in the directory
    conf    Hadoop configuration
    

    filter 参数采用一种方法来识别要处理的输入文件。

    new Function<Path, Boolean>() {
            @Override
            public Boolean call(Path v1) throws Exception {
              return Boolean.TRUE;
            }
          }
    

    TextInputFormat 是默认输入格式,用于纯文本或压缩文本文件。

    文件被分成几行。 Keys 是文件中的位置(偏移量),values 是文本行。

    ssc.fileStream[LongWritable, Text, TextInputFormat] 的工作方式与ssc.textFileStream(directory) 完全相同

    如果你想自定义你的文件读取过程,那么你需要定义一个自定义输入格式并指定从返回的键值对是什么

    要实现自定义输入格式,您可以参考定义 Hadoop mapreduce 自定义输入格式。

    Spark API Reference

    Source code

    【讨论】:

    • 您能详细解释一下关键和价值吗?唯一的输入是一个目录,我们可以从该目录中获取许多文件。那么文件名的关键是什么?文件名的价值是什么?
    • 谢谢@Vijay Innamuri 买路,你知道如何读取lz4压缩文件吗?我发现使用你的代码,我无法读取 lz4 压缩文件
    • textFileStream 与 fileStream ... def textFileStream(directory: String): DStream[String] = { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2. toString) }
    猜你喜欢
    • 2016-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-06
    相关资源
    最近更新 更多