【发布时间】:2023-04-07 21:19:01
【问题描述】:
当我更改 Spark 应用程序中的内核数量时,我遇到了一个奇怪的行为,代码如下:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Test extends App {
Logger.getLogger("org").setLevel(Level.WARN)
var listLink: List[String] = List()
def addListLink(s: String) = {
val list = s.split(",")
for (i <- 0 to list.length - 2) {
listLink = list(i)+ "-" + list(i + 1) :: listLink
}
}
val conf = new SparkConf().setMaster("local[1]").setAppName("Simple Application")
val sc = new SparkContext(conf)
val paths = sc.textFile("file:///tmp/percorsi.txt")
paths.foreach(x => addListLink(x))
println("Number of items:"+listLink.size)
println(listLink)
}
我的输入文件是这样的:
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C
A,B,C
A,B,C
A,B,C
A,B,C
B,C,D
B,C,D
B,C,D
B,C,D
C,D
C,D
基本上,对于我调用我的方法的每条路径,它都会将一个元素添加到一个表示每两个连续元素的列表中:
示例:"A,B,C,D" => ("A-B","B-C","C-D")
如您所见,代码中只有一个核心
.setMaster("local[1]")
如果我运行我的应用程序(本地或集群上),我会得到我期望的结果
println("Number of items:"+listLink.size)
//Result --> Number of Items : 38
如果我将核心数更改为 3(例如),我会得到不同的值。 例如 33 个项目而不是 38 个。
我是否遗漏了有关核心数量或其他内容(分区、ecc...)的信息?
我认为这是一个非常简单的应用程序,但无论如何我都会遇到这种奇怪的行为。
谁能帮帮我?
提前致谢
FF
【问题讨论】:
-
你为什么在你的RDD上使用foreach???你想做什么?
-
对于我的 RDD 中的每个元素,我想调用该方法以增加我的列表
-
您可能听说过 map reduce 吗?不?
foreach对每个数据项执行一个无参数函数。 -
是的确实...如前所述,也许我遗漏了一些东西,但我可以问你一个更具体的提示吗?我的意思是,这只是 foreach 风格的问题吗?
-
好的,首先,但仍然不是关于“风格”,其次你能解释一下你想用链接列表功能做什么吗?
标签: scala parallel-processing apache-spark cpu-cores