【发布时间】:2021-06-26 20:56:39
【问题描述】:
我有现在的情况:
- Delta 表位于
S3 - 我想通过
Athena查询这个表 -
spark版本3.1.1和hadoop3.2.0
为此,我需要遵循以下文档:instructions 和 s3 setup
我正在使用MacBook Pro 和Environment variables configured in my ~/.zshrc 作为我的小POC:
export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1
我设置了一个小型 pyspark 项目,并在其中创建了我的 spark_session:
from pyspark.sql import SparkSession
import findspark
import boto3
def create_session() -> SparkSession:
findspark.init()
spark_session = SparkSession.builder.appName("delta_session") \
.master("local[*]") \
.getOrCreate()
sparkContext = spark_session.sparkContext
boto_default_session = boto3.setup_default_session()
boto_session = boto3.Session(
botocore_session=boto_default_session, profile_name="dev", region_name="eu-west-1"
)
credentials = boto_session.get_credentials()
print(
f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
)
hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
hadoopConfiguration.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.profile.ProfileCredentialsProvider"
)
hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConfiguration.set("fs.s3a.awsAccessKeyId", credentials.access_key)
hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", credentials.secret_key)
hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
return spark_session
然后我运行:
spark_session = create_session()
from delta.tables import *
delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")
# This works
df = delta_table.toDF()
print(df.show(10))
# This fails
delta_table.generate("symlink_format_manifest")
-
我能够检索 delta 文件并创建一个
DataFrame,一切看起来都不错。 -
然后我尝试调用
delta_table.generate并收到此错误:
Traceback(最近一次调用最后一次): 文件“/run.py”,第 33 行,在 delta_table.generate("symlink_format_manifest") 文件“/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a-0.io .jar/delta/tables.py”,第 74 行,在生成中 调用中的文件“
/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py”,第 1305 行 文件“ /site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第 111 行,在 deco 文件“ /site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py”,第 328 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o34.generate 时出错。 : java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression$1(GenerateSymlinkManifest.scala:350) 在 scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.TraversableLike.flatMap(TraversableLike.scala:245) 在 scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242) 在 scala.collection.immutable.List.flatMap(List.scala:355) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest$1(GenerateSymlinkManifest.scala:180) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53) 在 org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32) 在 org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration$1(GenerateSymlinkManifest.scala:365) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77) 在 com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103) 在 org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165) 在 org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41) 在 org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1(DeltaGenerateCommand.scala:58) 在 org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1$adapted(DeltaGenerateCommand.scala:58) 在 org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50) 在 io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54) 在 io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48) 在 io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45) 在 io.delta.tables.DeltaTable.generate(DeltaTable.scala:176) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748)
我调用应用程序:
poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py
我尝试了什么:
- 我试过在不使用
poetry的情况下运行它并直接下载 spark 并这样做 - 我尝试使用较旧的
hadoop版本,因为他们似乎使用了那个here - 我找到了这个thread,但它没有帮助我
- 我也试过
io.delta:delta-core_2.12:0.8.0 - 我已经验证
delta版本0.7.0和0.8.0应该支持spark 3.1.1 - 我还尝试添加
pyarrow并通过以下方式设置它:spark_session.conf.set("spark.sql.execution.arrow.enabled", "true") - 我也尝试过添加hadoop-common 3.2.0
--packages org.apache.hadoop:hadoop-common:3.2.0,但这也无济于事 - 我也尝试使用
spark 3.1.1 and hadoop 3.2.0运行它,但我给了它--packages "io.delta:delta-core_2.12:0.7.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:2.7.7",但这给了我错误:
py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"
在我看来 org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z 由于某种原因不可调用。而且我找不到更多要安装的东西?
我的pyproject.toml
[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]
[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"
[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"
非常适合任何可能遇到相同问题的人。
更新
根据 Alex 的评论,我通过以下方式解决了该问题:
- Spark版
3.0.2 - Hadoop版本
3.2.0 - 三角洲
0.8.0 spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" ~/code/dataops-delta-infrastructure/run.py
【问题讨论】:
标签: pyspark amazon-athena presto delta-lake python-poetry