【问题标题】:Understanding Spark's closures and their serialization了解 Spark 的闭包及其序列化
【发布时间】:2017-03-08 15:18:27
【问题描述】:

免责声明:刚刚开始使用 Spark。

我无法理解著名的“任务不可序列化”异常,但我的问题与我在 SO 上看到的问题略有不同(或者我认为如此)。

我有一个很小的自定义 RDD (TestRDD)。它有一个字段,用于存储其类未实现 Serializable (NonSerializable) 的对象。我已将“spark.serializer”配置选项设置为使用 Kryo。但是,当我在我的 RDD 上尝试 count() 时,我得到以下信息:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)

当我查看 DAGScheduler.submitMissingTasks 内部时,我发现它在我的 RDD 上使用了它的 closure 序列化器,它是 Java 序列化器,而不是我期望的 Kryo 序列化器。我读过 Kryo 在序列化闭包时遇到问题,Spark 总是使用 Java 序列化器进行闭包,但我完全不明白闭包是如何在这里发挥作用的。我在这里所做的就是:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

也就是说,没有映射器或任何需要序列化闭包的东西。 OTOH 这行得通:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()

Kryo 序列化器按预期使用,不涉及闭包序列化器。如果我没有将序列化程序属性设置为 Kryo,我也会在这里遇到异常。

感谢任何解释闭包来自何处以及如何确保我可以使用 Kryo 序列化自定义 RDD 的指针。

更新:这里是TestRDD,其不可序列化字段mNS

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}

【问题讨论】:

  • 您的TestRDD 中是否有包含SparkContext 的字段?向我们展示您对TestRDD 的定义或创建minimal reproducible example
  • @YuvalItzchakov 就是这样。 SparkContext 被传递给超级的构造函数,所以是的,RDD 确实持有它。不过,这个例外似乎并没有抱怨。
  • 你能发帖NonSerializable吗?
  • 就像class NonSerializable {}一样简单

标签: java serialization apache-spark closures


【解决方案1】:

当我查看 DAGScheduler.submitMissingTasks 内部时,我看到它使用 它在我的 RDD 上的闭包序列化器,它是 Java 序列化器,而不是 我期望的 Kryo 序列化程序。

SparkEnv 支持两个序列化器,一个名为serializer,用于数据序列化、检查点、工作人员之间的消息传递等,可在spark.serializer 配置标志下使用。另一个在spark.closure.serializer 下称为closureSerializer,用于检查您的对象是否实际上是可序列化的,并且可配置为Spark JavaSerializer 之外没有其他实际工作)并从2.0.0 硬编码及以上至JavaSerializer

Kryo 闭包序列化程序有一个错误,使其无法使用,您可以在 SPARK-7708 下看到该错误(这可能已在 Kryo 3.0.0 中修复,但 Spark 目前已使用特定版本的 Chill 修复,该版本已修复克里奥 2.2.1)。此外,对于 Spark 2.0.x,JavaSerializer 现在是固定的而不是可配置的(您可以看到它in this pull request)。这意味着我们实际上坚持使用 JavaSerializer 进行闭包序列化。

我们使用一个序列化程序来提交任务,而另一个序列化程序在工作人员之间序列化数据,这很奇怪吗?当然,但这就是我们所拥有的。

总而言之,如果您要设置 spark.serializer 配置或使用 SparkContext.registerKryoClasses,您将在 Spark 中使用 Kryo 进行大部分序列化。话虽如此,为了检查给定类是否可序列化并将任务序列化给工作人员,Spark 将使用JavaSerializer

【讨论】:

  • 谢谢,但怎么不正确?我可以看到DAGScheduler 使用closureSerializer 字段,而不是serializer 字段。无论我是否将环境设置为使用 Kryo,SparkEnv.get.closureSerializer 始终是 Java 序列化程序(iirc,他们甚至将选项 spark.closure.serializer 从 2.0 中撤出,因为它无论如何都被忽略了)所以我明白它为什么会失败。问题不同:为什么调度程序在我的情况下使用闭包序列化程序?如何让它为我的 RDD 使用 Kryo>
  • hm,我相信这个陈述对于 Spark 2.0.0 和 2.0.1 是正确的(从堆栈跟踪中可以看出)。您也可以查看 SPARK-12414。 closureSerializer 可能是抽象类型,但 AFAICT 只使用了一种实现。
  • @PavelKlinov 你是对的。我挖得更深了,看看我的更新。
  • 好的,谢谢。我不得不说,如果你是对的,这听起来有点奇怪。 Kryo 的原因之一(除了速度/大小)是能够处理不是Serializable 的对象(并且不能这样)。 Kryo 确实可以将它们序列化。但是,如果 Spark 甚至没有达到 Kryo 序列化它们的地步,那还有什么意义……而且,我们似乎以不同的方式使用“闭包”这个术语:我的意思是 Scala 闭包,而您似乎是指传递闭包可从 RDD 访问的对象。
  • @PavelKlinov 如果您有任何不可序列化的属性,通常的做法是将其标记为@transient 并让工作人员延迟加载。
猜你喜欢
  • 2011-10-07
  • 2017-04-18
  • 2018-04-08
  • 1970-01-01
  • 2021-03-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-02-22
相关资源
最近更新 更多