【发布时间】:2017-04-13 22:09:34
【问题描述】:
我有一种情况,我想在 Spark 中的每个工作人员上执行一个系统进程。我希望这个过程在每台机器上运行一次。具体来说,这个进程启动了一个守护进程,它需要在我的程序的其余部分执行之前运行。理想情况下,这应该在我读取任何数据之前执行。
我正在使用 Spark 2.0.2 并使用动态分配。
【问题讨论】:
标签: java scala apache-spark daemon
我有一种情况,我想在 Spark 中的每个工作人员上执行一个系统进程。我希望这个过程在每台机器上运行一次。具体来说,这个进程启动了一个守护进程,它需要在我的程序的其余部分执行之前运行。理想情况下,这应该在我读取任何数据之前执行。
我正在使用 Spark 2.0.2 并使用动态分配。
【问题讨论】:
标签: java scala apache-spark daemon
您可以通过结合使用惰性验证和 Spark 广播来实现此目的。它会像下面这样。 (下面的代码还没有编译,你可能需要改变一些东西)
object ProcessManager {
lazy val start = // start your process here.
}
您可以在应用程序开始时广播此对象,然后再进行任何转换。
val pm = sc.broadcast(ProcessManager)
现在,您可以像访问任何其他广播变量一样在转换中访问此对象并调用惰性验证。
rdd.mapPartition(itr => {
pm.value.start
// Other stuff here.
}
【讨论】:
rdd 的工人。我错了吗?
带有静态初始化的object 调用您的系统进程应该可以解决问题。
object SparkStandIn extends App {
object invokeSystemProcess {
import sys.process._
val errorCode = "echo Whatever you put in this object should be executed once per jvm".!
def doIt(): Unit = {
// this object will construct once per jvm, but objects are lazy in
// another way to make sure instantiation happens is to check that the errorCode does not represent an error
}
}
invokeSystemProcess.doIt()
invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}
【讨论】:
针对特定用例的特定答案,我有一个包含 50 个节点的集群,我想知道哪些节点设置了 CET 时区:
(1 until 100).toSeq.toDS.
mapPartitions(itr => {
sys.process.Process(
Seq("bash", "-c", "echo $(hostname && date)")
).
lines.
toIterator
}).
collect().
filter(_.contains(" CET ")).
distinct.
sorted.
foreach(println)
请注意,我不认为 100% 可以保证每个节点都有一个分区,因此命令可能不会在每个节点上运行,即使在具有 50 个节点的集群中使用 100 个元素的数据集(如上一个示例) .
【讨论】: