【发布时间】:2018-10-31 19:05:08
【问题描述】:
我使用 Spark 从 Kafka 主题流式传输数据。这是我尝试过的代码。在这里,我只是在控制台中显示流数据。我想将此数据作为文本文件存储在 HDFS 中。
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
object StreamingDataNew {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaConf = Map(
"metadata.broker.list" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "kafka-streaming-example",
"zookeeper.connection.timeout.ms" -> "200000"
)
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaConf,
Map("topic-one" -> 1), // subscripe to topic and partition 1
StorageLevel.MEMORY_ONLY
)
println("printing" + lines.toString())
val words = lines.flatMap { case (x, y) => y.split(" ") }
words.print()
ssc.start()
ssc.awaitTermination()
}
}
我发现我们可以使用“saveAsTextFiles”编写 DStream。但是有人可以清楚地提到如何使用上述 scala 代码连接 Hortonworks 并存储在 HDFS 中的步骤。
【问题讨论】:
-
@SivaprasannaSethuraman 谢谢。
-
@SivaprasannaSethuraman 我已经在您分享的上述问题链接中尝试了可能的解决方案。这对我来说并不奏效。
-
简单地说“这对我有用”对我们没有多大帮助。你能说说你遇到了什么问题吗?
-
@Sivaprasanna Sethuraman 我尝试打开文件时可以看到这些消息。但问题是,我正在为每个分区获取新文件。
标签: apache-spark apache-kafka hdfs spark-streaming hortonworks-sandbox