【发布时间】:2021-07-01 08:44:49
【问题描述】:
在 AVRO 中将 RDD 保存到 S3 时,我在控制台中收到以下警告:
使用标准 FileOutputCommitter 提交工作。这很慢并且可能不安全。
我无法找到一个简单的隐式,例如saveAsAvroFile,因此我四处寻找并得出了这个结论:
import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
object AvroUtil {
def write[T](
path: String,
schema: Schema,
avroRdd: RDD[T],
job: Job = Job.getInstance()): Unit = {
val intermediateRdd = avroRdd.mapPartitions(
f = (iter: Iterator[T]) => iter.map(new AvroKey(_) -> NullWritable.get()),
preservesPartitioning = true
)
job.getConfiguration.set("avro.output.codec", "snappy")
job.getConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
AvroJob.setOutputKeySchema(job, schema)
intermediateRdd.saveAsNewAPIHadoopFile(
path,
classOf[AvroKey[T]],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[T]],
job.getConfiguration
)
}
}
我很困惑,因为 AVRO 文件似乎输出正确,所以我看不出什么是不正确的。
【问题讨论】:
-
为什么不用 spark-avro 库写一个 Dataframe?
-
@OneCricketeer 你指的是这个吗? github.com/databricks/spark-avro 它似乎被标记为已弃用。我们的代码库依赖于低级 RDD。有机会你可以发布一个例子吗?谢谢。
-
是的。该库已在上游合并spark.apache.org/docs/latest/sql-data-sources-avro.html,您需要使用 toDF 函数 stackoverflow.com/questions/38968351/spark-2-0-scala-rdd-todf 转换您的 RDD
标签: scala apache-spark hadoop avro spark-avro