【问题标题】:Transforming one row into many rows using Amazon Glue使用 Amazon Glue 将一行转换为多行
【发布时间】:2018-04-04 13:08:34
【问题描述】:

我正在尝试使用 Amazon Glue 将一行变成多行。我的目标类似于 SQL UNPIVOT。

我有一个 360GB 的管道分隔文本文件,压缩 (gzip)。它有超过 1,620 列。这是基本布局:

primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male|1|is_college_educated|1

这些属性名称/值字段超过 800 个。大约有 2.8 亿行。该文件位于 S3 存储桶中。我需要将数据导入 Redshift,但 Redshift 中的列限制为 1,600。

用户希望我取消透视数据。例如:

primary_key|key|value
12345|is_male|1
12345|is_college_educated|1

我相信我可以为此使用 Amazon Glue。但是,这是我第一次使用 Glue。我正在努力寻找一个好的方法来做到这一点。一些 pySpark 扩展转换看起来很有希望(可能是“映射”或“关系化”)。见http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-etl-scripts-pyspark-transforms.html。 所以,我的问题是:在 Glue 中执行此操作的好方法是什么?

谢谢。

【问题讨论】:

    标签: apache-spark pyspark bigdata aws-glue


    【解决方案1】:

    AWS Glue 没有适当的内置 GlueTransform 子类来将单个 DynamicRecord 转换为多个(通常 MapReduce 映射器可以这样做)。您也不能自己创建这样的转换。

    但是有两种方法可以解决您的问题。

    选项 1:使用 Spark RDD API

    让我们尝试执行您需要的操作:将单个记录映射到多个记录。由于GlueTransform 的限制,我们将不得不深入研究并使用 Spark RDD API。

    RDD 有特殊的flatMap 方法,允许生成多个Row,然后将其展平。您的示例代码如下所示:

    source_data = somehow_get_the_data_into_glue_dynamic_frame()
    source_data_rdd = source_data.toDF().rdd
    unpivoted_data_rdd = source_data_rdd.flatMap(
        lambda row: (
            (
                row.id,
                getattr(row, f'{field}_name'),
                getattr(row, f'{field}_value'),
            )
            for field in properties_names
        ),
    )
    unpivoted_data = glue_ctx.create_dynamic_frame \
        .from_rdd(unpivoted_data_rdd, name='unpivoted')
    

    选项 2:映射 + 关系化 + 连接

    如果您只想使用 AWS Glue ETL API 执行请求的操作,那么这里是我的说明:

    1. 第一个 map 每个 DynamicRecord 从源到主键和对象列表:
    mapped = Map.apply(
        source_data,
        lambda record:  # here we operate on DynamicRecords not RDD Rows
            DynamicRecord(
                primary_key=record.primary_key,
                fields=[
                    dict(
                        key=getattr(row, f'{field}_name'),
                        value=getattr(row, f'{field}_value'),
                    )
                    for field in properties_names
                ],
            )
    )
    

    示例输入:

    primary_key|property1_name|property1_value|property800_name|property800_value
          12345|is_male       |              1|is_new          |                1
          67890|is_male       |              0|is_new          |                0
    

    输出:

    primary_key|fields
          12345|[{'key': 'is_male', 'value': 1}, {'key': 'is_new', 'value': 1}]
          67890|[{'key': 'is_male', 'value': 0}, {'key': 'is_new', 'value': 0}]
    
    1. 接下来relationalize 它:每个列表都将转换为多行,每个嵌套对象都将被取消嵌套(Scala Glue ETL API docs 有很好的示例和比 Python 文档更详细的解释)。
    relationalized_dfc = Relationalize.apply(
        mapped,
        staging_path='s3://tmp-bucket/tmp-dir/',  # choose any dir for temp files
    )
    

    该方法返回DynamicFrameCollection。在单个数组字段的情况下,它将包含两个DynamicFrame:第一个是primary_key,外键是扁平化和非嵌套fields动态帧。 输出:

    # table name: roottable
    primary_key|fields
          12345|     1
          67890|     2
    
    # table name: roottable.fields
    id|index|val.key|val.value
     1|    0|is_male|        1
     1|    1|is_new |        1
     2|    0|is_male|        0
     2|    1|is_new |        0
    
    1. 合乎逻辑的最后一步是加入这两个DynamicFrame 的:
    joined = Join.apply(
        frame1=relationalized_dfc['roottable'],
        keys1=['fields'],
        frame2=relationalized_dfc['roottable.fields'],
        keys2=['id'],
    )
    

    输出:

    primary_key|fields|id|index|val.key|val.value
          12345|     1| 1|    0|is_male|        1
          12345|     1| 1|    1|is_new |        1
          67890|     2| 2|    0|is_male|        0
          67890|     2| 2|    1|is_new |        0
    

    现在您只需 renameselect 所需的字段。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-09
      • 2020-05-21
      相关资源
      最近更新 更多