【问题标题】:After AWS Glue is finished, how to execute a SQL script or stored procedure?AWS Glue 完成后,如何执行 SQL 脚本或存储过程?
【发布时间】:2019-04-01 18:22:42
【问题描述】:

我正在学习 AWS Glue。对于传统的 ETL,一个常见的模式是从目标表中查找主键来决定是否需要进行更新或插入(又名 upsert 设计模式)。使用胶水似乎没有相同的控制。简单地写出动态框架只是一个插入过程。我可以想到两种设计模式来解决这个问题:

  1. 将目标加载为数据框并在 spark 中,左外连接只插入新行(如果需要,您将如何更新行?删除然后插入???因为我是新来的 spark,这是最陌生的我)
  2. 将数据加载到暂存表中,然后使用 SQL 执行最终合并

这是我首先探索的第二种方法。 AWS Glue 作业完成后,我如何在 AWS 世界中执行 SQL 脚本或存储过程?你做 python-shell 工作,lambda,直接作为胶水的一部分,或者其他方式吗?

【问题讨论】:

  • 您在哪个数据库中写入数据(AWS Redshift、RDS)?
  • 本地,MS SQL Server。我有 JDBC 连接工作,我可以将我的数据框写入表。所以我知道我至少弄清楚了网络和连接部分。

标签: python amazon-web-services apache-spark pyspark aws-glue


【解决方案1】:

我使用 pymysql 库作为上传到 AWS S3 的 zip 文件,并在 AWS Glue 作业参数中进行了配置。对于 UPSERT,我使用了 INSERT INTO TABLE....ON DUPLICATE KEY。

因此,根据主键验证,代码要么更新已存在的记录,要么插入新记录。希望这可以帮助。请参考:

import pymysql

rds_host  = "rds.url.aaa.us-west-2.rds.amazonaws.com"
name = "username"
password = "userpwd"
db_name = "dbname"
conn = pymysql.connect(host=rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)

with conn.cursor() as cur:
   insertQry="INSERT INTO ZIP_TERR(zip_code, territory_code, "
             "territory_name,state) "
             "VALUES(zip_code, territory_code, territory_name, state) "
             "ON DUPLICATE KEY UPDATE territory_name = "
             "VALUES(territory_name), state = VALUES(state);"
   cur.execute(insertQry)
   conn.commit()
   cur.close()

在上面的代码示例中,区域代码、邮政编码是主键。也请参考这里:More on looping inserts using a for loops

【讨论】:

    【解决方案2】:

    与往常一样,AWS 不断变化的功能列表解决了大部分问题(源于用户需求和常见工作模式)。

    AWS 已在 Updating and Inserting new data 上发布了使用暂存表(您在第二个策略中提到的)的文档。

    一般来说,ETL 最严格的方法是截断并重新加载源数据,但这取决于您的源数据。如果您的源数据是跨越数十亿条记录的时间序列数据集,您可能需要使用增量/增量加载模式。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-23
      • 2021-02-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多