【问题标题】:How to run arbitrary / DDL SQL statements or stored procedures using AWS Glue如何使用 AWS Glue 运行任意/DDL SQL 语句或存储过程
【发布时间】:2021-02-22 19:05:48
【问题描述】:

是否可以从 AWS Glue python 作业执行任意 SQL 命令,例如 ALTER TABLE?我知道我可以使用它从表中读取数据,但是有没有办法执行其他数据库特定的命令?

我需要将数据摄取到目标数据库中,然后立即运行一些 ALTER 命令。

【问题讨论】:

    标签: pyspark aws-glue py4j


    【解决方案1】:

    因此,在进行了广泛的研究并在 AWS 支持下打开了一个案例后,他们告诉我目前无法通过 Python shell 或 Glue pyspark 工作。但我只是尝试了一些有创意的东西,它奏效了!这个想法是使用 sparks 已经依赖的 py4j 并利用标准的 java sql 包。

    这种方法的两大好处:

    1. 这样做的一个巨大好处是,您可以将数据库连接定义为 Glue 数据连接,并在其中保留 jdbc 详细信息和凭据,而无需在 Glue 代码中硬编码它们。下面的示例通过调用glueContext.extract_jdbc_conf('your_glue_data_connection_name') 来获取在 Glue 中定义的 jdbc url 和凭据。

    2. 如果您需要在支持的开箱即用 Glue 数据库上运行 SQL 命令,您甚至不需要为该数据库使用/传递 jdbc 驱动程序 - 只需确保为该数据库设置 Glue 连接并添加连接到您的 Glue 作业 - Glue 将上传正确的数据库驱动程序 jar。

    请记住,下面的代码由驱动程序进程执行,不能由 Spark 工作人员/执行程序执行。

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    logger = glueContext.get_logger()
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    # dw-poc-dev spark test
    source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')
    
    from py4j.java_gateway import java_import
    java_import(sc._gateway.jvm,"java.sql.Connection")
    java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
    java_import(sc._gateway.jvm,"java.sql.DriverManager")
    java_import(sc._gateway.jvm,"java.sql.SQLException")
    
    conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
    
    print(conn.getMetaData().getDatabaseProductName())
    
    # call stored procedure, in this case I call sp_start_job
    cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}");
    cstmt.setString("job_name", "testjob");
    results = cstmt.execute();
    
    conn.close()
    

    【讨论】:

    • 这很棒。这正是我们所需要的——在使用 Glue 插入数据后执行存储过程。它的工作原理与描述的完全一样。离我只有两点。首先澄清“glue_database_connection_name”需要替换为您的实际连接名称。其次要调用特定的存储过程(不是作业),我必须在它前面加上数据库:DBName.dbo.my_stored_procedure。
    • @JamesFrank 谢谢,很高兴你发现它有用。我对代码进行了一些更改以使其更清晰
    • @mishkin - 谢谢你! “下面的这段代码由驱动程序进程执行,Spark 工作人员/执行程序不能执行”是什么意思?你能否解释一下如果作业在某个时候失败会发生什么——存储过程还会被执行吗?
    • @mishkin 我在同样的情况下。我使用了你的代码并运行了这项工作。给出连接错误。我还对连接进行了编码,但没有运气。我有其他针对该数据库运行的胶水 ETL 作业,所以我知道该数据库很好。我需要以不同的方式运行此代码吗?
    • @mishkin 知道为什么我可能会从您的代码中收到“调用 z:java.sql.DriverManager.getConnection 时出错。网址不能为空”吗?我的红移网址在..
    【解决方案2】:

    我终于在几个小时后完成了这项工作,因此希望以下内容会有所帮助。我的剧本受之前的回复影响很大,谢谢。

    先决条件:

    • 您需要在尝试任何脚本之前配置和测试 Glue 连接。
    • 设置 AWS Glue 作业时,请使用 Spark、Glue 2.0 或更高版本以及 Python 3。
    • 我建议只为 2 个工作线程配置此作业以节省成本;大部分工作将由数据库完成,而不是通过胶水完成。
    • 以下是使用 AWS RDS PostgreSQL 实例进行测试的,但希望足够灵活,可以用于其他数据库。
    • 脚本需要在脚本顶部附近更新 3 个参数(glue_connection_name、database_name 和 stored_proc)。
    • JOB_NAME、连接字符串和凭据由脚本检索,无需提供。
    • 如果您的存储过程将返回数据集,则将 executeUpdate 替换为 executeQuery。
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
        
    glue_connection_name = '[Name of your glue connection (not the job name)]'
    database_name = '[name of your postgreSQL database]'
    stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
        
    #Below this point no changes should be necessary.
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    glue_job_name = args['JOB_NAME']
        
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(glue_job_name, args)
    job.commit()
        
    logger = glueContext.get_logger()
        
    logger.info('Getting details for connection ' + glue_connection_name)
    source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
        
    from py4j.java_gateway import java_import
    java_import(sc._gateway.jvm,"java.sql.Connection")
    java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
    java_import(sc._gateway.jvm,"java.sql.DriverManager")
    java_import(sc._gateway.jvm,"java.sql.SQLException")
        
    conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
    logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
        
    stmt = conn.createStatement();
    rs = stmt.executeUpdate('call ' + stored_proc);
        
    logger.info("Finished")
    

    【讨论】:

      【解决方案3】:

      这取决于。如果您使用 redshift 作为目标,则可以选择将 pre 和 post 操作指定为连接选项的一部分。您将能够在那里指定更改操作。但是对于其余的目标类型,您可能需要使用一些 python 模块,例如 pg8000(在 Postgres 的情况下)和其他

      【讨论】:

      • 不幸的是,我的目标是 RDS SQL Server,它不支持前置和后置操作。我也试过boto3 rds-service(但它只支持Aurora),我不能使用pyodbc,因为Glue不支持非纯包..
      • 还有其他的 odbc 是纯 python
      • 只有少数被忽视的纯 python odbc 包,但它们需要你在 linux 机器上安装实际的 odbc 驱动程序,而你不能用 Glue 真正做到
      【解决方案4】:

      我修改了 mishkin 共享的代码,但它对我不起作用。因此,在进行了一些故障排除后,我意识到目录中的连接不起作用。所以我不得不手动修改它并稍微调整代码。现在它的工作但最终异常是因为它无法将java结果转换为python结果。我做了一个解决方法,所以请谨慎使用。

      below is my code. 
      
      
      import sys
      from awsglue.transforms import *
      from awsglue.utils import getResolvedOptions
      from pyspark.context import SparkContext
      from awsglue.context import GlueContext
      from awsglue.job import Job
      
      ## @params: [TempDir, JOB_NAME]
      args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
      
      sc = SparkContext()
      glueContext = GlueContext(sc)
      spark = glueContext.spark_session
      job = Job(glueContext)
      job.init(args['JOB_NAME'], args)
      
      #source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema')
      
      from py4j.java_gateway import java_import
      java_import(sc._gateway.jvm,"java.sql.Connection")
      java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
      java_import(sc._gateway.jvm,"java.sql.DriverManager")
      java_import(sc._gateway.jvm,"java.sql.SQLException")
      
      print('Trying to connect to DB')
      conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename', 'myusername', 'mypassword')
      
      print('Trying to connect to DB success!')
      
      print(conn.getMetaData().getDatabaseProductName())
      
      # call stored procedure, in this case I call sp_start_job
      stmt = conn.createStatement();
      #cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();");
      print('Call to proc trying ')
      
      #cstmt.setString("job_name", "testjob");
      
      try:
        rs = stmt.executeQuery('call mySchemaName.my_storedproc()');
      except:
        print("An exception occurred but proc has run")
        
      #results = cstmt.execute();`enter code here`
      conn.close()
      

      【讨论】:

        【解决方案5】:

        如果将连接对象附加到粘合作业,则可以轻松获取连接设置:

        glue_client = boto3.client('glue')
        getjob=glue_client.get_job(JobName=args["JOB_NAME"])
        connection_settings = glue_client.get_connection(Name=getjob['Job']['Connections']['Connections'][0])
        conn_name = connection_settings['Connection']['Name']
        df = glueContext.extract_jdbc_conf(conn_name)
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2016-05-14
          • 1970-01-01
          • 2022-01-22
          • 2019-04-02
          • 1970-01-01
          • 2018-03-16
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多