【发布时间】:2015-09-14 16:32:52
【问题描述】:
我是 scala 和 spark 的新手。我正在尝试加入来自两个不同文本文件的两个 RDD。在每个文本文件中,有两列由制表符分隔,例如
text1 text2
100772C111 ion 200772C222 ion
100772C111 on 200772C222 gon
100772C111 n 200772C2 n
所以我想根据它们的第二列加入这两个文件并得到如下结果,这意味着给定这两个文档有 2 个常用术语:
((100772C111-200772C222,2))
我的电脑功能:
4 X (intel(r) core(tm) i5-2430m cpu @2.40 ghz)
8 GB 内存
我的脚本:
import org.apache.spark.{SparkConf, SparkContext}
object hw {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "C:\\spark-1.4.1\\winutils")
val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new SparkContext(conf)
val emp = sc.textFile("S:\\Staff_files\\Mehmet\\Projects\\SPARK - `scala\\wos14.txt")
.map { line => val parts = line.split("\t")((parts(5)),parts(0))}
val emp_new = sc.textFile("C:\\WHOLE_WOS_TEXT\\fwo_word.txt")
.map{ line2 => val parts = line2.split("\t")
((parts(3)),parts(1)) }
val finalemp = emp_new.distinct().join(emp.distinct())
.map{case((nk1), ((parts1),(val1))) => (parts1 + "-" + val1, 1)}
.reduceByKey((a, b) => a + b)
finalemp.foreach(println)
}
}
当我尝试使用较小尺寸的文本文件时,此代码给出了我想要的。但是,我想为大文本文件实现这个脚本。我有一个大小为 110 KB(约 4M 行)的文本文件和另一个 9 GB(超过 1B 行)的文本文件。
当我使用这两个文本文件运行脚本时,我在日志屏幕上观察到:
15/09/04 18:19:06 INFO TaskSetManager: Finished task 177.0 in stage 1.0 (TID 181) in 9435 ms on localhost (178/287)
15/09/04 18:19:06 INFO HadoopRDD: Input split: file:/S:/Staff_files/Mehmet/Projects/SPARK - scala/wos14.txt:5972688896+33554432
15/09/04 18:19:15 INFO Executor: Finished task 178.0 in stage 1.0 (TID 182). 2293 bytes result sent to driver
15/09/04 18:19:15 INFO TaskSetManager: Starting task 179.0 in stage 1.0 (TID 183, localhost, PROCESS_LOCAL, 1422 bytes)
15/09/04 18:19:15 INFO Executor: Running task 179.0 in stage 1.0 (TID 183)
15/09/04 18:19:15 INFO TaskSetManager: Finished task 178.0 in stage 1.0 (TID 182) in 9829 ms on localhost (179/287)
15/09/04 18:19:15 INFO HadoopRDD: Input split: file:/S:/Staff_files/Mehmet/Projects/SPARK - scala/wos14.txt:6006243328+33554432
15/09/04 18:19:25 INFO Executor: Finished task 179.0 in stage 1.0 (TID 183). 2293 bytes result sent to driver
15/09/04 18:19:25 INFO TaskSetManager: Starting task 180.0 in stage 1.0 (TID 184, localhost, PROCESS_LOCAL, 1422 bytes)
15/09/04 18:19:25 INFO Executor: Running task 180.0 in stage 1.0 (TID 184)
...
15/09/04 18:37:49 INFO ExternalSorter: Thread 101 spilling in-memory map of 5.3 MB to disk (13 times so far)
15/09/04 18:37:49 INFO BlockManagerInfo: Removed broadcast_2_piece0 on `localhost:64567 in memory (size: 2.2 KB, free: 969.8 MB)
15/09/04 18:37:49 INFO ExternalSorter: Thread 101 spilling in-memory map of 5.3 MB to disk (14 times so far)...
那么在本地处理这样的文本文件是否合理?等待了 3 个多小时后,程序仍在将数据溢出到磁盘。
总而言之,我需要在代码中进行一些更改以应对性能问题吗?
【问题讨论】:
标签: scala apache-spark