【问题标题】:How to extract data from Oracle database with AWS Glue and other AWS services如何使用 AWS Glue 和其他 AWS 服务从 Oracle 数据库中提取数据
【发布时间】:2021-11-18 12:34:56
【问题描述】:

我是 AWS 胶水和其他 AWS 东西的新手。我需要为项目构建 ETL 框架。 这是高级图。我想了解,不是创建 400 个胶水管道,而是创建一个模板类型的东西,它由来自 postgres aurora/mysql 的参考数据驱动。我熟悉 Python。 有人对此有任何想法吗?任何参考,代码示例。

【问题讨论】:

  • 你的问题不清楚。什么模板?什么是参考数据?为什么需要 400 个胶水作业?
  • 模板是一种通用的粘合作业,可以使用元数据(源表和目标表引用数据)执行。我实际上不需要 400 个胶水作业。我想知道如何将这个粘合作业创建为所有 oracle 表的可重用作业。
  • 我们已经完成了类似的实现,其中我们在 Aurora RDS mysql 中使用了一个表来存储各种源数据的所有创建/选择/插入查询,并具有适当的状态。我们只有一个 SINGLE 粘合作业,根据 S3 事件触发器读取传入的文件名,并从 mysql 表中获取相应的查询和目标表名,对主表执行进一步的清理/插入/更新。现在正在生产中。
  • @Yuva 太好了,你有代码示例吗?
  • 当然,请检查我的答案,因为无法在 cmets 中提供。

标签: python amazon-web-services amazon-s3 aws-glue aws-glue-data-catalog


【解决方案1】:
  1. 我们的 mysql 数据库中有一个配置主表。每个方便的列我们使用 source_table_name 作为标识符来获取适当的表列名称/查询,以用于创建 STG TABLE、将数据加载到 STG 表、插入/更新到目标表等。
  2. 我们还在 config master 中将 INSERT/UPDATE 拆分为两个不同的列,因为我们使用 ON DUPLICATE KEY 来更新现有记录。
  3. 通过处理具有登陆文件名的 lambda 事件来获取源表名。
  4. 从 config master 获取源表名称所需的所有数据。如下所示:
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb)
cur.execute(sql_query, (source_fname))
result = cur.fetchall()
for row in result:
stg_table_name = row[1]
tgt_table_name = row[2]
create_stg_table_qry = row[3]
load_data_stg_table_qry = row[4]
insert_tgt_table_qry = row[5]
insert_tgt_table_qry_part_1 = row[6]
insert_tgt_table_qry_part_2 = row[7]
conn.commit()
cur.close()

将适当的参数传递给通用函数,如下所示:

create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)

通用函数如下所示,这是aurora RDS的,请根据需要进行更改。

def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()

def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
cur, conn = connect()
insertQry = "INSERT INTO target table, from the staging table query here"
print(insertQry)
cur.execute(insertQry)
conn.commit()
conn.close()

希望这能给你一个想法。

谢谢

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-23
    • 2019-04-17
    • 2017-08-16
    • 1970-01-01
    • 1970-01-01
    • 2019-04-25
    相关资源
    最近更新 更多