【问题标题】:How does Spark send closures to workers?Spark 如何向工作人员发送闭包?
【发布时间】:2015-08-14 17:22:16
【问题描述】:

当我编写 RDD 转换时,例如

val rdd = sc.parallelise(1 to 1000) 
rdd.map(x => x * 3)

我知道闭包 (x => x * 3) 只是一个 Function1 需要可序列化并且 我想我在某处读到编辑:就在文档中暗示:@987654321 @ 它被“发送”给工作人员执行。 (例如,Akka 向工作人员发送一条“可执行代码”以运行)

它是这样工作的吗?

在我参加的一次聚会上,有人评论说它实际上并没有发送任何序列化代码,但由于每个工作人员无论如何都会得到一个 jar 的“副本”,它只需要引用要运行的函数或类似的东西(但我不确定我是否正确引用了那个人)

我现在完全不知道它是如何工作的。

所以我的问题是

  1. 如何将转换闭包发送给工作人员?通过akka序列化?或者他们“已经在那里”,因为 spark 将整个 uber jar 发送给每个工人(对我来说听起来不太可能......)

  2. 如果是这样,那么罐子的其余部分如何发送给工人?这是“cleanupClosure”在做什么吗?例如只向工作人员发送相关的字节码而不是整个 uberjar? (例如,仅依赖于闭包的代码?)

  3. 总而言之,Spark 是否会在任何时候以某种方式将 --jars 类路径中的 jar 与工作人员同步?还是它会向工人发送“恰到好处”的代码?如果它确实发送了闭包,它们是否被缓存以供重新计算?还是每次安排任务时它都会随任务发送闭包?对不起,如果这是个愚蠢的问题,但我真的不知道。

如果可以,请添加来源以获得答案,我在文档中找不到明确的答案,而且我太谨慎了,无法仅通过阅读代码来得出结论。

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    闭包肯定是在运行时序列化的。我有很多实例在运行时看到 Closure Not Serializable 异常——来自 pyspark 和 scala。有复杂的代码叫

    来自ClosureCleaner.scala

    def clean(
        closure: AnyRef,
        checkSerializable: Boolean = true,
        cleanTransitively: Boolean = true): Unit = {
      clean(closure, checkSerializable, cleanTransitively, Map.empty)
    }
    

    试图缩小被序列化的代码。然后代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。

    下面是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:

      private def ensureSerializable(func: AnyRef) {
        try {
          if (SparkEnv.get != null) {
            SparkEnv.get.closureSerializer.newInstance().serialize(func)
          }
        } catch {
          case ex: Exception => throw new SparkException("Task not serializable", ex)
        }
      }
    

    【讨论】:

    • 这个答案如何解决问题中提出的问题?
    猜你喜欢
    • 2015-11-12
    • 1970-01-01
    • 2011-06-15
    • 1970-01-01
    • 1970-01-01
    • 2017-11-06
    • 1970-01-01
    • 2014-04-27
    • 1970-01-01
    相关资源
    最近更新 更多