【问题标题】:Troubleshooting Scala Spark Configurations/EnvironmentsScala Spark 配置/环境故障排除
【发布时间】:2015-09-20 05:45:10
【问题描述】:

运行 windows 8.1、Java 1.8、Scala 2.10.5、Spark 1.4.1、Scala IDE (Eclipse 4.4)、Ipython 3.0.0 和 Jupyter Scala

我对 Scala 和 Spark 还比较陌生,我发现某些 RDD 命令(例如 collect 并首先返回“Task not serializable”)错误。对我来说不寻常的是,我在带有 Scala 内核或 Scala IDE 的 Ipython 笔记本中看到了该错误。但是,当我直接在 spark-shell 中运行代码时,我没有收到此错误。

我想设置这两个环境,以便在 shell 之外进行更高级的代码评估。我在解决此类问题和确定要查找的内容方面缺乏专业知识;如果您能提供有关如何着手解决此类问题的指导,将不胜感激。

代码:

val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data"
val sample = sc.parallelize(sc.textFile(logFile).take(100).map(line => line.replace("'","").replace("\"","")).map(line => line.substring(0,line.length()-1)))
val header = sample.first
val data = sample.filter(_!= header)
data.take(1)
data.count
data.collect

堆栈跟踪

org.apache.spark.SparkException: Task not serializable
    org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    org.apache.spark.rdd.RDD.filter(RDD.scala:310)
    cmd49$$user$$anonfun$4.apply(Main.scala:188)
    cmd49$$user$$anonfun$4.apply(Main.scala:187)
java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@5976e363)
    - field (class: cmd12$$user, name: conf, type: class org.apache.spark.SparkConf)
    - object (class cmd12$$user, cmd12$$user@39a7edac)
    - field (class: cmd49, name: $ref$cmd12, type: class cmd12$$user)
    - object (class cmd49, cmd49@3c2a0c4f)
    - field (class: cmd49$$user, name: $outer, type: class cmd49)
    - object (class cmd49$$user, cmd49$$user@774ea026)
    - field (class: cmd49$$user$$anonfun$4, name: $outer, type: class cmd49$$user)
    - object (class cmd49$$user$$anonfun$4, <function0>)
    - field (class: cmd49$$user$$anonfun$4$$anonfun$apply$3, name: $outer, type: class cmd49$$user$$anonfun$4)
    - object (class cmd49$$user$$anonfun$4$$anonfun$apply$3, <function1>)
    org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    org.apache.spark.rdd.RDD.filter(RDD.scala:310)
    cmd49$$user$$anonfun$4.apply(Main.scala:188)
    cmd49$$user$$anonfun$4.apply(Main.scala:187)

【问题讨论】:

  • 你为什么在 sc.parallelize 中使用 sc.textFile?!?

标签: java eclipse scala apache-spark


【解决方案1】:

@Ashalynd 关于 sc.textFile 已经创建和 RDD 的事实是正确的。在这种情况下,您不需要 sc.parallelize。 documentation here

所以考虑到你的例子,这就是你需要做的:

// Read your data from S3
val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data"
val rawRDD = sc.textFile(logFile)

// Fetch the header
val header =  rawRDD.first

// Filter on the header than map to clean the line
val sample = rawRDD.filter(!_.contains(header)).map { 
 line => line.replaceAll("['\"]","").substring(0,line.length()-1)
}.takeSample(false,100,12L) // takeSample returns a fixed-size sampled subset of this RDD in an array

最好使用takeSample函数:

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

withReplacement : 是否使用替换进行采样

num : 返回样本的大小

种子:随机数生成器的种子

注意1:样本是一个Array[String],所以如果你想把它转换成RDD,你可以使用parallelize函数如下:

val sampleRDD = sc.parallelize(sample.toSeq)

注意 2: 如果您希望直接从您的 rawRDD.filter(...).map(...) 获取示例 RDD,您可以使用返回 RDD[T] 的 sample 函数。不过,您需要指定所需数据的一小部分,而不是具体数字。

【讨论】:

  • val header = sample.first 不起作用,因为 sample 那时还不存在?
  • 我的错。那应该是 rawRDD.first
  • 但是您的标头与由样本制成的标头不同。 Sample 清除数据,将每个数据元素的长度减 1,然后提取标题并针对该标题过滤数据。获取不等于未处理的第一个元素的原始数据可能不会产生相同的集合。
  • 所有数据的标头都必须是唯一的,无论它是全采样的。它通常定义参数的名称。所以我不同意这一点!
【解决方案2】:

sc.textFile 已经创建了分布式数据集(查看文档)。在这种情况下你不需要 sc.parallelize,但是 - 正如 eliasah 正确指出的那样 - 如果你想要一个 RDD,你需要再次将结果转换为 RDD。

val selection = sc.textFile(logFile). // RDD
take(100). // collection
map(_.replaceAll("['\"]",""). // use regex to match both chars
map(_.init) // a method that returns all elements except the last
// turn the resulting collection into RDD again
val sample = sc.parallelize(selection)

【讨论】:

  • sample 不是这里的 RDD 思想!
  • 为什么? textFile 应该创建一个 RDD 不是吗?
  • 是的,然后 take(100) 返回一个您正在映射的 Array[String]。因此,您将拥有一个已应用 replaceAll 和 init 函数的字符串数组。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-07-15
  • 2010-11-30
  • 2021-09-11
相关资源
最近更新 更多