【问题标题】:Spark Streaming: Could not compute split, block not foundSpark Streaming:无法计算拆分,未找到块
【发布时间】:2015-01-20 02:14:42
【问题描述】:

我正在尝试将 Spark Streaming 与 Kafka(版本 1.1.0)一起使用,但由于此错误,Spark 作业不断崩溃:

14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)

我从日志中获得的唯一相关信息是:

14/11/21 12:34:18 INFO MemoryStore: Block input-0-1416573258200 stored as bytes to memory (size 85.8 KB, free 2.3 GB)
14/11/21 12:34:18 INFO BlockManagerMaster: Updated info of block input-0-1416573258200
14/11/21 12:34:18 INFO BlockGenerator: Pushed block input-0-1416573258200
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
14/11/21 12:37:35 INFO BlockManagerInfo: Added input-0-1416573258200 in memory on ********:43117 (size: 85.8 KB, free: 2.3 GB)
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

示例代码:

SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
jssc.checkpoint(checkpointDir);

HashMap<String, Integer> topics = new HashMap<String, Integer>();
topics.put(KAFKA_TOPIC, 1);

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("group.id", "spark-streaming-test");
kafkaParams.put("zookeeper.connect", ZOOKEEPER_QUORUM);
kafkaParams.put("zookeeper.connection.timeout.ms", "1000");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairReceiverInputDStream<String, String> kafkaStream = 
  KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String, String> streamPair = kafkaStream.flatMapToPair(...).reduceByKey(...);

我不确定这个问题的原因是什么。

【问题讨论】:

  • 工作表现如何?落后了吗?
  • 不,它没有落后。
  • 您找到解决方案了吗?我对 Kafka/Spark Streaming 1.2 有同样的问题
  • 我也看到了 Kafka/Spark Streaming 1.2 的这个问题。然而,我的一些工作最终要等待相当长的时间 - 所以我确实符合落后标准。
  • 我遇到了同样的问题。是否可能是内存或主/工作人员通信问题?我的猜测是追索权不足会产生这个错误

标签: apache-spark spark-streaming


【解决方案1】:

【讨论】:

  • 欢迎来到这个网站。虽然链接有帮助,但可以移动或删除资源。因此,在不解释链接为解决方案带来什么的情况下不提供链接是一种很好的做法。见How to Answer
  • 我修复了这个问题以减少接收器中的输入数据。我认为一个可能的原因可能是输入数据超出了处理能力。
【解决方案2】:

检查以下内容。

1) 您是否像在

中那样正确地创建了流式上下文
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

你的初始化不正确。

请看下面

例如:recoverableNetworkCount App的代码

2) 是否启用了属性 write ahead log "spark.streaming.receiver.writeAheadLog.enable"

3) 在 Streaming UI 中检查流媒体的稳定性。 处理时间

【讨论】:

    【解决方案3】:

    这是由于 Spark 流模型。它收集批处理间隔的数据并将其发送到火花引擎进行处理。 Spark 引擎不知道它来自流式系统,并且不会将其传回流式组件。

    这意味着没有流控制(背压控制),不像 Storm 或 Flink 这样的原生流系统可以根据处理速率很好地平滑 spout/source 流。

    来自https://spark.apache.org/docs/latest/streaming-programming-guide.html

    解决这个问题的一个选项是手动将处理信息/确认传递回接收器组件 - 当然这也意味着我们需要使用自定义接收器。此时,我们开始构建 Storm/Flink 等提供的开箱即用的功能。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-14
      • 2017-03-06
      • 1970-01-01
      • 2022-06-29
      • 2021-03-18
      • 2016-06-17
      相关资源
      最近更新 更多