【问题标题】:Getting data from HDFS using Sparkstreaming使用 Spark Streaming 从 HDFS 获取数据
【发布时间】:2016-10-04 06:31:24
【问题描述】:

我正在尝试使用火花流从 HDFS 读取数据。 下面是我的代码。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.fs._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(10))

val directory ="hdfs://pc-XXXX:9000/hdfs/watchdirectory/"
val lines=ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t:Path) => true, true).map(_._2.toString)
lines.count()
lines.print()
ssc.start
ssc.awaitTermination()

代码运行,但没有从 HDFS 读取任何数据。 每 10 秒后我得到一个空行。

我已经浏览了 fileStream 的文档,并且我知道我已将文件移动到监视目录。 但这对我不起作用。 我也尝试过 textFileStream 但没有运气。

我正在使用使用 Scala 2.11.8 构建的 spark 2.0.0

请有任何建议。

【问题讨论】:

  • 您是如何尝试使用 textFileStream 的,请发布您的代码
  • 没有大的区别,但这是我更改的代码行。 val lines = ssc.textFileStream("hdfs://pc-XXXX:9000/hdfs/watchdirectory/")
  • 运行spark程序后使用textFileStream移动文件,如果文件已经存在则不会选择。

标签: apache-spark spark-streaming


【解决方案1】:

请在下面试试

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(10))

val lines= ssc.textFileStream("hdfs://pc-XXXX:9000/hdfs/watchdirectory/").map(_._2.toString)
lines.count()
lines.print()
ssc.start
ssc.awaitTermination()

执行此操作后,将文件移动到

/hdfs/watchdirectory/

【讨论】:

  • 在发布问题之前我已经知道了,但它对我不起作用
  • 你跑得怎么样?哪种模式?本地、YARN、介子、独立?
  • 尝试使用本地和独立
猜你喜欢
  • 2015-12-18
  • 2018-04-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-25
  • 2018-12-15
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多