【发布时间】:2021-12-24 06:20:08
【问题描述】:
我正在关注雪花模式。我有一个包含 9 个维度表的事实表,用于红移仓库。我的 ETL 工作流程是用胶水完成的。我已将问题简化为两个表(一个维度表和一个事实)。
我遇到的问题是每次运行 ETL 时都会得到不同的映射结果,并认为这与我将 physical_attribute 表加入 animal 表的方式有关
我从名为 cat 的数据目录中的源表中提取数据。
cat_df = glueContext.create_dynamic_frame.from_catalog(database="animal_parquet", table_name="cat").toDF()
接下来,我设置了我的映射功能,我已经通过两种方式完成了。 这是在字典中映射的数据:
animal_color_map = {
"r": "red",
"w": "white",
"bl": "black",
"br": "brown",
}
-
F.create_map如果我能弄清楚其他问题,我会这样做。
animal_color_mapper = F.create_map(
[F.lit(x) for x in chain(*animal_color_map.items())])
F.udf
@F.udf
def animal_color_mapper2(x):
return animal_color_map.get(x, "NOT SPECIFIED")
现在可以执行数据框的创建、将维度表中的数据连接回事实表以及评估。
** 我认为问题出在类似于 pyspark UDF 的函数是非确定性的,但无法在文档中找到将它们转换为确定性函数的答案**
physical_attribute_df = (
cat_df.select(
F.coalesce(
animal_color_mapper[cat_df.fur_color],
F.lit("NOT SPECIFIED")
).cast('string').alias('color'), # color
F.coalesce(
animal_color_mapper2(cat_df.fur_color),
F.lit("NOT SPECIFIED")
).cast('string').alias('color2'), # color2
cat_df.zoo_id.alias('_zoo_id'),
).groupBy(
'color',
'color2',
).agg(
F.collect_list('_zoo_id').alias('_zoo_ids')
).coalesce(1).withColumn(
'id', F.monotonically_increasing_id() # id
).withColumn(
'created_date', F.current_timestamp() # create_date
).withColumn(
'last_updated', F.current_timestamp() # last_updated
)
)
此时的表格如下所示:
+------------+-------------+--------------------+---+--------------------+--------------------+
| color| color2| _app_ids| id| created_date| last_updated|
+------------+-------------+--------------------+---+--------------------+--------------------+
| black| black|[10447643, 104525...| 0|2021-11-11 18:38:...|2021-11-11 18:38:...|
| brown| brown|[10450650, 104551...| 1|2021-11-11 18:38:...|2021-11-11 18:38:...|
| white| white|[10445953, 104470...| 2|2021-11-11 18:38:...|2021-11-11 18:38:...|
| red| red|[10453690, 104547...| 3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+--------------------+---+--------------------+--------------------+
现在注意groupby 到agg,因为这是在我分解数据后用于将数据连接到事实表的。
_physical_attributes_df = physical_attribute_df.select(
F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
physical_attribute_df.id.alias('physical_attribute_id'),
)
此时的表格如下所示:
+--------+---------------------+
| zoo_id|physical_attribute_id|
+--------+---------------------+
|10447643| 0|
|10452584| 0|
|10453651| 0|
|10448127| 0|
|10444833| 0|
+--------+---------------------+
现在我有了这个查找表,我删除了_zoo_ids 列。
physical_attribute_df = physical_attribute_df.drop('_zoo_ids')
此时的表格如下所示:
+------------+-------------+---+--------------------+--------------------+
| color| color2| id| created_date| last_updated|
+------------+-------------+---+--------------------+--------------------+
| black| black| 0|2021-11-11 18:38:...|2021-11-11 18:38:...|
| brown| brown| 1|2021-11-11 18:38:...|2021-11-11 18:38:...|
| white| white| 2|2021-11-11 18:38:...|2021-11-11 18:38:...|
| red| red| 3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+---+--------------------+--------------------+
这是我的最后一步。将表加入事实表
animal_df = (
cat_df.join(
_physical_attributes_ids_df,
_physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
'left'
).select(
F.current_timestamp().alias('created_date'), # create_date
F.current_timestamp().alias('last_updated'), # last_updated
cat_df.zoo_id.alias('zoo_id'), # zoo_id
_physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id
).coalesce(1).withColumn(
'id', F.monotonically_increasing_id() # id
)
)
当我验证时,我使用映射执行的每次 ETL 运行的数据都不同。
【问题讨论】:
-
你能确认是
monotonically_increasing_id生成的列有不同的数据吗? -
是的,我认为你是对的。
monotonically_increasing_id生成的列具有不同的数据。不知道如何检查,但是在查看文档中的函数后是的,它是不确定的。有没有更好的方法来创建 ID?查找表?还是更好的订购方式? -
在
coalesce之后由color排序应该使生成的ID 具有确定性。对于animal_df,您可以通过physical_attributes_id订购以获得确定性。 -
谢谢。我的回答好听吗?我在您的帮助下创建了解决方案。只是不知道以后怎么给别人说最好。
标签: apache-spark pyspark etl aws-glue non-deterministic