【问题标题】:How do I run SQL SELECT on AWS Glue created Dataframe in Spark?如何在 AWS Glue 在 Spark 中创建的 Dataframe 上运行 SQL SELECT?
【发布时间】:2019-10-07 09:35:32
【问题描述】:

我在 AWS Glue 中有以下工作,它基本上从一个表中读取数据并将其提取为 S3 中的 csv 文件,但是我想在这个表上运行一个查询(A Select、SUM 和 GROUPBY)并想要得到输出到 CSV,如何在 AWS Glue 中执行此操作?我是 Spark 的新手,所以请帮忙

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = 
"db1", table_name = "dbo1_expdb_dbo_stg_plan", transformation_ctx = 
"datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = 
[("plan_code", "int", "plan_code", "int"), ("plan_id", "int", "plan_id", 
"int")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = 
applymapping1, connection_type = "s3", connection_options = {"path": 
"s3://bucket"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

【问题讨论】:

    标签: scala pyspark apache-spark-sql aws-glue


    【解决方案1】:

    粘合上下文的“create_dynamic_frame.from_catalog”函数创建一个动态帧而不是数据帧。并且动态框架不支持执行sql查询。

    要执行 sql 查询,您首先需要将动态帧转换为数据帧,在 spark 的内存中注册一个临时表,然后在此临时表上执行 sql 查询。

    示例代码:

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from pyspark.sql import SQLContext
    
    glueContext = GlueContext(SparkContext.getOrCreate())
    spark_session = glueContext.spark_session
    sqlContext = SQLContext(spark_session.sparkContext, spark_session)
    
    DyF = glueContext.create_dynamic_frame.from_catalog(database="{{database}}", table_name="{{table_name}}")
    df = DyF.toDF()
    df.registerTempTable('{{name}}')
    df = sqlContext.sql('{{your select query with table name that you used for temp table above}}')
    df.write.format('{{orc/parquet/whatever}}').partitionBy("{{columns}}").save('path to s3 location')
    

    【讨论】:

    • 如果这个问题的答案很明显,请原谅我,但我需要知道:所以我只需要在数据接收器中插入这个 df 而不是 applymapping,它应该在 s3 中生成 out csv?还是我必须再次从这个数据框创建一个动态框?
    • @RakeshGuha :我更新了示例代码。使用 sql 查询在 DF 上应用所有转换后,您可以使用 df.write 函数将数据写回 S3。您不需要将数据帧转换回动态帧。至于applymapping,它是一个动态框架特定的功能。您可以在应用映射后将动态框架转换为 DF,然后在 DF 上应用您的 sql 查询。
    • 非常感谢,我会试试这个,让你知道。再次感谢!
    • 第 32 行,在 中 df = sqlContext.sql('select QUERY') NameError: name 'sqlContext' is not defined End of LogType:stdout 又是一个基本问题,但这个 sqlContext 需要被定义?如果是这样,我们该怎么做?
    • sqlContext = SQLContext(spark_session.sparkContext, spark_session) NameError: name 'spark_session' is not defined - 由于某种原因仍然出现这些错误
    【解决方案2】:

    这就是我首先将胶水动态帧转换为火花数据帧的方法。然后使用glueContext对象和sql方法进行查询。

    spark_dataframe = glue_dynamic_frame.toDF()
    spark_dataframe.createOrReplaceTempView("spark_df")
    
    glueContext.sql("""
    SELECT * 
    FROM spark_df
    LIMIT 10
    """).show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-10-18
      • 1970-01-01
      • 1970-01-01
      • 2020-10-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多