【Question Title】: Spark/Java serializable issue - org.apache.spark.SparkException: Task not serializable
【Posted】: 2017-11-30 17:31:26
【Question Body】:

I'm having a problem with the following code while writing a Spark application in Java:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class BatchLayerDefaultJob implements Serializable {

    // `executor` is not shown in the question; presumably a static ExecutorService field.
    private static Function<BatchLayerProcessor, Future> batchFunction = new Function<BatchLayerProcessor, Future>() {
        @Override
        public Future call(BatchLayerProcessor s) {
            return executor.submit(s);
        }
    };

    public void applicationRunner(BatchParameters batchParameters) {
        SparkConf sparkConf = new SparkConf().setAppName("Platform Engine - Batch Job");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        sparkConf.set("spark.cores.max", "1");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        List<BatchLayerProcessor> batchListforRDD = new ArrayList<BatchLayerProcessor>();

        // populate List here.... Then attempt to process below

        JavaRDD<BatchLayerProcessor> distData = sparkContext.parallelize(batchListforRDD, batchListforRDD.size());
        JavaRDD<Future> result = distData.map(batchFunction);
        result.collect(); // <-- Produces an object not serializable exception here
    }
}
```

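I've tried a number of things to no avail, including extracting batchFunction into a separate class independent of the main class, and using mapPartitions instead of map. I'm more or less out of ideas. Any help appreciated.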

Stack trace below:

```
17/11/30 17:11:28 INFO DAGScheduler: Job 0 failed: collect at BatchLayerDefaultJob.java:122, took 23.406561 s
Exception in thread "Thread-8" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: 
java.io.NotSerializableException: xxxx.BatchLayerProcessor
Serialization stack:
- object not serializable (class: xxxx.BatchLayerProcessor, value: xxxx.BatchLayerProcessor@3e745097)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(xxxx.BatchLayerProcessor@3e745097))
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@691)
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 0))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
```
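Reading the serialization stack bottom-up: the ResultTask's `partition` field holds a ParallelCollectionPartition, whose data is a WrappedArray of BatchLayerProcessor elements. So the object that fails is the list element that `parallelize()` copies into each partition, not `batchFunction`, which would explain why moving the function into its own class or switching to mapPartitions made no difference. With the default JavaSerializer, those elements must be writable by plain Java serialization. One way to reproduce the check without Spark is to write an instance to an ObjectOutputStream directly. A minimal sketch; `PlainTask` and `SerializableTask` are hypothetical stand-ins, not classes from the question:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    // Hypothetical element type that does NOT implement Serializable.
    static class PlainTask implements Runnable {
        public void run() { }
    }

    // Hypothetical element type that does implement Serializable.
    static class SerializableTask implements Runnable, Serializable {
        private static final long serialVersionUID = 1L;
        private int interval = 5; // instance field, written to the stream
        public void run() { }
    }

    /** Returns null if obj serializes cleanly, otherwise a description of the failure. */
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            // The message is the name of the offending class, as in the Spark trace.
            return "not serializable: " + e.getMessage();
        } catch (IOException e) {
            return "I/O error: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new PlainTask()));
        System.out.println(trySerialize(new SerializableTask()));
    }
}
```

Calling something like `trySerialize` on a real BatchLayerProcessor before `parallelize()` could be a quick sanity check. The trace names BatchLayerProcessor itself as the non-serializable class, so it may be worth confirming that the class actually deployed matches the posted source, which declares `implements Serializable`.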

Cheers.

EDIT: Added BatchLayerProcessor as requested (slightly truncated):

```java
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class BatchLayerProcessor implements Runnable, Serializable {
    private int interval, backMinutes;
    private String scoreVal, batchjobid;
    private static CountDownLatch countDownLatch;
    // ... remaining fields (including applicationContextReader) and getters truncated in the question

    public void run() {
        /* Get a reference to the ApplicationContextReader, a singleton */
        ApplicationContextReader applicationContextReaderCopy = ApplicationContextReader.getInstance();

        synchronized (BatchLayerProcessor.class) { /* Protect singleton member variable from multithreaded access. */
            if (applicationContextReader == null) /* If local reference is null... */
                applicationContextReader = applicationContextReaderCopy; /* ...set it to the singleton */
        }

        if (getxScoreVal().equals("")) {
            applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes());
        } else {
            applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes());
        }

        countDownLatch.countDown();
    }
}
```
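The posted class has a second issue that would likely surface once the elements do serialize: `countDownLatch` is static, and Java serialization writes only instance fields. On a remote executor JVM the static field holds whatever that JVM set it to, normally null, so `countDownLatch.countDown()` would throw a NullPointerException there (and a latch created on the driver cannot be counted down from another JVM in any case). The sketch below uses a hypothetical `Job` class to show that instance state travels with a serialized object while static state does not:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class StaticFieldDemo {

    // Hypothetical class with one static and one instance field, loosely mirroring BatchLayerProcessor.
    static class Job implements Serializable {
        private static final long serialVersionUID = 1L;
        static String sharedState;   // static: belongs to the class, never written to the stream
        private final int interval;  // instance field: written to the stream

        Job(int interval) { this.interval = interval; }

        int getInterval() { return interval; }
    }

    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Job.sharedState = "set on the driver";
        byte[] data = serialize(new Job(15));

        // Simulate a fresh executor JVM in which the static field was never set.
        Job.sharedState = null;
        Job copy = (Job) deserialize(data);

        System.out.println("interval = " + copy.getInterval());  // prints "interval = 15"
        System.out.println("sharedState = " + Job.sharedState);  // prints "sharedState = null"
    }
}
```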

【Question Comments】:

  • Please provide the source of BatchLayerProcessor
  • Added now - thanks

Tags: java apache-spark


【Solution 1】:

I decided to change BatchLayerProcessor so that it is no longer Runnable, and instead rely on Spark to do the work for me.
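In outline, that fix means making the RDD elements plain Serializable data, doing the work inside the function passed to `map`, and returning a Serializable value rather than a `java.util.concurrent.Future`. Future implementations such as `FutureTask` are not Serializable, so `collect()` would likely have failed on the results as well. A minimal sketch under those assumptions: `BatchParams`, `process` and the String result are hypothetical names, and the Spark call appears only in a comment so the block runs with plain Java. Spark's Java `map` takes an `org.apache.spark.api.java.function.Function`, which has a single `call` method, so with Java 8 a static method reference like `BatchJobSketch::process` should be usable in its place.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BatchJobSketch {

    // Hypothetical: a plain, Serializable description of one unit of work.
    // Only instance fields; no static state, no executor, no latch.
    public static class BatchParams implements Serializable {
        private static final long serialVersionUID = 1L;
        final String batchJobId;
        final int interval;
        final int backMinutes;

        public BatchParams(String batchJobId, int interval, int backMinutes) {
            this.batchJobId = batchJobId;
            this.interval = interval;
            this.backMinutes = backMinutes;
        }
    }

    // Hypothetical: the work itself, returning a Serializable value (a String here)
    // instead of a Future, so Spark can ship the result back to the driver.
    public static String process(BatchParams p) {
        // Placeholder for the real scoring call; any non-serializable services
        // (e.g. an application context) would be looked up on the executor itself.
        return p.batchJobId + ": interval=" + p.interval + ", backMinutes=" + p.backMinutes;
    }

    public static void main(String[] args) {
        List<BatchParams> work = new ArrayList<>();
        work.add(new BatchParams("job-1", 5, 60));
        work.add(new BatchParams("job-2", 10, 30));

        // Local stand-in for:
        //   sparkContext.parallelize(work, work.size()).map(BatchJobSketch::process).collect();
        List<String> results = new ArrayList<>();
        for (BatchParams p : work) {
            results.add(process(p));
        }
        results.forEach(System.out::println);
    }
}
```

Letting Spark schedule one partition per element replaces the hand-rolled ExecutorService and CountDownLatch: `collect()` itself blocks until every task has finished.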
