【问题标题】:How can I append to same file in HDFS(spark 2.11)如何附加到 HDFS 中的同一文件(火花 2.11)
【发布时间】:2018-12-03 20:54:43
【问题描述】:

我正在尝试使用 SparkStreaming 将流数据存储到 HDFS 中,但它会继续在 新文件 中创建并附加到单个文件或几个多个文件中

如果它继续创建n个文件,我觉得效率不会很高

HDFS 文件系统

代码

lines.foreachRDD(f => {
  if (!f.isEmpty()) {
    val df = f.toDF().coalesce(1)
    df.write.mode(SaveMode.Append).json("hdfs://localhost:9000/MT9")
  }
 })

在我的 pom 中,我使用了各自的依赖项:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11

【问题讨论】:

  • 如果你正在从 Kafka 读取数据到 HDFS,我建议你考虑使用 Nifi 或 Kafka Connect。不要为现有解决方案重写代码
  • hdfs 意味着一次写入多次读取,您无法写入同一个文件。为了做到这一点,您必须执行 hive 和 hbase 遵循的压缩类型的过程跨度>

标签: apache-spark apache-spark-sql spark-streaming


【解决方案1】:

正如您已经意识到,Spark 中的 Append 意味着写入现有目录而不是附加到文件。

这是有意和期望的行为(想想如果进程在“追加”过程中失败会发生什么,即使格式和文件系统允许这样做)。

如有必要,合并文件等操作应由单独的进程应用,以确保正确性和容错性。不幸的是,这需要一个完整的副本,出于明显的原因,这并不需要逐批进行。

【讨论】:

  • 你可以通过这个链接:spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/…追加模式是指在将DataFrame保存到数据源时,如果数据/表已经存在,则期望将DataFrame的内容追加到现有数据中.
  • @andani 在 Spark 中追加...对于 HDFS,追加意味着将新文件添加到目录中,而不是完全覆盖该目录
  • @cricket_007 然后是他们将数据存储在同一个文件中的任何方式,就像他们在 Storm 中一样
  • @andani 我从未使用过 Storm,但我知道它不用于持久数据存储
  • @cricket_007 我想说的是他们内置的库,可以按照需要的方式将数据存储在 hdfs 中。
【解决方案2】:

每次重新初始化 DataFrame 变量时,它都会为每个 rdd 创建文件。我建议有一个 DataFrame 变量,并在循环之外和与本地 DataFrame 的每个 rdd 联合内分配为 null。循环写入后使用外层DataFrame。

【讨论】:

  • 还是一样的情况
  • var empty = sqlContext.emptyDataFrame lines.foreachRDD(f => { if (!f.isEmpty()) { empty = f.toDF().coalesce(1) empty.write.mode( SaveMode.Append).json(warehouseLocation) } })
  • 在你的条件中添加这个 if(empty == null) empty = f.toDF() else empty = empty.union(f.toDF()) 在循环结束后 empty.coalesce(1) .write.mode 取决于您的选择。请不要写在循环内
  • df 没有 fiexd 编号。 coloum,所以你的情况会出错。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-12-26
  • 2015-05-15
  • 1970-01-01
  • 2017-07-05
  • 1970-01-01
  • 1970-01-01
  • 2019-08-13
相关资源
最近更新 更多