【问题标题】:Spark: Read and Write to Parquet leads to OutOfMemoryError: Java heap spaceSpark:读取和写入 Parquet 导致 OutOfMemoryError:Java 堆空间
【发布时间】:2015-09-10 10:07:41
【问题描述】:

我编写了一些代码来读取 parquet 文件,稍微切换架构并将数据写入新的 parquet 文件。代码如下:

...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)

val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r =>  Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData,  schema)

Writer.writeToParquet(df)

Writer 存在

object Writer {
    def writeToParquet(df : DataFrame) : Unit = {
       val future = Future {
         df.write.mode(SaveMode.Append).save(path)
       }

       Await.ready(future, Duration.Inf)
    }
}

对于大约 4 GB 的文件,我的程序中断,引发 OutOfMemoryError: Java heap space。我为执行程序设置了 6 GB 内存(使用 -Dspark.executor.memory=6g),提升了 JVM 堆空间(使用 -Xmx6g),将 Kryo 序列化程序缓冲区增加到 2 GB(使用 System.setProperty("spark.kryoserializer.buffer.mb", "2048"))。但是,我仍然得到错误。

这是堆栈跟踪:

java.lang.OutOfMemoryError: Java heap space
  at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
  at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
  at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:744)

我可以做些什么来避免这个错误?

【问题讨论】:

  • 首先你没有在任何地方使用revisedData。其次,您从哪里获得 OOM?最后文件的结构是什么(有多少列)?
  • 对不起,我已经更正了代码中的错误。 OOM 在com.esotericsoftware.kryo.io.Output.&lt;init&gt;(Output.java:35) 引发(请参阅更新后的带有堆栈跟踪的帖子)。原始文件有两列,即IntSeq[Float]
  • 您使用的是哪个版本的 Spark?确保使用适当的 kryoserializer 缓冲区属性(1.4.1 没有mb 之一)。我会尝试减少缓冲区大小,2GB 太多了。不过,它需要足够大以容纳您的数据,因此请检查您的记录有多长,并尝试使用尽可能小的缓冲区。看起来那个火花版本正在使用 kryo-2.22,它基本上试图做buffer = new byte[bufferSize];,但没有那么多空间。
  • 是的,现在可以了!我在旧版本的 Spark 中使用了错误的缓冲区属性。切换了属性,现在运行流畅了。谢谢!
  • @naivge ok 然后将其添加为答案

标签: scala apache-spark apache-spark-sql


【解决方案1】:

根据我的评论,有两件事:

1) 您需要注意 spark.kryoserializer.buffer.mb 属性名称,在最新的 spark 中,他们将其更改为 spark.kryoserializer.bufferspark.kryoserializer.buffer.max

2) 您必须注意缓冲区的大小和堆大小,它必须足够大以存储您正在写入的单个记录,但不要太多,因为 kryo 正在创建一个显式的byte[]大小(并为 2GB 分配单个 byte 数组通常是个坏主意)。尝试使用适当的属性降低缓冲区大小。

【讨论】:

    【解决方案2】:

    使用 sparklyr, 具有相同的 OutOfMemoryError, 尽管减少了 spark.kryoserializer.buffer, 无法读取镶木地板我已经能够写入的文件, 我的解决方案是:

    禁用“eager”memory load option: (memory=FALSE)

    spark_read_parquet(sc,name=curName,file.path("file://",srcFile), header=true, memory=FALSE)
    

    火花 2.3.0 火花1.0.0 R 版本 3.4.2

    【讨论】:

      猜你喜欢
      • 2013-01-24
      • 2019-03-06
      • 2015-08-25
      • 1970-01-01
      • 2021-02-13
      • 2012-10-09
      • 2017-01-16
      • 2020-09-14
      • 1970-01-01
      相关资源
      最近更新 更多