【问题标题】:Get the list of loaded files from Databricks Autoloader从 Databricks Autoloader 获取已加载文件的列表
【发布时间】:2022-01-11 14:39:20
【问题描述】:

我们可以使用Autoloader 来跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载的文件列表?

我可以在 AWS Glue 作业书签中轻松执行此操作,但我不知道如何在 Databricks Autoloader 中执行此操作。

【问题讨论】:

  • 我能否参考您正在寻找的 AWS Glue 作业书签功能。在 Glue 作业中添加了很多代码来做书签。你可以看到下面的自动加载器代码很简单,只有两个语句

标签: databricks databricks-autoloader


【解决方案1】:
.load("path")
.withColumn("filePath",input_file_name())

例如,您可以将 filePath 插入流接收器,然后从那里获取不同的值,或者使用 forEatch / forEatchBatch 并将其插入到 spark sql 表中

【讨论】:

    【解决方案2】:

    您可以使用结构化流获取加载到 S3 的文件的通知。对于已经加载的文件,s3_output_path可以检查目标路径。

        df = (spark.readStream.format('cloudFiles') \
        .option("cloudFiles.format",    "json") \
        .option("cloudFiles.region", "<aws region>) \
        .option("cloudFiles.awsAccessKey",<ACCESS_KEY>) \
        .option("cloudFiles.awsSecretKey", <SECRET_KEY>) \
       .option ("cloudFiles.useNotifications", "true") \
       .load(<s3_path>))
    
        df.writeStream.format('delta').outputMode("append") \
          .option("checkpointLocation", <checkpoint_path>) \
          .start(<s3_output_path>)
    

    【讨论】:

    • 只是处理文件,没有得到问题作者提出的已处理文件列表
    • 上述代码中的 option ("cloudFiles.useNotifications", "true") 将订阅加载的文件通知。
    • 我知道这意味着什么。问题是如何获取自动加载器已经加载的文件
    • 我在上面发表了评论以了解更多关于上述问题的信息,我看到自动加载器功能具有加载新文件的所有必要功能。如果我们从源存储读取后写入目标存储,则可以知道已经加载的文件。
    猜你喜欢
    • 1970-01-01
    • 2019-03-10
    • 1970-01-01
    • 2023-01-03
    • 2022-01-11
    • 1970-01-01
    • 2020-05-12
    • 1970-01-01
    • 2021-12-23
    相关资源
    最近更新 更多