【问题标题】:Spark Extension using AWS Glue使用 AWS Glue 的 Spark 扩展
【发布时间】:2022-10-20 15:42:02
【问题描述】:

我在本地创建了一个使用 spark 扩展的脚本'uk.co.gresearch.spark:spark-extension_2.12:2.2.0-3.3'用于以简单的方式比较不同的 DataFrame。

但是,当我在 AWS Glue 上尝试此操作时,我遇到了一些问题并收到此错误:ModuleNotFoundError:没有名为“gresearch”的模块

我尝试从本地磁盘复制 .jar 文件,当我在本地初始化 spark 会话并收到以下消息时引用了该文件:

... 存储在以下位置的包的 jar:/Users/["SOME_NAME"]/.ivy2/jars uk.co.gresearch.spark#spark-extension_2.12 添加为依赖项...

在该路径中,我找到了一个名为:uk.co.gresearch.spark_spark-extension_2.12-2.2.0-3.3.jar我复制到 S3 并在 Jar lib 路径中引用。

但这没有用......您将如何以正确的方式进行设置?

我用来在 AWS Glue 上测试的示例代码如下所示:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

appName = 'test_gresearch'
spark_conf = SparkConf()
spark_conf.setAll([('spark.jars.packages', 'uk.co.gresearch.spark:spark- 
extension_2.12:2.2.0-3.3')])
spark=SparkSession.builder.config(conf=spark_conf)\
.enableHiveSupport().appName(appName).getOrCreate()

from gresearch.spark.diff import *

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

df1.show()
df2.show()

options = DiffOptions().with_change_column('changes')
df1.diff_with_options(df2, options, 'id').show()

任何提示都非常受欢迎。先感谢您!

问候

【问题讨论】:

    标签: apache-spark pyspark aws-glue


    【解决方案1】:

    在与 AWS 支持团队进行一些调查后,我被指示通过 Python 库路径包含包 .jar 文件,因为 .jar 文件包含嵌入式 Python 包。因此,应下载正确版本的 .jar 文件(https://mvnrepository.com/artifact/uk.co.gresearch.spark/spark-extension_2.12/2.1.0-3.1 是我最终使用的版本)并上传到 S3 并在 Python 库路径的 Glue 作业设置下引用(例如 - s3://bucket-名称/spark-extension_2.12-2.1.0-3.1.jar)。

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()   
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    job.commit()
    
    left = spark.createDataFrame([(1, "one"), (2, "two"), (3, "three")], ["id", "value"])
    right = spark.createDataFrame([(1, "one"), (2, "Two"), (4, "four")], ["id", "value"])
    
    from gresearch.spark.diff import *
    
    left.diff(right, "id").show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-12-19
      • 2020-02-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-15
      • 1970-01-01
      相关资源
      最近更新 更多