【发布时间】:2023-03-19 01:55:01
【问题描述】:
我将镶木地板数据保存在 aws s3 存储桶中。 parquet 文件按日期分区,文件夹结构如下所示
MyFolder
|-- date=20210701
|--part-xysdf-snappy.parquet
|-- date=20210702
|--part-fasdf-snappy.parquet
|-- date=20210703
|--part-ghdfg-snappy.parquet
....
....
请注意,Parquet in date=20210701(这是最早的条目)有问题,漏掉了两列
+-------+-----+
| name|grade|
+-------+-----+
|Alberto| 100|
| Dakota| 96|
+-------+-----+
parquet文件的其余部分都很好,就像
+-------+-----+------+-------+
| name|grade|height| date |
+-------+-----+--------------+
|Karolin| 110| 173 |20210701
| Lucas | 91| 178 |20210701
+-------+-----+------+-------+
如果我只想关注“name”和“grade”,可以使用以下代码显示结果
def check_data(start_date, end_date):
cols = ['name', 'grade']
df = spark.read.parquet('path/MyFolder').select(cols)
df = df.filter(f'date > "{start_date}" and date < "{end_date}"')
return df
上面的代码很方便,而且运行良好。但是,现在我想添加 'height' 和 'date' 列,并忽略 date=20210701(因为它错过了两列)。事情变得更诡异了。如果我使用
def check_data(start_date, end_date):
cols = ['name', 'grade', 'height', 'date']
nan = 'Nan'
df = spark.read.parquet('path/MyFolder').filter(f'height != "{nan}"')
df = df.filter(f'date > "{start_date}" and date < "{end_date}"')
df = df.select(cols)
return df
我收到了这个错误
Cannot resolve 'height' given input columns [name, grade].....
我在这里得到的唯一解决方案是遍历所有 parquet 文件夹,然后附加 pyspark 数据帧,但这需要额外的时间。
另外,如果我删除 date=20210701,问题也解决了,但我就是不能这样做。
你能分享一下你的想法吗?谢谢。 ????
【问题讨论】:
标签: pyspark filter apache-spark-sql schema