【问题标题】:Spark save functionality use MapReduce under the hoodSpark 保存功能在后台使用 MapReduce
【发布时间】:2019-03-04 08:30:37
【问题描述】:

我试图弄清楚为什么saveAsText 和更多一般 Spark 保存功能似乎在后台使用 MapReduce。这是源代码:

RDD.scala

  def saveAsTextFile(path: String): Unit = withScope {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

PairRDDFunctions.scala

所以基本上将给定的 RDD 转换为 PairRDD 以便调用saveAsHadoopFile 函数:

def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {

    val hadoopConf = conf
    hadoopConf.setOutputKeyClass(keyClass)
    hadoopConf.setOutputValueClass(valueClass)
    conf.setOutputFormat(outputFormatClass)
    for (c <- codec) {
      hadoopConf.setCompressMapOutput(true)
      hadoopConf.set("mapred.output.compress", "true")
      hadoopConf.setMapOutputCompressorClass(c)
      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }

    if (conf.getOutputCommitter == null) {
      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
    }

    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    FileOutputFormat.setOutputPath(hadoopConf,
      SparkHadoopWriter.createPathFromString(path, hadoopConf))
    saveAsHadoopDataset(hadoopConf)
  }

就我这里的理解而言,它肯定是在尝试配置 MapReduce 作业,设置 outputKey、outputValue 等。

谁能给我解释一下:

  • Spark 保存操作是如何发生的
  • Spark 保存和 MapReduce 保存的主要区别是什么

【问题讨论】:

    标签: apache-spark mapreduce rdd


    【解决方案1】:

    它肯定会尝试配置 MapReduce 作业,设置 outputKey、outputValue 等。

    不完全是。它正在设置 Hadoop 配置,但这并不意味着它正在设置 MapReduce 作业。 Hadoop 本身包含多个不同的组件,其中大量组件并没有与 MapReduce 紧密绑定。其中许多,如 HDFS 接口或安全组件,用于许多不同的项目。

    Spark save 和 MapReduce save 的主要区别是什么

    这里没有。一般来说,Spark 与文件系统交互时,会使用相关的 Hadoop 组件。然而,这些独立于 MapReduce 组件,不应与 Hadoop MR 作业混淆。

    【讨论】:

      猜你喜欢
      • 2020-10-22
      • 2022-06-13
      • 1970-01-01
      • 2021-12-26
      • 2013-03-27
      • 1970-01-01
      • 1970-01-01
      • 2020-08-06
      • 1970-01-01
      相关资源
      最近更新 更多