【问题标题】:Spark: Different output with different number of coresSpark:具有不同核心数的不同输出
【发布时间】:2023-04-07 21:19:01
【问题描述】:

当我更改 Spark 应用程序中的内核数量时,我遇到了一个奇怪的行为,代码如下:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Test extends App {
Logger.getLogger("org").setLevel(Level.WARN)
var listLink: List[String] = List()
def addListLink(s: String) = {
val list = s.split(",")
for (i <- 0 to list.length - 2) {
  listLink = list(i)+ "-" + list(i + 1) :: listLink
 }
}
val conf = new SparkConf().setMaster("local[1]").setAppName("Simple Application")
val sc = new SparkContext(conf)
val paths = sc.textFile("file:///tmp/percorsi.txt")
paths.foreach(x => addListLink(x))
println("Number of items:"+listLink.size)
println(listLink)
}

我的输入文件是这样的:

A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C
A,B,C
A,B,C
A,B,C
A,B,C
B,C,D
B,C,D
B,C,D
B,C,D
C,D
C,D

基本上,对于我调用我的方法的每条路径,它都会将一个元素添加到一个表示每两个连续元素的列表中:

示例:"A,B,C,D" => ("A-B","B-C","C-D")

如您所见,代码中只有一个核心

.setMaster("local[1]")

如果我运行我的应用程序(本地或集群上),我会得到我期望的结果

println("Number of items:"+listLink.size)
//Result --> Number of Items : 38

如果我将核心数更改为 3(例如),我会得到不同的值。 例如 33 个项目而不是 38 个。

我是否遗漏了有关核心数量或其他内容(分区、ecc...)的信息?

我认为这是一个非常简单的应用程序,但无论如何我都会遇到这种奇怪的行为。

谁能帮帮我?

提前致谢

FF

【问题讨论】:

  • 你为什么在你的RDD上使用foreach???你想做什么?
  • 对于我的 RDD 中的每个元素,我想调用该方法以增加我的列表
  • 您可能听说过 map reduce 吗?不? foreach对每个数据项执行一个无参数函数。
  • 是的确实...如前所述,也许我遗漏了一些东西,但我可以问你一个更具体的提示吗?我的意思是,这只是 foreach 风格的问题吗?
  • 好的,首先,但仍然不是关于“风格”,其次你能解释一下你想用链接列表功能做什么吗?

标签: scala parallel-processing apache-spark cpu-cores


【解决方案1】:

每个分区有一个单独的listLink。因此,您将项目添加到多个列表中,最后只打印一个。

通常,当函数传递给 Spark 操作(例如 map 或 reduce) 在远程集群节点上执行,它单独工作 函数中使用的所有变量的副本。这些变量是 复制到每台机器,并且不更新远程变量 机器被传播回驱动程序。

(从这里https://spark.apache.org/docs/latest/programming-guide.html#shared-variables

今天是你的幸运日:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer


val data = List(
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C",
"A,B,C",
"A,B,C",
"A,B,C",
"A,B,C",
"B,C,D",
"B,C,D",
"B,C,D",
"B,C,D",
"C,D",
"C,D")

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc= new SparkContext(conf)


val dataRDD = sc.makeRDD(data, 1)
val linkRDD = dataRDD.flatMap(_.split(",").sliding(2).map{_.mkString("", "-", "")})

linkRDD.foreach(println)

输出:

A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
A-B
B-C
A-B
B-C
A-B
B-C
A-B
B-C
B-C
C-D
B-C
C-D
B-C
C-D
B-C
C-D
C-D
C-D

【讨论】:

  • 非常感谢 Paul,这正是我的猜测……我完全忘记了共享变量……我会看看如何修改我的代码。
  • 我想在接受您的回答之前解决我的“问题”(包括代码),以帮助有同样疑问的其他人
  • 好的。一般来说,SO 不是“请写我的代码”网站。此外,我认为,生成链接列表并不是最终目标。请描述您要计算的内容,因为可能有一种更类似于 Spark 的方式来计算。正如@eliasah 所要求的那样。
  • 再次感谢 Paul... 正如我在对您的回答的第一条评论中所说,我会尽快改进我的代码(因为我不在我的电脑前)。我知道 SO 不是“写我的代码”。不管怎样,既然是你写的,我再次感谢你。
  • 顺便说一句,你的火花解决方案绝对干净:-)
猜你喜欢
  • 1970-01-01
  • 2021-07-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-06-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多