【问题标题】:PySpark Delta Table - generate symlink [java.lang.NoSuchMethodError]PySpark Delta Table - 生成符号链接 [java.lang.NoSuchMethodError]
【发布时间】:2021-06-26 20:56:39
【问题描述】:

我有现在的情况:

  • Delta 表位于S3
  • 我想通过Athena查询这个表
  • spark 版本 3.1.1hadoop 3.2.0

为此,我需要遵循以下文档:instructionss3 setup

我正在使用MacBook ProEnvironment variables configured in my ~/.zshrc 作为我的小POC

export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1

我设置了一个小型 pyspark 项目,并在其中创建了我的 spark_session

from pyspark.sql import SparkSession
import findspark
import boto3


def create_session() -> SparkSession:
    findspark.init()

    spark_session = SparkSession.builder.appName("delta_session") \
        .master("local[*]") \
        .getOrCreate()

    sparkContext = spark_session.sparkContext

    boto_default_session = boto3.setup_default_session()

    boto_session = boto3.Session(
        botocore_session=boto_default_session, profile_name="dev", region_name="eu-west-1"
    )
    credentials = boto_session.get_credentials()

    print(
        f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
    )

    hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
    hadoopConfiguration.set(
        "fs.s3a.aws.credentials.provider", 
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
    hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConfiguration.set("fs.s3a.awsAccessKeyId", credentials.access_key)
    hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", credentials.secret_key)
    hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

    return spark_session

然后我运行:

spark_session = create_session()

from delta.tables import *

delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")

# This works
df = delta_table.toDF()
print(df.show(10))

# This fails
delta_table.generate("symlink_format_manifest")
  1. 我能够检索 delta 文件并创建一个 DataFrame,一切看起来都不错。

  2. 然后我尝试调用 delta_table.generate 并收到此错误:

Traceback(最近一次调用最后一次): 文件“/run.py”,第 33 行,在 delta_table.generate("symlink_format_manifest") 文件“/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a-0.io .jar/delta/tables.py”,第 74 行,在生成中 调用中的文件“/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py”,第 1305 行 文件“/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第 111 行,在 deco 文件“/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py”,第 328 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o34.generate 时出错。 : java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression$1(GenerateSymlinkManifest.scala:350) 在 scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.TraversableLike.flatMap(TraversableLike.scala:245) 在 scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242) 在 scala.collection.immutable.List.flatMap(List.scala:355) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest$1(GenerateSymlinkManifest.scala:180) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53) 在 org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32) 在 org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27​​) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration$1(GenerateSymlinkManifest.scala:365) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77) 在 com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103) 在 org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1(DeltaGenerateCommand.scala:58) 在 org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1$adapted(DeltaGenerateCommand.scala:58) 在 org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50) 在 io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54) 在 io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48) 在 io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45) 在 io.delta.tables.DeltaTable.generate(DeltaTable.scala:176) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748)

我调用应用程序:

    poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py

我尝试了什么:

  • 我试过在不使用poetry 的情况下运行它并直接下载 spark 并这样做
  • 我尝试使用较旧的hadoop 版本,因为他们似乎使用了那个here
  • 我找到了这个thread,但它没有帮助我
  • 我也试过io.delta:delta-core_2.12:0.8.0
  • 我已经验证delta版本0.7.00.8.0应该支持spark 3.1.1
  • 我还尝试添加pyarrow 并通过以下方式设置它:spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
  • 我也尝试过添加hadoop-common 3.2.0 --packages org.apache.hadoop:hadoop-common:3.2.0,但这也无济于事
  • 我也尝试使用spark 3.1.1 and hadoop 3.2.0 运行它,但我给了它--packages "io.delta:delta-core_2.12:0.7.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:2.7.7",但这给了我错误:
py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"

在我看来 org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z 由于某种原因不可调用。而且我找不到更多要安装的东西?

我的pyproject.toml

[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]

[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"

[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"

非常适合任何可能遇到相同问题的人。


更新

根据 Alex 的评论,我通过以下方式解决了该问题:

  • Spark版3.0.2
  • Hadoop版本3.2.0
  • 三角洲0.8.0
  • spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" ~/code/dataops-delta-infrastructure/run.py

【问题讨论】:

    标签: pyspark amazon-athena presto delta-lake python-poetry


    【解决方案1】:

    您需要将 Spark 降级到 Spark 3.0.2 才能使用 Delta 0.8.0 - 不幸的是,Spark 3.1.1 对内部使用的 Delta 进行了许多更改,这破坏了二进制兼容性。很可能,您的具体问题是由SPARK-32154 引起的,它更改了ScalaUDF 的参数(这个line

    【讨论】:

    • 谢谢你,亚历克斯,这拯救了我的一天!我不可能在文档中找到,因为他们说它应该在这里兼容:docs.delta.io/latest/releases.html
    • 是的,我自己也被这个不兼容所击中,正在等待与 3.1 兼容的版本
    • 你知道这是否支持3.1.1吗?
    • 据我了解很快就会,但不能准确地说。代码已经在git里了,大家可以自己编译
    猜你喜欢
    • 1970-01-01
    • 2021-05-12
    • 2014-10-21
    • 1970-01-01
    • 2022-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多