【发布时间】:2020-04-28 09:52:06
【问题描述】:
在 Scala 中,我正在使用 .saveTableAs 将我的 DataFrame 写入 S3,但 Glue 似乎没有在数据库位置、格式等方面正确更新自身。对于背景,传入的数据集为 1.5TB JSON,目标数据格式为 Parquet;尽管重命名过程很慢,但所有 Parquet 文件都会写入。
val writeMode = "Overwrite"
val destinationFormatType = "parquet"
val s3PathBase = "s3://foo_bucket"
val currentDatabase = "bar"
val replaceTable = true
val jsonColumn = "json"
val partitionBy = Array("year", "month", "day", "hour")
val currentEvent = "fooBar"
val tableLowerCase = glueCatalog.fixTableName(currentEvent.asInstanceOf[String])
val s3Path = s"${s3PathBase}/${tableLowerCase}"
val tablePathInDb = s"${currentDatabase}.${tableLowerCase}"
println(tablePathInDb)
val currentEventDf = spark.read.json(
dfWithJson
.filter(col("event") === lit(currentEvent))
.select(jsonColumn)
.as[String]
)
// Adds partitions to have input data retain the same paths as the output data, since this is Kinesis
val dfToWrite = s3Reader.addKinesisPartitionsToDataFrameRows(currentEventDf, inputBasePath)
val dfWriter = dfToWrite
.repartition(partitionBy.map(col): _*)
.write
.option("mode", "DROPMALFORMED")
.mode(writeMode)
.format(destinationFormatType)
.option(
"path",
s3Path
)
if (replaceTable) {
println("\t- .saveAsTable")
dfWriter
.partitionBy(partitionBy: _*)
.saveAsTable(tablePathInDb)
} else {
println("\t- .insertInto")
dfWriter.insertInto(tablePathInDb)
}
数据写入时,显示正常,可通过 Spark 在 S3 中读取,但 Glue 不正确地注册 Hive 表:
名称 foobar
说明
数据库栏
分类未知
位置 s3://foo_bucket/hive-metastore/bar.db/foobar-PLACEHOLDER
连接
不推荐使用
最后更新时间为 2020 年 1 月 9 日星期四 16:55:23 GMT-800
输入格式 org.apache.hadoop.mapred.SequenceFileInputFormat
输出格式 org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Serde 序列化库 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Serde 参数
模式 DROPMALFORMED
路径 s3://foo_bucket/foobar
序列化格式1
【问题讨论】:
-
您遇到的错误是什么?
-
这是一个序列文件,这是无效的。我会发布解决方法。
标签: scala apache-spark hive aws-glue amazon-athena