【问题标题】:Is it possible to execute a command on all workers within Apache Spark?是否可以对 Apache Spark 中的所有工作人员执行命令?
【发布时间】:2017-04-13 22:09:34
【问题描述】:

我有一种情况,我想在 Spark 中的每个工作人员上执行一个系统进程。我希望这个过程在每台机器上运行一次。具体来说,这个进程启动了一个守护进程,它需要在我的程序的其余部分执行之前运行。理想情况下,这应该在我读取任何数据之前执行。

我正在使用 Spark 2.0.2 并使用动态分配。

【问题讨论】:

标签: java scala apache-spark daemon


【解决方案1】:

您可以通过结合使用惰性验证和 Spark 广播来实现此目的。它会像下面这样。 (下面的代码还没有编译,你可能需要改变一些东西)

object ProcessManager {
  lazy val start = // start your process here.
}

您可以在应用程序开始时广播此对象,然后再进行任何转换。

val pm = sc.broadcast(ProcessManager)

现在,您可以像访问任何其他广播变量一样在转换中访问此对象并调用惰性验证。

rdd.mapPartition(itr => {
  pm.value.start
  // Other stuff here.
}

【讨论】:

  • 这不是每个分区执行一次进程,而不是每个worker执行一次吗?
  • 你是对的,这只是一个例子。但是由于它是一个惰性 val 并且 ProcessManager 是一个“对象”,它只在 executor 中运行一次。
  • 广播该对象有点奇怪。您应该广播数据,而不是代码。只需拥有对象并访问 start 变量就足够了。这样你就不需要 ProcessManager 对象是可序列化的。
  • @Jegan 你能帮我弄清楚 jave 类比吗\
  • @Jegan 现在您的代码将不适用于所有工人,而是适用于拥有rdd 的工人。我错了吗?
【解决方案2】:

带有静态初始化的object 调用您的系统进程应该可以解决问题。

object SparkStandIn extends App {
  object invokeSystemProcess {
    import sys.process._
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".!

    def doIt(): Unit = {
      // this object will construct once per jvm, but objects are lazy in
      // another way to make sure instantiation happens is to check that the errorCode does not represent an error
    }
  }
  invokeSystemProcess.doIt()
  invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}

【讨论】:

  • 但是你如何确保它在不重复调用每个转换的情况下实际初始化?
【解决方案3】:

针对特定用例的特定答案,我有一个包含 50 个节点的集群,我想知道哪些节点设置了 CET 时区:

(1 until 100).toSeq.toDS.
mapPartitions(itr => {
        sys.process.Process(
                Seq("bash", "-c", "echo $(hostname && date)")
        ).
        lines.
        toIterator
}).
collect().
filter(_.contains(" CET ")).
distinct.
sorted.
foreach(println)

请注意,我不认为 100% 可以保证每个节点都有一个分区,因此命令可能不会在每个节点上运行,即使在具有 50 个节点的集群中使用 100 个元素的数据集(如上一个示例) .

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-08-29
    • 2022-06-22
    • 1970-01-01
    • 2011-07-22
    • 1970-01-01
    • 1970-01-01
    • 2021-10-24
    • 2021-12-27
    相关资源
    最近更新 更多