【问题标题】:cleanSource option does not delete any filescleanSource 选项不会删除任何文件
【发布时间】:2021-09-05 15:20:06
【问题描述】:

我有一个启用了 Trigger.Once() 的结构化流式传输作业,我每 20 分钟运行一次。每次运行后,我都会从 S3 中删除已处理的 parquet 文件,因此我启用了 cleanSource 删除选项,但它不起作用,我不知道为什么!

在展示我的代码之前,我必须评论一下他。我正在并行运行多个结构化流查询,我有 5 个存储桶,我并行提交。该作业完美运行,但不会删除任何已处理的文件。

var table = ['table1','table2','table3','table4','table5']
tables.par.map(table => {
        new ReplicationTables().run(table)
      })

object ReplicationTables {

def run(table): Unit = {
 val dataFrame = spark.readStream
      .option("mergeSchema", "true")
      .schema(dfSchema)
      .option("cleanSource","delete")  
      .parquet(s"s3a://my-bucket/${table}/*")

// I do some transformation and after I write my new dataframe called df to S3 in Delta format

df.writeStream
      .format("delta")
      .outputMode("append")
      .queryName(s"Delta/${table.schema}/${table.name}")
      .trigger(Trigger.Once())
     .option("checkpointLocation", s"s3a://my-bucket/checkpoints/${table.schema}/${table.name}")
      .start(s"s3a://my-bucket/Delta_Tables/${table}/")
      .awaitTermination()



}

}

PS:即使是 INFO 日志级别,我也没有任何关于 cleanSource

的日志

PS 2:关注 Structured Streaming 关于 cleanSource 的文档https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

【问题讨论】:

    标签: apache-spark spark-structured-streaming delta-lake


    【解决方案1】:

    尝试使用 option("spark.sql.streaming.fileSource.cleaner.numThreads", "10") 来加快清理速度。如果在更短的时间内生成更多文件,则 Spark 不会删除。可能增加线程有帮助

    【讨论】:

      猜你喜欢
      • 2021-06-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-06-07
      • 2010-12-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多