【问题标题】:Performance issue relating to joining big text files in local与在本地加入大文本文件有关的性能问题
【发布时间】:2015-09-14 16:32:52
【问题描述】:

我是 scala 和 spark 的新手。我正在尝试加入来自两个不同文本文件的两个 RDD。在每个文本文件中,有两列由制表符分隔,例如

text1               text2
100772C111  ion     200772C222  ion
100772C111  on      200772C222  gon
100772C111  n       200772C2    n

所以我想根据它们的第二列加入这两个文件并得到如下结果,这意味着给定这两个文档有 2 个常用术语:

((100772C111-200772C222,2))

我的电脑功能:

4 X (intel(r) core(tm) i5-2430m cpu @2.40 ghz)
8 GB 内存

我的脚本:

import org.apache.spark.{SparkConf, SparkContext}

object hw {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\spark-1.4.1\\winutils")
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val emp = sc.textFile("S:\\Staff_files\\Mehmet\\Projects\\SPARK - `scala\\wos14.txt")
                .map { line => val parts = line.split("\t")((parts(5)),parts(0))}

    val emp_new = sc.textFile("C:\\WHOLE_WOS_TEXT\\fwo_word.txt")
                    .map{ line2 => val parts = line2.split("\t")
                    ((parts(3)),parts(1)) }

    val finalemp = emp_new.distinct().join(emp.distinct())
                          .map{case((nk1), ((parts1),(val1))) => (parts1 + "-" + val1, 1)}
                          .reduceByKey((a, b) => a + b)
    finalemp.foreach(println)
  }
}

当我尝试使用较小尺寸的文本文件时,此代码给出了我想要的。但是,我想为大文本文件实现这个脚本。我有一个大小为 110 KB(约 4M 行)的文本文件和另一个 9 GB(超过 1B 行)的文本文件。

当我使用这两个文本文件运行脚本时,我在日志屏幕上观察到:

15/09/04 18:19:06 INFO TaskSetManager: Finished task 177.0 in stage 1.0 (TID 181) in 9435 ms on localhost (178/287)
15/09/04 18:19:06 INFO HadoopRDD: Input split: file:/S:/Staff_files/Mehmet/Projects/SPARK - scala/wos14.txt:5972688896+33554432
15/09/04 18:19:15 INFO Executor: Finished task 178.0 in stage 1.0 (TID 182). 2293 bytes result sent to driver
15/09/04 18:19:15 INFO TaskSetManager: Starting task 179.0 in stage 1.0 (TID 183, localhost, PROCESS_LOCAL, 1422 bytes)
15/09/04 18:19:15 INFO Executor: Running task 179.0 in stage 1.0 (TID 183)
15/09/04 18:19:15 INFO TaskSetManager: Finished task 178.0 in stage 1.0 (TID 182) in 9829 ms on localhost (179/287)
15/09/04 18:19:15 INFO HadoopRDD: Input split: file:/S:/Staff_files/Mehmet/Projects/SPARK - scala/wos14.txt:6006243328+33554432
15/09/04 18:19:25 INFO Executor: Finished task 179.0 in stage 1.0 (TID 183). 2293 bytes result sent to driver
15/09/04 18:19:25 INFO TaskSetManager: Starting task 180.0 in stage 1.0 (TID 184, localhost, PROCESS_LOCAL, 1422 bytes)
15/09/04 18:19:25 INFO Executor: Running task 180.0 in stage 1.0 (TID 184)
...


15/09/04 18:37:49 INFO ExternalSorter: Thread 101 spilling in-memory map of 5.3 MB to disk (13 times so far)
15/09/04 18:37:49 INFO BlockManagerInfo: Removed broadcast_2_piece0 on `localhost:64567 in memory (size: 2.2 KB, free: 969.8 MB)
15/09/04 18:37:49 INFO ExternalSorter: Thread 101 spilling in-memory map of 5.3 MB to disk (14 times so far)...

那么在本地处理这样的文本文件是否合理?等待了 3 个多小时后,程序仍在将数据溢出到磁盘。

总而言之,我需要在代码中进行一些更改以应对性能问题吗?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您是否为 Spark 提供了足够的内存?这并不完全明显,但默认情况下,Spark 以非常小的内存分配开始。它不会像 RDMS 那样使用尽可能多的内存。您需要告诉它您希望它使用多少。

    默认是(我相信)每个节点一个执行器,每个执行器 512MB 的 RAM。您可以非常轻松地扩大规模:

    spark-shell --driver-memory 1G --executor-memory 1G --executor-cores 3 --num-executors 3 
    

    更多设置在这里:http://spark.apache.org/docs/latest/configuration.html#application-properties

    您可以在 SparkUI 上查看为 Spark 环境和每个执行程序分配了多少内存,(默认情况下)位于 http://localhost:4040

    【讨论】:

    • 看来 spark 有(在我的情况下)默认的 2g 内存,它提供大约 1g 的存储内存。从日志行看来,如果我正确理解的话,每个分区的现有内存都超过了所需的内存。即使我给了4g并尝试收集emp(emp.collect())就足够了。但是我得到了 java heap size out of memory 错误。
    猜你喜欢
    • 2022-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-29
    • 1970-01-01
    相关资源
    最近更新 更多