【问题标题】:DataFrame Definintion is lazy evaluationDataFrame 定义是惰性求值
【发布时间】:2020-07-10 23:59:19
【问题描述】:

我是新来的火花和学习它。有人可以帮助解决以下问题

火花权威中关于数据帧定义的引用是“一般来说,Spark 只会在作业执行时失败,而不是在数据帧定义时失败——即使, 例如,我们指向一个不存在的文件。这是由于懒惰的评估,”

所以我猜spark.read.format().load() 是数据框定义。在这个创建的数据框之上,我们应用转换和操作,如果我没记错的话,加载是读取 API 而不是转换。

我试图在加载中“不存在的文件”,我认为这是数据框定义。但我得到了以下错误。根据这本书它不应该失败吧?我肯定错过了一些东西。有人可以帮忙吗?

df=spark.read.format('csv')
.option('header', 
'true').option('inferschema', 'true')
.load('/spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv')

错误

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Path does not exist: /spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv;' 

为什么数据框定义在延迟评估时引用 Hadoop 元数据?

【问题讨论】:

    标签: dataframe apache-spark pyspark rdd lazy-evaluation


    【解决方案1】:

    直到这里数据框被定义并且读取器对象被实例化。

    scala> spark.read.format("csv").option("header",true).option("inferschema",true)
    res2: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@7aead157
    

    当你真正说加载时。

    res2.load('/spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv') 并且文件不存在...... 是执行时间。(这意味着它必须检查数据源,然后必须从 csv 加载数据)

    为了获取数据帧,它检查hadoop的元数据,因为它会检查hdfs这个文件是否存在。

    那你就没有了

    org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://203-249-241:8020/spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv
    

    一般

    1) RDD/DataFrame lineage 将被创建并且不会在定义时执行。 2) load 执行的时候就是执行时间。

    请参阅以下流程以更好地理解。

    结论:任何转换(以您的方式定义时间)将不会执行,直到调用操作(以您的方式执行时间)

    【讨论】:

      【解决方案2】:

      Spark 是一种懒惰的进化。但是,这并不意味着它在加载文件时无法验证文件是否存在。

      惰性进化发生在 DataFrame 对象上,为了创建 DataFrame 对象,他们需要首先检查文件是否存在。

      检查以下code

      @scala.annotation.varargs
        def load(paths: String*): DataFrame = {
          if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
            throw new AnalysisException("Hive data source can only be used with tables, you can not " +
              "read files of Hive data source directly.")
          }
      
          DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
            val catalogManager = sparkSession.sessionState.catalogManager
            val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
              source = provider, conf = sparkSession.sessionState.conf)
            val pathsOption = if (paths.isEmpty) {
              None
            } else {
              val objectMapper = new ObjectMapper()
              Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
            }
      

      【讨论】:

      • 谢谢,我明白你的意思了。但是你能告诉我这本教科书的引述在哪里适用吗? “一般来说,Spark 只会在作业执行时失败,而不是在 DataFrame 定义时失败——例如,即使我们指向一个不存在的文件。这是由于延迟评估,”
      • 检查元数据。因此,如果您尝试过滤不存在​​的列。它将为您提供新的数据框。您可以应用更多过滤器,加入此数据框,在您调用某些操作之前它不会失败。 (即收集、展示等)
      猜你喜欢
      • 1970-01-01
      • 2011-07-14
      • 1970-01-01
      • 2011-01-15
      • 1970-01-01
      • 2012-08-31
      • 1970-01-01
      • 2011-02-23
      • 2021-12-23
      相关资源
      最近更新 更多