【Title】: Azure Databricks DataFrame count generates error com.databricks.sql.io.FileReadException: Error while reading file abfss:REDACTED_LOCAL_PART
【Posted】: 2021-01-14 15:52:21
【Question】:

I have this PySpark script in an Azure Databricks notebook:

    import argparse
    from pyspark.sql.types import StructType
    from pyspark.sql.types import StringType

    spark.conf.set(
    "fs.azure.account.key.gcdmchndev01c.dfs.core.chinacloudapi.cn",
    "<storage account key>"
    )
        

    inputfile = "abfss://raw@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code.csv"
    outputpath = "abfss://spark-processed@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code"
    jsonTableSchema = "{'type': 'struct', 'fields': [{'name': 'id', 'type': 'integer', 'nullable': False, 'metadata': {} }, {'name': 'hospital_ddi_code', 'type': 'string', 'nullable': True, 'metadata': {} } ]}"
    pipelineId = "test111111"
    rawpath = "raw/test/CODI_Ignored_Hospital_Code"
    rawfilename = "CODI_Ignored_Hospital_Code.csv"
    outputpath_bad = "abfss://spark-processed@gcdmchndev01c.dfs.core.chinacloudapi.cn/bad/test/CODI_Ignored_Hospital_Code"

    stSchema = StructType.fromJson(eval(jsonTableSchema))

    stSchema.add("ValidateErrorMessage", StringType(),True)

    raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE')

    raw.createOrReplaceTempView("tv_raw")
    dfClean = spark.sql("""
    select 
        *, nvl2(ValidateErrorMessage, 'failed', 'success') as ValidateStatus, now()+ INTERVAL 8 HOURS as ProcessDate, '""" + pipelineId + "'as ProcessId, '" + rawpath + "' as RawPath, '" + rawfilename + "' as RawFileName" + """
    from 
        tv_raw
    """)
    #dfClean.printSchema()

    dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
    #dfBadRecord.cache()
    badCount = dfBadRecord.count()

This gives me an error on the last line:

    badCount = dfBadRecord.count()


    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 39, 10.139.64.6, executor 1): com.databricks.sql.io.FileReadException: Error while reading file abfss:REDACTED_LOCAL_PART@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code.csv

And the detailed error:

    Py4JJavaError                             Traceback (most recent call last)
    <command-3450774645335260> in <module>
         34 dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
         35 #dfBadRecord.cache()
    ---> 36 badCount = dfBadRecord.count()
         37 

    /databricks/spark/python/pyspark/sql/dataframe.py in count(self)
        584         2
        585         """
    --> 586         return int(self._jdf.count())
        587 
        588     @ignore_unicode_prefix

    /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1303         answer = self.gateway_client.send_command(command)
       1304         return_value = get_return_value(
    -> 1305             answer, self.gateway_client, self.target_id, self.name)
       1306 
       1307         for temp_arg in temp_args:

【Comments】:

    Tags: databricks azure-databricks


    【Solution 1】:

    I contacted MS Azure support, and they advised adding .cache() to the end of the read call, so use this:

        raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE').cache()
    

    instead of:

        raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE')
    

    【Comments】:
