【Question title】:Saving a Spark DataFrame from an Azure Databricks notebook job to Azure Blob Storage causes java.lang.NoSuchMethodError
【Posted】:2020-03-24 04:24:32
【Question description】:

I created a simple job in Azure Databricks using a notebook. I am trying to save a Spark DataFrame from the notebook to Azure Blob Storage. Sample code is attached:

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
# com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

# Tried with com.microsoft.azure:azure-storage:2.2.0

SECRET_ACCESS_KEY = "xxxxx"
STORAGE_NAME = "my_storage"
CONTAINER = "my_container"
SUB_PATH = "/azure_dbs_check/"
FILE_NAME = "result"

spark = SparkSession \
    .builder \
    .appName("azure_dbs_to_azure_blob") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark_context = spark.sparkContext
    fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"

    spark.conf.set("fs.wasbs.impl",
                   "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)

    file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME

    df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(traceback.format_exc())

The code above works when I run it with spark-submit on my local machine. The spark-submit command used is:

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

The likely root cause is

Caused by: java.lang.NoSuchMethodError:
com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

So I downgraded the package to com.microsoft.azure:azure-storage:2.2.0, which contains the startCopyFromBlob method.
(In the com.microsoft.azure:azure-storage:3.x.x releases, the deprecated startCopyFromBlob() on CloudBlob was removed.)

The error persists even after the downgrade.

The error stack trace is attached:

    Traceback (most recent call last):
      File "<command-4281470986294005>", line 28, in <module>
        df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
      File "/databricks/spark/python/pyspark/sql/readwriter.py", line 738, in save
        self._jwrite.save(path)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/databricks/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o255.save.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:192)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:110)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:108)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:128)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 52, 10.2.3.12, executor 0): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2243)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
    Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more
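
The NoSuchMethodError above names the missing method using a JVM method descriptor, which is hard to read at a glance. As a minimal sketch (the helper names `parse_method_descriptor` and `_read_type` are made up for illustration and are not part of Spark or py4j), the descriptor can be decoded into ordinary Java type names to see exactly which overload hadoop-azure expects to find on CloudBlob:

```python
# Single-letter codes used by JVM descriptors for primitive types.
PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean", "V": "void",
}


def _read_type(desc, i):
    """Read one type starting at index i; return (java_name, next_index)."""
    dims = 0
    while desc[i] == "[":          # each '[' adds one array dimension
        dims += 1
        i += 1
    if desc[i] == "L":             # object type: L<binary/class/name>;
        end = desc.index(";", i)
        name = desc[i + 1:end].replace("/", ".")
        i = end + 1
    else:                          # primitive type: one letter
        name = PRIMITIVES[desc[i]]
        i += 1
    return name + "[]" * dims, i


def parse_method_descriptor(desc):
    """Split '(params)return' into a list of parameter types and a return type."""
    if not desc.startswith("("):
        raise ValueError("a method descriptor must start with '('")
    i = 1
    params = []
    while desc[i] != ")":
        type_name, i = _read_type(desc, i)
        params.append(type_name)
    return_type, _ = _read_type(desc, i + 1)
    return params, return_type


# Descriptor copied from the "Caused by: java.lang.NoSuchMethodError" line above.
descriptor = (
    "(Ljava/net/URI;"
    "Lcom/microsoft/azure/storage/AccessCondition;"
    "Lcom/microsoft/azure/storage/AccessCondition;"
    "Lcom/microsoft/azure/storage/blob/BlobRequestOptions;"
    "Lcom/microsoft/azure/storage/OperationContext;)"
    "Ljava/lang/String;"
)
params, return_type = parse_method_descriptor(descriptor)
print(return_type, "startCopyFromBlob(" + ", ".join(params) + ")")
```

The decoded signature is what the hadoop-azure jar was compiled against; the error means the azure-storage jar actually loaded at runtime has no method with that name and parameter list.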

Packages included in spark-submit:

  • org.apache.hadoop:hadoop-azure:2.7.2
  • com.microsoft.azure:azure-storage:3.1.0 (later also tried com.microsoft.azure:azure-storage:2.2.0)

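Local machine:
Python 3.6
Spark version 2.4.4 using Scala version 2.11.12

Databricks details:
Cluster information:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)

The Runtime 5.5 release notes state that the package com.microsoft.azure azure-storage 5.2.0 is installed in the environment.

Is the problem that Spark picks up the library from the environment (version 5.2.0) even though a different version (2.2.0) is specified in the job? In versions such as 5.2.0, the startCopyFromBlob() method has been removed.

I have documented the various jar cases/combinations I tried in a Google doc.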

Observations:

  1. The Databricks job uses the preinstalled library azure-storage:5.2.0. This package does not have the com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob() method (it was replaced by startCopy() in the 4.x.x releases). azure-storage is pinned at 5.2.0.

  2. So I tried the latest hadoop-azure:3.2.1, hoping to get a jar that does not call the deprecated method. But this caused a new error:
    java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities

  3. The StreamCapabilities class lives in the hadoop-common package, so I included the latest hadoop-common (3.2.1).
    This caused java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders()
    Reason:
    org.apache.hadoop:hadoop-common:2.7.3 is preinstalled in the Azure runtime. This hadoop-common:2.7.3 does not have the ProviderUtils.excludeIncompatibleCredentialProviders() method.

  4. Since both packages (hadoop-common:2.7.3 and azure-storage:5.2.0) are pinned (preinstalled), I tried lower hadoop-azure versions to find one that does not call excludeIncompatibleCredentialProviders().

  5. From hadoop-azure:3.2.1 (the latest as of November 2019) down to hadoop-azure:2.8.0, excludeIncompatibleCredentialProviders() is called internally.
    Below 2.8.0, I run into the old error again:
    NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob
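
The observations above amount to a small decision rule over the hadoop-azure version. The sketch below only restates what the post reports; it is not derived from the library sources, and the names `PREINSTALLED`, `parse_version`, and `reported_failure` are hypothetical helpers introduced for illustration. It shows why no hadoop-azure version in the tested range works against the two pinned jars:

```python
# Versions the post reports as preinstalled on Databricks Runtime 5.5 LTS.
PREINSTALLED = {"azure-storage": "5.2.0", "hadoop-common": "2.7.3"}


def parse_version(version):
    """Turn '2.7.2' into (2, 7, 2) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def reported_failure(hadoop_azure_version):
    """Return the missing method the post reports for this hadoop-azure version."""
    parsed = parse_version(hadoop_azure_version)
    if parsed > (3, 2, 1):
        raise ValueError("versions above 3.2.1 are not covered by the post")
    if parsed < (2, 8, 0):
        # Calls a method that is absent from the pinned azure-storage jar.
        return "com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob"
    # 2.8.0 through 3.2.1: calls a method that is absent from the pinned
    # hadoop-common jar. (3.2.1 first surfaced a missing StreamCapabilities
    # class, before hadoop-common 3.2.1 was added to the job.)
    return "org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders"


for version in ("2.7.2", "2.8.0", "3.2.1"):
    print("hadoop-azure", version, "->", reported_failure(version))
```

Every branch ends in a missing method, which matches the conclusion that swapping hadoop-azure versions alone cannot resolve the conflict while the two preinstalled jars take precedence.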

【Discussion】:

    Tags: databricks azure-blob-storage azure-databricks


    【Solution 1】:

    Another approach is to create a mount:

    https://docs.databricks.com/data/data-sources/azure/azure-storage.html

    Then adjust the save path as needed.


    I would also recommend using this

    spark.conf.set(
      "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
      "<storage-account-access-key>")
    

    instead of

    spark_context._jsc.hadoopConfiguration().set(fs_acc_key, SECRET_ACCESS_KEY)
    

    because you are using the DataFrame API rather than the RDD API.
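
As a minimal sketch of the recommendation above, the account-key property name and the wasbs:// path can be built by small helpers instead of repeated string concatenation. The helper names (`account_key_conf`, `wasbs_url`, `configure_blob_access`) and the `_DictConf` stand-in are assumptions introduced for illustration; in a notebook the object passed in would be `spark.conf`, which exposes the same `set(key, value)` call used in the snippet above.

```python
def account_key_conf(storage_account):
    """Name of the property that holds the storage account access key."""
    return "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account)


def wasbs_url(container, storage_account, path):
    """Build a wasbs:// URL for a path inside a blob container."""
    return "wasbs://{}@{}.blob.core.windows.net/{}".format(
        container, storage_account, path.lstrip("/")
    )


def configure_blob_access(conf, storage_account, access_key):
    """Register the access key on any object with a set(key, value) method."""
    conf.set(account_key_conf(storage_account), access_key)


class _DictConf:
    """Stand-in for spark.conf so the sketch runs without a Spark session."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value


conf = _DictConf()  # in a notebook, pass spark.conf instead
configure_blob_access(conf, "ACCOUNTNAME", "ACCESSKEY")
target = wasbs_url("CONTAINER", "ACCOUNTNAME", "/azure_dbs_check/result_csv")
print(target)
```

With a real session, the resulting `target` string would be passed to `df.write.save(...)` exactly as in the question's code.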


    Edit

    I ran the following code on a Databricks Community cluster, with the spark.conf.set statements modified.

    import traceback
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StringType
    
    # Attached the spark submit command used
    # spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
    # com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py
    
    # Tried with com.microsoft.azure:azure-storage:2.2.0
    
    SECRET_ACCESS_KEY = "ACCESSKEY"
    STORAGE_NAME = "ACCOUNTNAME"
    CONTAINER = "CONTAINER"
    SUB_PATH = "/azure_dbs_check/"
    FILE_NAME = "result"
    
    spark = SparkSession \
        .builder \
        .appName("azure_dbs_to_azure_blob") \
        .getOrCreate()
    
    df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
    df.show()
    
    try:
        fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"
    
        spark.conf.set("spark.hadoop.fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        spark.conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)
    
        file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME
    
        print(file_path)
    
        df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
    
        print("Written successful")
    except Exception as exp:
        print("Exception occurred")
        print(traceback.format_exc())
    

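    【Comments】:

    • Thanks for the recommendation. I updated the configuration setup to use spark.conf.set(). I am trying to write to Azure without mounting the storage on DBFS.
    • OK. Let me try to reproduce it, give me a few minutes.
    • It works, @Carlos. I am just wondering which version of the library looks up the key spark.hadoop.fs.wasb.impl rather than fs.wasb.impl. When I look at the source code for hadoop-common:2.7.3, which is mentioned in the 5.5 release notes, I can only see "fs.{}.impl".format(schema), at line 2577.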