【Question title】: Spark exception: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext
【Posted】: 2017-02-02 14:10:53
【Question】:

I am trying to run a fairly simple example that connects Spark to Cassandra and aggregates the data. The implementation uses the spark-cassandra-connector, Java, Spring, and so on.

Here is my Spark configuration, wired up through Spring:

@Configuration
@ComponentScan("test.spark.service")
@Import({CassandraConfig.class})
public class SparkConfig {

    @Autowired
    private String cassandraUrl;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf();

        // configure all the bells and whistles
        sparkConf
                .setMaster("spark://localhost:7077")
                .setAppName("DataAggregator")
                .set("spark.cassandra.connection.host", cassandraUrl);

        return sparkConf;
    }

    @Bean
    public JavaStreamingContext javaStreamingContext() {
        return new JavaStreamingContext(sparkConf(), new Duration(1000));
    }
}

Here is the service class that does not throw an exception:

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

    }
}

This seems to work and returns a CassandraJavaRDD.

As soon as I change the implementation to call groupBy with a Function, it fails with a serialization exception:

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}

Here is the stack trace:

org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664)
    at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242)
    at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45)
    at test.spark.service.SparkServiceImpl.process(SparkServiceServiceImpl.java:56)
    at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@4538856f)
    - field (class: test.spark.service.SparkServiceImpl, name: javaStreamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class test.spark.service.SparkServiceImpl, test.spark.service.SparkServiceImpl@7e34b127)
    - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
    - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@536b71b4)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 52 more

Apart from this exception, if my service does not implement Serializable, it throws an exception as well.

Here is the service:

@Service
public class SparkServiceImpl implements SparkService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}

Here is the exception:

org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664)
    at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242)
    at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45)
    at test.spark.service.SparkServiceImpl.process(SparkServiceImpl.java:32)
    at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: test.spark.service.SparkServiceImpl
Serialization stack:
    - object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@47b269c4)
    - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
    - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@23ad71bf)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 52 more

【Comments】:

  • How are you starting the Spark job? It looks like you are trying to serialize your SparkServiceImpl class, which contains the JavaStreamingContext.
  • Look at this: `- object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@47b269c4) - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl) - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@23ad71bf) - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$to`
  • The serialization debugger prints these lines, so they are what you have to look at.
  • As @YuvalItzchakov already mentioned, the class is not serializable.
  • I don't want to serialize the service or the streaming context; that makes no sense to me. According to their documentation, this is how you do it via CassandraJavaUtil or CassandraStreamingJavaUtil. github.com/datastax/spark-cassandra-connector/blob/master/doc/…

Tags: java spring apache-spark cassandra spark-cassandra-connector


【Solution 1】:

Quick fix:

Add the transient keyword to the autowired JavaStreamingContext field in your SparkServiceImpl:

@Autowired
private transient JavaStreamingContext javaStreamingContext;

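Quick explanation of why:

JavaStreamingContext is created on the driver, and it is the main entry point for Spark Streaming functionality.

In your SparkService implementation, SparkServiceImpl, you perform some operations on an RDD, and the driver creates tasks for the declared transformations. After this stage, the created tasks are sent to the workers, which is where the tasks are ultimately executed.

So the workers need neither SparkContext nor JavaStreamingContext. As you said, serializing JavaStreamingContext makes no sense.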
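The serialization stack in the question shows why the context is reached at all: the anonymous Function passed to groupBy carries a this$0 field that points to the enclosing SparkServiceImpl, so serializing the function serializes the service and every non-transient field in it. The following is a minimal sketch of that capture using only java.io, with no Spark involved. `KeyFunction`, `Service`, `LengthKey`, `offset`, and `canSerialize` are hypothetical names introduced for illustration; `KeyFunction` merely stands in for Spark's serializable Function interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureDemo {

    /** Hypothetical stand-in for a serializable function interface. */
    interface KeyFunction extends Serializable {
        int call(String row);
    }

    /** Hypothetical service that is deliberately NOT Serializable. */
    static class Service {
        private final int offset = 0;

        // Anonymous inner class: it reads an instance field, so it must hold
        // a hidden reference (this$0) to the enclosing Service instance.
        KeyFunction anonymous() {
            return new KeyFunction() {
                @Override
                public int call(String row) {
                    return row.length() + offset;
                }
            };
        }

        // Static nested class: it has no reference to any Service instance.
        static class LengthKey implements KeyFunction {
            @Override
            public int call(String row) {
                return row.length();
            }
        }
    }

    /** Returns true if Java serialization accepts the object graph. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        System.out.println("anonymous inner class: " + canSerialize(service.anonymous()));
        System.out.println("static nested class:   " + canSerialize(new Service.LengthKey()));
    }
}
```

In this sketch the anonymous function fails to serialize because it drags the non-serializable Service along, while the static nested class serializes on its own. A static nested Function class is therefore one possible alternative to making the service Serializable; it is a different technique from the transient fix described in this answer.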

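With the transient keyword, you are simply saying that you do not want JavaStreamingContext to be serialized, and the Spark job can then run. Note that the service itself must still implement Serializable, because the anonymous Function holds a reference to it (the this$0 field in the serialization stack above).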
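The effect of transient can be seen in isolation with plain Java serialization. This is a minimal sketch, not Spark code: `Context` is a hypothetical stand-in for JavaStreamingContext, and `PlainService`, `TransientService`, `canSerialize`, and `roundTrip` are names made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientFieldDemo {

    /** Hypothetical stand-in for JavaStreamingContext: deliberately not Serializable. */
    static class Context {
    }

    /** Serializable service whose context field takes part in serialization. */
    static class PlainService implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "plain";
        Context context = new Context();
    }

    /** Same service, but the context field is marked transient and is skipped. */
    static class TransientService implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "transient";
        transient Context context = new Context();
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Returns true if Java serialization accepts the object graph. */
    static boolean canSerialize(Object o) {
        try {
            serialize(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes and deserializes the object, as happens when it is shipped elsewhere. */
    static Object roundTrip(Object o) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serialize(o)))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("PlainService serializable:     " + canSerialize(new PlainService()));
        System.out.println("TransientService serializable: " + canSerialize(new TransientService()));

        TransientService copy = (TransientService) roundTrip(new TransientService());
        // The transient field is not restored; it comes back as null.
        System.out.println("name=" + copy.name + ", context=" + copy.context);
    }
}
```

The deserialized copy has a null context, which is why the transient approach is only safe as long as the code running on the workers never touches that field.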

【Discussion】:
