【发布时间】:2016-06-23 04:00:33
【问题描述】:
我正在使用 Spark Streaming + Kafka 将数据摄取到 HDFS。
val ssc = new StreamingContext(sparkContext, Seconds(30))
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
.map(_._2)
每 30 秒,Kafka 队列中的所有数据将存储在 HDFS 中的单独文件夹中。一些文件夹包含一个名为 part-00000 的空文件,因为在相应的批处理间隔(30 秒)中没有数据。 我使用以下几行来过滤这些文件夹:
messageRecBased.filter { x => x.size == 0 }
messageRecBased.repartition(1).saveAsTextFiles("PATH")
但它不起作用,它仍然会生成带有空文件的文件夹。
【问题讨论】:
标签: scala apache-spark hdfs spark-streaming