【问题标题】:Spark Scala code performance Tuning memory overHead ErrorSpark Scala 代码性能调整内存开销错误
【发布时间】:2019-06-26 16:40:48
【问题描述】:

您好,我在 scala spark 中编写了以下代码,在运行 640 万条记录时出现此错误

线程“主”java.lang.OutOfMemoryError 中的异常:超出 GC 开销限制

甚至像这样的警告 警告 WindowExec:没有为 Window 操作定义分区!将所有数据移动到单个分区,这可能会导致性能严重下降。

任何人都可以帮我解决这个问题,这对你来说真的很有帮助。我尝试提及执行程序内存,但仍然相同。

   trait init5 {
  var metaData: String = null
  var ColumnsList = new ListBuffer[String]()
  var fileList = new ListBuffer[String]()
  var dateList = new ListBuffer[String]()
  var fileL = new ListBuffer[String]()
  var ymlFilePath: String = ""
  var csvFilePath: String = ""
  var delimiter: String = ""
var count_range_column: String = ""
var usage_column: String = ""
var date_column: String = ""
var millioncount_column: String = ""
var zero_morethanzero_column: String = ""
var serviceOrders_column: String = ""
var empty_null_column: String = ""
var simple_columns: String = ""
 }

 object yml2 extends init5 {


    def main(args: Array[String]) {




  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hbase 
  sql").set("SPARK_EXECUTOR_MEMORY", "6g").set("spark.network.timeout","900s").set("spark.task.max.failure","24").set("spark.memory.offheap.size","31113699737")
   val sc = new SparkContext(sparkConf)
   val spark1 = SparkSession.builder().config(sc.getConf).getOrCreate()
    val sqlContext = spark1.sqlContext

    val propertypath: String = "C:/Users/ayushgup/Desktop/properties.yml"



      getProperty(propertypath)


     import spark1.implicits._

        def f1(number: Double) = {
  "%.2f".format(number).toDouble
}
val udfFunc = udf(f1 _)
    import scala.util.control.Exception._

def getCountPercent(df: DataFrame): DataFrame = {
  allCatch opt df
  df.withColumn("SUM", sum("count").over())
    .withColumn("fraction", col("count") / sum("count").over())
    .withColumn("Percent", col("fraction") * 100)
    .withColumn("number", udfFunc(col("Percent")))
    .drop("Percent")
    .drop("fraction")
}

 def occurenceCount(df: DataFrame,column: String)    
{

 var usageFinalDF = df.groupBy(column).count.transform(getCountPercent)           

   for (value <- usageFinalDF.collect()){
         fileList += column + "~" + value.mkString("~")
  }
}


Logger.getLogger("org").setLevel(Level.WARN)



         val headerCSV  = spark1.sqlContext.read.format("CSV").option("header","true").option("delimiter", """|""").load("C:\\Users\\ayushgup\\Downloads\\Header3.csv")

  val columns = headerCSV.columns


  val data =   spark1.sqlContext.read.format("CSV").option("delimiter", """~""").load("C:\\Users\\ayushgup\\Desktop\\GDF_postpaid_customer_PROD\\postpaid_customers_20190131_3474270.csv").toDF(columns:_*)


for (coll <- columns.toList) {


  if (date_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    for (datesss <- data.select(coll)) {
      dateList += datesss.toString().slice(1, 8)

    }

    var dateFinalDF = dateList.toList.toDF(coll)

    occurenceCount(dateFinalDF,coll)

  } else if (count_range_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    var r = data.select(coll).withColumn(coll, when(col(coll) <= 1500, "1-1500").when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
      .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
      .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
      .when(col(coll) > 2000, ">2000")
      .otherwise(0))
       .toDF(coll)

    occurenceCount(r,coll)

  } else if (usage_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    var r = data.select(coll).withColumn(coll, when(col(coll) <= 1026, "<=1gb").when(col(coll) > 1026 && col(coll) < 5130, "1-5gb")
      .when(col(coll) > 5130 && col(coll) < 10260, "5-10gb")
      .when(col(coll) > 10260 && col(coll) < 20520, "10-20gb")
      .when(col(coll) > 20520, ">20gb")
      .otherwise(0)).toDF(coll)

      occurenceCount(r,coll)

  } else if (millioncount_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    var r = data.select(coll).withColumn(coll, when(col(coll) > 1000000 && col(coll) < 5000000, "1-5m").when(col(coll) > 5000000 && col(coll) < 11000000, "5-11m")
      .when(col(coll) > 12000000 && col(coll) < 23000000, "12-23m")
      .when(col(coll) > 24000000 && col(coll) < 35000000, "24-35m")
      .when(col(coll) > 36000000, ">36m")
      .otherwise(0)).toDF(coll)

   occurenceCount(r,coll)

  } else if (zero_morethanzero_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    var r = data.select(coll).withColumn(coll, when(col(coll) === 0, "0").when(col(coll) > 0, ">0")
      .otherwise(1)).toDF(coll)

   occurenceCount(r,coll)

  } else if (serviceOrders_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    var r = data.select(coll).withColumn(coll, when(col(coll) === 0, "0").when(col(coll) === 1, "1")
      .when(col(coll) === 2, "2")
      .when(col(coll) === 3, "3")
      .when(col(coll) > 3, ">3"))
      .toDF(coll)

   occurenceCount(r,coll)
  } else if (empty_null_column.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

    var condition = data.select(coll).withColumn(coll, when(col(coll) === 0, "empty, n/a").otherwise(1)).toDF(coll)

   occurenceCount(condition,coll)
  } else if (simple_columns.toLowerCase().trim().split(",").contains(coll.toLowerCase())) {

   val actData1 = data.select(coll).toDF(coll)

    occurenceCount(actData1,coll)

  } 
}


var f = fileList.toList
for (flist <- f) {

  fileL += flist.replaceAll("[\\[\\]]", "")

}
//Creating the Final List into a DataFrame

     Logger.getLogger("org").info("Creating the Final List into a DataFrame")
     var ff = fileL.toDF().selectExpr("split(value, '~')[0] as Attribute_Name", "split(value, '~')[1] as Attribute_Value","split(value, '~')[2] as Attribute_Count","split(value, '~')[3] as Attribute_Total_Count","split(value, '~')[4] as Attribute_Percentage");


   df1.show(50000) 

  spark1.stop()

 }



   //Get metaData from application.yml file

    def getProperty(propertyFilePath: String) {
val source = Source.fromFile(propertyFilePath)

for (line <- source.getLines()) {

  if (line.contains("app_schema_path")) {

    ymlFilePath = line.split(":").last.mkString.trim()

  } else if (line.contains("data_path")) {
    csvFilePath = line.split(":").last.mkString.trim()
  } else if (line.contains("delimiter")) {
    delimiter = line.split(":").last.mkString.trim()
  } else if (line.contains("count_range_column")) {
    count_range_column = line.split(":").last.mkString.trim()
  } else if (line.contains("usage_column")) {
    usage_column = line.split(":").last.mkString.trim()
  } else if (line.contains("date_column")) {
    date_column = line.split(":").last.mkString.trim()
  } else if (line.contains("millioncount_column")) {
    millioncount_column = line.split(":").last.mkString.trim()
  } else if (line.contains("zero_morethanzero_column")) {
    zero_morethanzero_column = line.split(":").last.mkString.trim()
  } else if (line.contains("serviceOrders_column")) {
    serviceOrders_column = line.split(":").last.mkString.trim()
  } else if (line.contains("empty_null_column")) {
    empty_null_column = line.split(":").last.mkString.trim()
  } else if (line.contains("simple_columns")) {
    simple_columns = line.split(":").last.mkString.trim()
  } 
}

source.close()

  }

     }

【问题讨论】:

  • 正如警告消息中明确指出的那样,在定义window 规范时,您没有指定窗口将被分区的列。但是,在您的代码中,我没有找到任何窗口操作。你在哪里定义它? & 您在哪一行收到警告/错误?
  • 嗨 vdep 我在环境中运行此代码,我收到错误“容器因超过内存限制 1.8gb 的 1.5gb 使用的物理内存而被 YARN 杀死。考虑提升 spark.yarn.executor.memoryOverhead 和在本地运行时,我收到了上面发布的错误,如果我需要在我的代码中添加分区,你能帮忙吗?如何改进这对你来说真的很有帮助。我被困在这部分请帮助stackoverflow.com/users/3846291/vdep
  • How to compute the largest value in a column using withColumn? - 专门用更有效的方法替换/ sum("count").over()
  • stackoverflow.com/users/10465355/user10465355 你能帮我举一个我的代码中的例子吗,我可以改变什么以及如何改变?会很有帮助的

标签: scala apache-spark hadoop bigdata


【解决方案1】:

请尝试以下更改。

spark.yarn.executor.memoryOverhead = 10000 (1 GB)

(或更多)和

spark.memory.offHeap.enabled = true

您有 offHead Memory 但未启用它,将内存开销设置为更高 GB 将为您的执行程序提供更多内存。

以上应该可以解决内存问题。

如果问题仍然存在,提供更多日志会很有帮助。

【讨论】:

  • 当我添加这个 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hbase sql").set("SPARK_EXECUTOR_MEMORY", "6g").set ("spark.network.timeout", "900s").set("spark.task.max.failure","24").set("spark.memory.offHeap.enabled", "true").set( "spark.yarn.executor.memoryOverhead", "1 GB") val sc = new SparkContext(sparkConf) val spark1 = SparkSession.builder().config(sc.getConf).getOrCreate() 我收到此错误 9/02 /02 15:01:09 错误 SparkContext:初始化 SparkContext 时出错。
  • java.lang.IllegalArgumentException: 要求失败:spark.memory.offHeap.size 必须 > 0 当 spark.memory.offHeap.enabled == true 请告诉我哪里错了
  • @Arush70 :我看到您已删除原始配置 "spark.memory.offheap.size","31113699737" 。请把这个配置放回去。正如我在回答中提到的那样,您有 offHead Memory 但未启用它。请不要对答案投反对票
  • 请告诉我你想对样本输入和输出做什么。我会帮忙的。
  • 我可以把你送到哪里???您可以提供的 gmail 将向您发送示例代码、它的属性文件和它的架构文件。stackoverflow.com/users/1670499/saurabh-shashank
猜你喜欢
  • 1970-01-01
  • 2020-12-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-28
  • 2017-11-18
  • 1970-01-01
相关资源
最近更新 更多