【问题标题】:not sure how to fix non-deterministic function issue with pyspark不确定如何使用 pyspark 解决非确定性函数问题
【发布时间】:2021-12-24 06:20:08
【问题描述】:

我正在关注雪花模式。我有一个包含 9 个维度表的事实表,用于红移仓库。我的 ETL 工作流程是用胶水完成的。我已将问题简化为两个表(一个维度表和一个事实)。

我遇到的问题是每次运行 ETL 时都会得到不同的映射结果,并认为这与我将 physical_attribute 表加入 animal 表的方式有关

我从名为 cat 的数据目录中的源表中提取数据。

cat_df = glueContext.create_dynamic_frame.from_catalog(database="animal_parquet", table_name="cat").toDF()

接下来,我设置了我的映射功能,我已经通过两种方式完成了。 这是在字典中映射的数据:

animal_color_map = {
    "r": "red",
    "w": "white",
    "bl": "black",
    "br": "brown",
}
  1. F.create_map 如果我能弄清楚其他问题,我会这样做。
animal_color_mapper = F.create_map(
    [F.lit(x) for x in chain(*animal_color_map.items())])
  1. F.udf
@F.udf
def animal_color_mapper2(x):
    return animal_color_map.get(x, "NOT SPECIFIED")

现在可以执行数据框的创建、将维度表中的数据连接回事实表以及评估。

** 我认为问题出在类似于 pyspark UDF 的函数是非确定性的,但无法在文档中找到将它们转换为确定性函数的答案**

  
physical_attribute_df = (
    cat_df.select(

        F.coalesce(
            animal_color_mapper[cat_df.fur_color],
            F.lit("NOT SPECIFIED")
        ).cast('string').alias('color'),  # color
        
        F.coalesce(
            animal_color_mapper2(cat_df.fur_color),
            F.lit("NOT SPECIFIED")
        ).cast('string').alias('color2'),  # color2
            
        cat_df.zoo_id.alias('_zoo_id'),

    ).groupBy(
        'color',
        'color2',
    ).agg(
        F.collect_list('_zoo_id').alias('_zoo_ids')
    ).coalesce(1).withColumn(
        'id', F.monotonically_increasing_id() # id
    ).withColumn(
        'created_date', F.current_timestamp() # create_date
    ).withColumn(
        'last_updated', F.current_timestamp() # last_updated
    )

)

此时的表格如下所示:

+------------+-------------+--------------------+---+--------------------+--------------------+
|       color|       color2|            _app_ids| id|        created_date|        last_updated|
+------------+-------------+--------------------+---+--------------------+--------------------+
|       black|        black|[10447643, 104525...|  0|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       brown|        brown|[10450650, 104551...|  1|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       white|        white|[10445953, 104470...|  2|2021-11-11 18:38:...|2021-11-11 18:38:...|
|         red|          red|[10453690, 104547...|  3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+--------------------+---+--------------------+--------------------+

现在注意groupbyagg,因为这是在我分解数据后用于将数据连接到事实表的。

_physical_attributes_df = physical_attribute_df.select(
    F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
    physical_attribute_df.id.alias('physical_attribute_id'),
)

此时的表格如下所示:

+--------+---------------------+
|  zoo_id|physical_attribute_id|
+--------+---------------------+
|10447643|                    0|
|10452584|                    0|
|10453651|                    0|
|10448127|                    0|
|10444833|                    0|
+--------+---------------------+

现在我有了这个查找表,我删除了_zoo_ids 列。

physical_attribute_df = physical_attribute_df.drop('_zoo_ids')

此时的表格如下所示:

+------------+-------------+---+--------------------+--------------------+
|       color|       color2| id|        created_date|        last_updated|
+------------+-------------+---+--------------------+--------------------+
|       black|        black|  0|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       brown|        brown|  1|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       white|        white|  2|2021-11-11 18:38:...|2021-11-11 18:38:...|
|         red|          red|  3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+---+--------------------+--------------------+

这是我的最后一步。将表加入事实表

animal_df = (

    cat_df.join(
        _physical_attributes_ids_df,
        _physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
        'left'
    ).select(

        F.current_timestamp().alias('created_date'), # create_date
        F.current_timestamp().alias('last_updated'), # last_updated

        cat_df.zoo_id.alias('zoo_id'),  # zoo_id
   
        _physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id
  
    ).coalesce(1).withColumn(
        'id', F.monotonically_increasing_id() # id
    )
)

当我验证时,我使用映射执行的每次 ETL 运行的数据都不同。

【问题讨论】:

  • 你能确认是monotonically_increasing_id生成的列有不同的数据吗?
  • 是的,我认为你是对的。 monotonically_increasing_id 生成的列具有不同的数据。不知道如何检查,但是在查看文档中的函数后是的,它是不确定的。有没有更好的方法来创建 ID?查找表?还是更好的订购方式?
  • coalesce 之后由color 排序应该使生成的ID 具有确定性。对于animal_df,您可以通过physical_attributes_id 订购以获得确定性。
  • 谢谢。我的回答好听吗?我在您的帮助下创建了解决方案。只是不知道以后怎么给别人说最好。

标签: apache-spark pyspark etl aws-glue non-deterministic


【解决方案1】:

问题是 在coalesce 之后的'id', F.monotonically_increasing_id() # id 是不确定的(谁会知道),因为coalesce 会导致没有顺序保证的分区合并。

在这种情况下,显式排序可以解决问题。

我用来解决问题的是我的表的身份字段在这种情况下是“颜色” F.row_number().over(Window.orderBy(physical_attribute_df.color))

physical_attribute_df = (
    cat_df.select(

        F.coalesce(
            animal_color_mapper[cat_df.fur_color],
            F.lit("NOT SPECIFIED")
        ).cast('string').alias('color'),  # color
       
        cat_df.zoo_id.alias('_zoo_id'),

    ).groupBy(
        'color',
        'color2',
    ).agg(
        F.collect_list('_zoo_id').alias('_zoo_ids')
    )

physical_attribute_df = (
    physical_attribute_df.coalesce(1).withColumn(
        'id',
        F.row_number().over(Window.orderBy(physical_attribute_df.color))
    ).withColumn(
        'created_date', F.current_timestamp() # create_date
    ).withColumn(
        'last_updated', F.current_timestamp() # last_updated
    )
)

_physical_attributes_df = physical_attribute_df.select(
    F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
    physical_attribute_df.id.alias('physical_attribute_id'),
)


physical_attribute_df = physical_attribute_df.drop('_zoo_ids')

animal_df = (

    cat_df.join(
        _physical_attributes_ids_df,
        _physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
        'left'
    ).select(

        F.current_timestamp().alias('created_date'), # create_date
        F.current_timestamp().alias('last_updated'), # last_updated

        cat_df.zoo_id.alias('zoo_id'),  # zoo_id
   
        _physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id

    )
)


animal_df = (

    animal_df.coalesce(1).withColumn(
        'id',
         F.row_number().over(Window.orderBy(animal_df.zoo_id))
    )
)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-12-13
    • 1970-01-01
    • 1970-01-01
    • 2021-11-10
    • 2016-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多