【问题标题】:Read /Write delta lake tables on S3 using AWS Glue jobs使用 AWS Glue 作业在 S3 上读取/写入 delta Lake 表
【发布时间】:2020-11-05 07:24:26
【问题描述】:

我正在尝试使用 AWS 胶水作业访问 S3 上的 Delta 湖表,但出现“未定义模块 Delta”的错误

 from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
spark = SparkSession.builder.appName("MyApp").config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.0").getOrCreate()
from delta.tables import *

data = spark.range(0, 5)
data.write.format("delta").save("S3://databricksblaze/data")

在胶水作业的依赖 jar 中也添加了必要的 Jar ( delta-core_2.11-0.6.0.jar )。 谁可以帮我这个事 谢谢

【问题讨论】:

    标签: apache-spark aws-glue delta-lake


    【解决方案1】:

    我在使用 Glue + Deltalake 方面取得了成功。我将 Deltalake 依赖项添加到 Glue 作业的“依赖 jars 路径”部分。 这里有它们的列表(我使用的是 Deltalake 0.6.1):

    • com.ibm.icu_icu4j-58.2.jar
    • io.delta_delta-core_2.11-0.6.1.jar
    • org.abego.treelayout_org.abego.treelayout.core-1.0.3.jar
    • org.antlr_antlr4-4.7.jar
    • org.antlr_antlr4-runtime-4.7.jar
    • org.antlr_antlr-runtime-3.5.2.jar
    • org.antlr_ST4-4.0.8.jar
    • org.glassfish_javax.json-1.0.4.jar

    然后在您的 Glue 作业中,您可以使用以下代码:

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    
    sc = SparkContext()
    sc.addPyFile("io.delta_delta-core_2.11-0.6.1.jar")
    
    from delta.tables import *
    
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    delta_path = "s3a://your_bucket/folder"
    data = spark.range(0, 5)
    data.write.format("delta").mode("overwrite").save(delta_path)
    
    deltaTable = DeltaTable.forPath(spark, delta_path)
    

    【讨论】:

    • 嗨,你从哪里得到所有这些罐子,我下载的没有 org.或 io。在文件名中。你能分享下载链接吗? ---- 出现错误 -----> 文件“/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”,第 328 行,get_return_value 格式( target_id, ".", name), value) py4j.protocol.Py4JJavaError: 调用 o55.addFile 时出错。 :java.io.FileNotFoundException:文件文件:/tmp/io.delta_delta-core_2.11-0.6.1.jar 在 org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:640) 中不存在跨度>
    • 您好,您可以下载本地模式下执行spark的jar,请使用以下版本的Delta执行spark:pyspark --packages io.delta:delta-core_2.11:0.6.1然后转到ivy2位置,在那里您会找到jar:@987654324 @
    • AWS Glue 中 Spark 的最新版本是 2.4,所以这就是我们应该使用低于 0.7 的 delta-core 版本的原因。这是兼容性表:docs.delta.io/0.8.0/releases.html
    【解决方案2】:

    你需要传递额外的配置属性

    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
    

    【讨论】:

    • 你的意思是火花配置代码看起来像 spark = SparkSession.builder.appName("MyApp").config("spark.jars.packages", "io.delta:delta- core_2.11:0.6.0","spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension","spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog")。 getOrCreate()
    • 不,实际上你必须将这些配置作为参数传递给胶水作业,你还必须传递这个参数'spark.delta.logStore.class','org.apache.spark.sql.delta.storage.S3SingleDriverLogStore'
    • 试过这个-----> conf = pyspark.SparkConf() conf.set("spark.jars.packages", "io.delta:delta-core_2.11:0.6.0 ") conf.set("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension") conf.set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog .DeltaCatalog") conf.set("spark.delta.logStore.class","org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") spark = SparkSession.builder.appName("MyApp").config(conf =conf).getOrCreate()------还是报同样的错误
    【解决方案3】:

    SparkSession.builder.config 中设置spark.jars.packages 不起作用。 spark.jars.packagesorg.apache.spark.deploy.SparkSubmitArguments/SparkSubmit 处理。所以它必须作为spark-submitpyspark 脚​​本的参数传递。当SparkSession.builder.config 被调用时,SparkSubmit 已经完成了它的工作。所以spark.jars.packages 目前是无操作的。详情请见https://issues.apache.org/jira/browse/SPARK-21752

    【讨论】:

    • 嗨@zsxwing,你的意思是我必须从胶水作业参数中传递参数并在脚本中使用它,你能否分享一个例子,如果可以的话,不知道该怎么做它。
    • 你可以使用pyspark --packages io.delta:delta-core_2.12:0.7.0 ...spark-submit --packages io.delta:delta-core_2.12:0.7.0 ...
    • 尝试了以下更改,但仍然出现相同的错误 ----------> import pyspark import os os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta -core_2.11:0.6.0 pyspark-shell' conf = pyspark.SparkConf() conf.set("spark.jars.packages", "io.delta:delta-core_2.11:0.6.0") spark = SparkSession .builder.appName("MyApp").config(conf=conf).getOrCreate()
    猜你喜欢
    • 1970-01-01
    • 2021-05-01
    • 1970-01-01
    • 2021-12-22
    • 2022-07-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多