【Question Title】: Cannot write Lake Formation governed table data from Glue ETL Job
【Posted】: 2022-07-16 16:38:26
【Question Description】:

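I'm building a POC with Lake Formation in which I read a queue of train movement information and use AWS Data Wrangler to save the individual events into a governed table. This works fine.

I then tried to use an AWS Glue ETL job to read this governed table and write the resulting data to another governed table. The job succeeds and writes parquet files to the S3 bucket/folder under that table, but when I try to query the data it can't be read from Athena (the Athena query returns no records).

I created the journey table with this AWS Data Wrangler statement: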

import awswrangler as aw

aw.catalog.create_parquet_table(database = "train_silver", 
                            table = "journey", 
                            path = "s3://train-silver/journey/",
                            columns_types = {
                                'train_id': 'string',
                                'date': 'date',
                                'stanox': 'string',
                                'start_timestamp': 'timestamp',
                                'created': 'timestamp',
                                'canx_timestamp': 'bigint'
                            },
                            compression = "snappy",
                            partitions_types = {'segment_date': 'date'},
                            table_type = "GOVERNED")

Here is the code for the Glue job:

import sys
from datetime import datetime

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_timestamp, udf
from pyspark.sql.types import DateType, TimestampType

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logger.info('About to start transaction')

# Start a read-write Lake Formation transaction (read_only=False)
tx_id = glueContext.start_transaction(False)

# Read the bronze governed table within the transaction
bronze_table = glueContext.create_dynamic_frame.from_catalog(database = "train_bronze", table_name = "train_movements_governed", 
    additional_options = { "transactionId": tx_id })
logger.info('About to save the bronze table to a view')
bronze_table.toDF().createOrReplaceTempView("train_movements")

max_journey_timestamp = 0

journey_df = spark.sql("""
    SELECT train_id, loc_stanox as stanox, CAST(canx_timestamp as bigint) AS canx_timestamp, segment_date
    FROM train_movements
    WHERE canx_type = 'AT ORIGIN'
    AND cast(canx_timestamp AS bigint) > {}""".format(max_journey_timestamp))

journey_df = journey_df.withColumn("created", current_timestamp())

# canx_timestamp holds epoch milliseconds; derive a date and a timestamp from it.
# udf() defaults to StringType, so the return types are given explicitly to match the table schema.
def date_from_timestamp(timestamp_int):
    return datetime.fromtimestamp(int(timestamp_int) / 1000.0).date()
date_UDF = udf(date_from_timestamp, DateType())

def date_time_from_timestamp(timestamp_int):
    return datetime.fromtimestamp(int(timestamp_int) / 1000.0)
date_time_UDF = udf(date_time_from_timestamp, TimestampType())

journey_df = journey_df.withColumn("date", date_UDF(col("canx_timestamp")))
journey_df = journey_df.withColumn("start_timestamp", date_time_UDF(col("canx_timestamp")))
journey_df.printSchema()

try:
    save_journey_frame = DynamicFrame.fromDF(journey_df, glueContext, "journey_df")
    logger.info('Saving ' + str(save_journey_frame.count()) + ' new journeys')
    # Write to the silver governed table within the same transaction
    journeySink = glueContext.write_dynamic_frame.from_catalog(frame = save_journey_frame, database = "train_silver", table_name = "journey", 
        additional_options = { "callDeleteObjectsOnCancel": True, "transactionId": tx_id })
    logger.info('Committing transaction')
    glueContext.commit_transaction(tx_id)
    logger.info('Transaction committed')
except Exception:
    # Roll back the transaction if anything in the write fails
    glueContext.cancel_transaction(tx_id)
    raise
logger.info('Committing the job')
job.commit()
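The job's helpers call `datetime.fromtimestamp` without a timezone, so the derived date depends on the executor's local timezone. Below is a minimal sketch of the same epoch-millisecond conversion with UTC made explicit, assuming `canx_timestamp` holds milliseconds since the Unix epoch (as the `/ 1000.0` in the job implies). The names `date_from_epoch_millis` and `datetime_from_epoch_millis` are hypothetical; in the job they could be wrapped with an explicit return type, e.g. `udf(date_from_epoch_millis, DateType())`. This only addresses the conversion, not the Athena visibility problem.

```python
from datetime import date, datetime, timezone


def datetime_from_epoch_millis(timestamp_ms) -> datetime:
    """Convert epoch milliseconds (int or numeric string) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(int(timestamp_ms) / 1000.0, tz=timezone.utc)


def date_from_epoch_millis(timestamp_ms) -> date:
    """Convert epoch milliseconds to the calendar date in UTC."""
    return datetime_from_epoch_millis(timestamp_ms).date()


if __name__ == "__main__":
    sample_ms = "1657989506000"  # sample value: 2022-07-16 16:38:26 UTC
    print(datetime_from_epoch_millis(sample_ms))  # 2022-07-16 16:38:26+00:00
    print(date_from_epoch_millis(sample_ms))  # 2022-07-16
```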

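After the Glue job runs, there are parquet files in the table folder, but they are not organized into the partition folders defined by my table definition:

I also tried writing a Glue job that reads the parquet files in that folder, and they contain all the rows they should.

Here is a screenshot of my attempt to query the data in Athena:

What am I missing here? How can I get data added to a governed table from a Spark Glue job so that I can query it from Athena?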

【Question Discussion】:

    Tags: pyspark aws-glue aws-glue-data-catalog aws-lake-formation


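    【Solution 1】:

    I think the problem is that the table's objects (the list of S3 objects Lake Formation tracks for a governed table) are not being updated.

    You can check this with the following AWS CLI command: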

    aws lakeformation get-table-objects --database-name train_silver --table-name journey
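The same check can be scripted from Python. This is a minimal sketch, assuming boto3's Lake Formation client exposes `get_table_objects` with the response shape documented for the GetTableObjects API (`Objects` as a list of partitions, each with `PartitionValues` and a list of `Objects` carrying a `Uri`, plus an optional `NextToken`). The helper name `iter_table_objects` is hypothetical, and the client is passed in so the logic can be exercised without AWS credentials.

```python
from typing import Any, Dict, Iterator, List, Tuple


def iter_table_objects(client: Any, database: str, table: str) -> Iterator[Tuple[List[str], str]]:
    """Yield (partition_values, object_uri) for each S3 object tracked for a governed table.

    `client` is assumed to behave like boto3.client("lakeformation");
    results are paginated by following NextToken until it is absent.
    """
    kwargs: Dict[str, Any] = {"DatabaseName": database, "TableName": table}
    while True:
        response = client.get_table_objects(**kwargs)
        for partition in response.get("Objects", []):
            values = partition.get("PartitionValues", [])
            for obj in partition.get("Objects", []):
                yield values, obj["Uri"]
        token = response.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
```

Usage would look like `list(iter_table_objects(boto3.client("lakeformation"), "train_silver", "journey"))`; an empty result while parquet files exist under the table path would be consistent with the objects never having been registered with the table.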
    

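    From the Format Options for ETL Inputs and Outputs in AWS Glue documentation:

    For writing Apache Parquet, AWS Glue ETL only supports writing to a governed table by specifying an option for a custom Parquet writer type optimized for Dynamic Frames. When writing to a governed table with the parquet format, you should add the key useGlueParquetWriter with a value of true in the table parameters.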
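One way to add that table parameter to an existing table is a read-modify-write through the Glue Data Catalog API (`get_table`, then `update_table`). The sketch below only builds the `TableInput` dict; it assumes `table` is the `Table` dict returned by `glue.get_table(...)`, and the set of allowed `TableInput` keys is my reading of the Glue API, since `get_table` returns read-only fields such as `DatabaseName` and `CreateTime` that `update_table` does not accept. The helper name `with_glue_parquet_writer` is hypothetical.

```python
import copy
from typing import Any, Dict

# Keys accepted in Glue's TableInput (assumed list); other keys returned by
# get_table (DatabaseName, CreateTime, VersionId, ...) are dropped.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def with_glue_parquet_writer(table: Dict[str, Any]) -> Dict[str, Any]:
    """Return a TableInput copy of `table` with useGlueParquetWriter set to "true"."""
    # Deep-copy so the caller's get_table response is not mutated
    table_input = {k: copy.deepcopy(v) for k, v in table.items() if k in TABLE_INPUT_KEYS}
    params = table_input.setdefault("Parameters", {})
    params["useGlueParquetWriter"] = "true"  # catalog table parameters are strings
    return table_input
```

The result would then be passed as `glue.update_table(DatabaseName="train_silver", TableInput=...)`. Whether a governed table also requires a `TransactionId` on that call is not verified here.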

    You can also set the table's classification parameter to "glueparquet" when you create the table (you can also update it afterwards):

    aw.catalog.create_parquet_table(database = "train_silver", 
                            table = "journey", 
                            path = "s3://train-silver/journey/",
                            columns_types = {
                                'train_id': 'string',
                                'date': 'date',
                                'stanox': 'string',
                                'start_timestamp': 'timestamp',
                                'created': 'timestamp',
                                'canx_timestamp': 'bigint'
                            },
                            compression = "snappy",
                            parameters={
                                "classification": "glueparquet"
                            },
                            partitions_types = {'segment_date': 'date'},
                            table_type = "GOVERNED")
    
