【发布时间】:2015-09-10 10:07:41
【问题描述】:
我编写了一些代码来读取 parquet 文件,稍微切换架构并将数据写入新的 parquet 文件。代码如下:
...
val schema = StructType(
List(
StructField("id", LongType, false),
StructField("data", ArrayType(FloatType), false)
)
)
val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)
Writer.writeToParquet(df)
Writer 存在
object Writer {
def writeToParquet(df : DataFrame) : Unit = {
val future = Future {
df.write.mode(SaveMode.Append).save(path)
}
Await.ready(future, Duration.Inf)
}
}
对于大约 4 GB 的文件,我的程序中断,引发 OutOfMemoryError: Java heap space。我为执行程序设置了 6 GB 内存(使用 -Dspark.executor.memory=6g),提升了 JVM 堆空间(使用 -Xmx6g),将 Kryo 序列化程序缓冲区增加到 2 GB(使用 System.setProperty("spark.kryoserializer.buffer.mb", "2048"))。但是,我仍然得到错误。
这是堆栈跟踪:
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
我可以做些什么来避免这个错误?
【问题讨论】:
-
首先你没有在任何地方使用
revisedData。其次,您从哪里获得 OOM?最后文件的结构是什么(有多少列)? -
对不起,我已经更正了代码中的错误。 OOM 在
com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)引发(请参阅更新后的带有堆栈跟踪的帖子)。原始文件有两列,即Int和Seq[Float]列 -
您使用的是哪个版本的 Spark?确保使用适当的 kryoserializer 缓冲区属性(1.4.1 没有
mb之一)。我会尝试减少缓冲区大小,2GB 太多了。不过,它需要足够大以容纳您的数据,因此请检查您的记录有多长,并尝试使用尽可能小的缓冲区。看起来那个火花版本正在使用 kryo-2.22,它基本上试图做buffer = new byte[bufferSize];,但没有那么多空间。 -
是的,现在可以了!我在旧版本的 Spark 中使用了错误的缓冲区属性。切换了属性,现在运行流畅了。谢谢!
-
@naivge ok 然后将其添加为答案
标签: scala apache-spark apache-spark-sql