【发布时间】:2021-09-05 15:20:06
【问题描述】:
我有一个启用了 Trigger.Once() 的结构化流式传输作业,我每 20 分钟运行一次。每次运行后,我都会从 S3 中删除已处理的 parquet 文件,因此我启用了 cleanSource 删除选项,但它不起作用,我不知道为什么!
在展示我的代码之前,我必须评论一下他。我正在并行运行多个结构化流查询,我有 5 个存储桶,我并行提交。该作业完美运行,但不会删除任何已处理的文件。
var table = ['table1','table2','table3','table4','table5']
tables.par.map(table => {
new ReplicationTables().run(table)
})
object ReplicationTables {
def run(table): Unit = {
val dataFrame = spark.readStream
.option("mergeSchema", "true")
.schema(dfSchema)
.option("cleanSource","delete")
.parquet(s"s3a://my-bucket/${table}/*")
// I do some transformation and after I write my new dataframe called df to S3 in Delta format
df.writeStream
.format("delta")
.outputMode("append")
.queryName(s"Delta/${table.schema}/${table.name}")
.trigger(Trigger.Once())
.option("checkpointLocation", s"s3a://my-bucket/checkpoints/${table.schema}/${table.name}")
.start(s"s3a://my-bucket/Delta_Tables/${table}/")
.awaitTermination()
}
}
PS:即使是 INFO 日志级别,我也没有任何关于 cleanSource
的日志PS 2:关注 Structured Streaming 关于 cleanSource 的文档https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources
【问题讨论】:
标签: apache-spark spark-structured-streaming delta-lake