【问题标题】:How to partition data by datetime in AWS Glue?如何在 AWS Glue 中按日期时间对数据进行分区?
【发布时间】:2019-12-16 07:24:51
【问题描述】:

当前设置:

  • 带有 json 文件的 S3 位置。所有文件都存储在同一位置(无日/月/年结构)。

  • Glue Crawler 读取目录表中的数据

  • Glue ETL 作业将数据转换并存储到 s3 中的 parquet 表中
  • Glue Crawler 从 s3 parquet 表中读取数据并存储到 Athena 查询的新表中

我想要实现的是按天 (1) 分区的拼花表和 1 天的拼花表在同一个文件 (2) 中。目前每个 json 文件都有一个 parquet 表。

我该怎么做?

有一点要提一下,数据中有一个 datetime 列,但它是一个 unix 纪元时间戳。我可能需要将其转换为“年/月/日”格式,否则我假设它将再次为每个文件创建一个分区。

非常感谢您的帮助!!

【问题讨论】:

    标签: amazon-web-services etl aws-glue aws-glue-data-catalog


    【解决方案1】:

    将 Glue 的 DynamicFrame 转换为 Spark 的 DataFrame 以添加年/月/日列和重新分区。将分区减少到一个将确保只有一个文件将写入文件夹,但可能会降低作业性能。

    这里是python代码:

    from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime
    
    ...
    
    df = dynamicFrameSrc.toDF()
    
    repartitioned_with_new_columns_df = df
        .withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
        .withColumn(“year”, year(col(“date_col”)))
        .withColumn(“month”, month(col(“date_col”)))
        .withColumn(“day”, dayofmonth(col(“date_col”)))
        .drop(col(“date_col”))
        .repartition(1)
    
    dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
    
    datasink = glueContext.write_dynamic_frame.from_options(
        frame = dyf, 
        connection_type = "s3", 
        connection_options = {
            "path": "s3://yourbucket/data”, 
            "partitionKeys": [“year”, “month”, “day”]
        }, 
        format = “parquet”, 
        transformation_ctx = "datasink"
    )
    

    请注意,from pyspark.qsl.functions import col 可能会给出参考错误,这不应该是here 解释的问题。

    【讨论】:

    • 谢谢。我尝试了这种方法,玩弄了它,总是遇到某种语法/打字错误。我在 atm 上苦苦挣扎的是: TypeError: 'DynamicFrame' object is not subscriptable 知道为什么会这样吗?
    • @user2642287 请立即尝试,我已更新代码以使用col 函数
    • 再次感谢,按预期工作。我正在尝试将年、月、日转换为整数。 ApplyMapping 似乎没有做这项工作,有没有其他方法可以将分区列作为整数?
    • 函数year、month和dayofmonth的返回值是整数(见spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/…
    • @MaltMaster 每个分区只生成一个文件
    【解决方案2】:

    我无法发表评论,所以我将写作为答案。

    我使用了 Yuriy 的代码,还有一些需要调整的地方:

    • 缺少括号

    df = dynamicFrameSrc.toDF()

    • 在 toDF() 之后我必须添加 select("*") 否则架构为空

    df.select("*") .withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))

    【讨论】:

      【解决方案3】:

      要在 AWS Glue Studio 中实现这一点:

      您需要创建一个自定义函数来将日期时间字段转换为日期。有一个额外的步骤是将其转换回 DynamicFrameCollection。

      在 Python 中:

      def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
          df = dfc.select(list(dfc.keys())[0]).toDF()
          df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
          glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
          return(DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext))
      

      然后您必须编辑自定义转换器架构以包含您刚刚创建的新日期字段。

      然后您可以使用“数据目标”节点将数据写入磁盘,然后选择新的日期字段用作分区。

      video step by step walkthrough

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-09-07
        • 2012-02-16
        • 2023-01-24
        • 2018-03-21
        • 1970-01-01
        • 2012-07-07
        • 1970-01-01
        相关资源
        最近更新 更多