【问题标题】:What FileOutputCommitter should be used in when writing AVRO files in Spark?在 Spark 中编写 AVRO 文件时应该使用什么 FileOutputCommitter?
【发布时间】:2021-07-01 08:44:49
【问题描述】:

在 AVRO 中将 RDD 保存到 S3 时,我在控制台中收到以下警告:

使用标准 FileOutputCommitter 提交工作。这很慢并且可能不安全。

我无法找到一个简单的隐式,例如saveAsAvroFile,因此我四处寻找并得出了这个结论:

import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

object AvroUtil {

  def write[T](
      path: String,
      schema: Schema,
      avroRdd: RDD[T],
      job: Job = Job.getInstance()): Unit = {
    val intermediateRdd = avroRdd.mapPartitions(
      f = (iter: Iterator[T]) => iter.map(new AvroKey(_) -> NullWritable.get()),
      preservesPartitioning = true
    )

    job.getConfiguration.set("avro.output.codec", "snappy")
    job.getConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")

    AvroJob.setOutputKeySchema(job, schema)

    intermediateRdd.saveAsNewAPIHadoopFile(
      path,
      classOf[AvroKey[T]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[T]],
      job.getConfiguration
    )
  }
}

我很困惑,因为 AVRO 文件似乎输出正确,所以我看不出什么是不正确的。

【问题讨论】:

标签: scala apache-spark hadoop avro spark-avro


【解决方案1】:

您可以通过实现自己的 OutputFileCommitter 来覆盖现有 FileOutputCommitter 的行为,使其更加高效和安全。

关注link,作者已通过示例进行了类似的解释。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-25
    • 2014-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-29
    相关资源
    最近更新 更多