【发布时间】:2022-05-06 21:47:31
【问题描述】:
我想使用 Databricks Auto Loader 设置 S3 流。我设法设置了流,但我的 S3 存储桶包含不同类型的 JSON 文件。我想过滤掉它们,最好是在流本身中,而不是使用filter 操作。
根据the docs,我应该能够使用 glob 模式进行过滤。但是,我似乎无法让它工作,因为它无论如何都会加载所有内容。
这就是我所拥有的
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaInference.samleSize.numFiles", 1000)
.option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
.option("includeExistingFiles", "true")
.option("multiLine", "true")
.option("inferSchema", "true")
# .option("cloudFiles.schemaHints", schemaHints)
# .load("s3://<BUCKET>/qualifier/**/*_INPUT")
.load("s3://<BUCKET>/qualifier")
.withColumn("filePath", F.input_file_name())
.withColumn("date_ingested", F.current_timestamp())
)
我的文件有一个结构为qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json 的键,所以我想过滤包含名称输入的文件。
这似乎加载了所有内容:.load("s3://<BUCKET>/qualifier") 和 .load("s3://<BUCKET>/qualifier/**/*_INPUT") 是我想要做的,但这不起作用。 (我也试过.load("s3://<BUCKET>/qualifier/**/*_INPUT.json")
我的 glob 模式不正确,还是我缺少其他东西?
【问题讨论】:
标签: pyspark stream databricks glob databricks-autoloader