【问题标题】:Empty folder in HDFS using Spark Streaming+Kafka使用 Spark Streaming+Kafka 的 HDFS 中的空文件夹
【发布时间】:2016-06-23 04:00:33
【问题描述】:

我正在使用 Spark Streaming + Kafka 将数据摄取到 HDFS。

val ssc = new StreamingContext(sparkContext, Seconds(30))
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
  .map(_._2)

每 30 秒,Kafka 队列中的所有数据将存储在 HDFS 中的单独文件夹中。一些文件夹包含一个名为 part-00000 的空文件,因为在相应的批处理间隔(30 秒)中没有数据。 我使用以下几行来过滤这些文件夹:

messageRecBased.filter { x => x.size == 0 }
messageRecBased.repartition(1).saveAsTextFiles("PATH")

但它不起作用,它仍然会生成带有空文件的文件夹。

【问题讨论】:

    标签: scala apache-spark hdfs spark-streaming


    【解决方案1】:

    如果您查看DStream.saveAsTextFiles() 方法定义,它只是将RDD.saveAsObjectFile 生成到DirectKafkaInputDStream 中的每个RDD

      def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
        val saveFunc = (rdd: RDD[T], time: Time) => {
          val file = rddToFileName(prefix, suffix, time)
          rdd.saveAsObjectFile(file)
        }
        this.foreachRDD(saveFunc)
      }
    

    因此,您可以选择自己写一些东西,而不是使用DStream.saveAsTextFiles()

    messageRecBased.foreachRDD{ rdd =>
        rdd.repartition(1)
        if(!rdd.isEmpty)
            rdd.saveAsObjectFile("FILE_PATH")
    }
    

    【讨论】:

    • 谢谢。它解决了使用空文件创建文件夹的问题。现在我有一个包含三个文件的文件夹:part-00000、part-00001、part00002。令人惊讶的是,这三个文件中的消息将被覆盖!所以我没有包含所有消息的文件,我有三个包含最后三个消息的文件!你知道为什么吗?
    • 我只有一个主题。
    • def main(args: Array[String]) { if (args.length 1) val zkQuorum = args(1) val group = "ZZZ" val sparkContext = new SparkContext("local[2]", "XXX") val ssc = new StreamingContext(sparkContext, Seconds(30)) val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic) .map(_._2) messageRecBased.foreachRDD{rdd => //rdd.repartition(1) if(!rdd.isEmpty()) rdd.saveAsTextFile("PATH") }
    • 您已评论重新分区。我猜你的主题中有三个内部 Kafka 分区,因此你有三个部分文件。如果您取消注释 RDD.repartitioning(1),那么您将获得一个零件文件。
    • Topic:test/PartitionCount:1/ReplicationFactor:1/Configs: Topic: test Partition: 0 Leader: 113 Replicas: 113 Isr: 113
    【解决方案2】:

    您可以检查分区是否为空,如果不是,则仅保存 RDD,如下所示。此代码应防止空 RDD 保存。

    messageRecBased.partitions.isEmpty
    

    【讨论】:

    • 你的意思是 messageRecBased.foreachRDD(x => if(!x.partitions.isEmpty) {x.saveAsTextFile(msgFileText)} ) ?因为“分区”不适用于 Dstream[String]
    • @Sunil; DStream 中没有 partitions() 方法。
    【解决方案3】:

    这就是我创建新目录并避免空批次的方法。

    import java.time.format.DateTimeFormatter
    import java.time.LocalDateTime
    
       messageRecBased.foreachRDD{ rdd =>
            rdd.repartition(1)
            val eachRdd = rdd.map(record => record.value)
            if(!eachRdd.isEmpty)
              eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
          }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-11-15
      • 2015-12-18
      • 2016-11-29
      • 2018-12-15
      • 1970-01-01
      • 2019-08-08
      • 1970-01-01
      • 2016-03-12
      相关资源
      最近更新 更多