【问题标题】:Spark streaming not remembering previous state火花流不记得以前的状态
【发布时间】:2016-02-28 04:48:20
【问题描述】:

我编写了带有状态转换的火花流程序。 似乎我的火花流应用程序正在通过检查点正确地进行计算。 但是,如果我终止我的程序并再次启动它,它就不会读取以前的检查点数据并从头开始盯着看。这是预期的行为吗?

我是否需要更改我的程序中的任何内容,以便它能够记住以前的数据并从那里开始计算?

提前致谢。

供参考我的程序:

 def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream(<hostname>, 9999)
    ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir")
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }
}

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    根据 spark-streaming 文档,您应该以不同的方式初始化上下文:

    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
        val ssc = new StreamingContext(...)   // new context
        val lines = ssc.socketTextStream(...) // create DStreams
        ...
        ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
        ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()
    

    checkpointing

    【讨论】:

      【解决方案2】:

      正如checkpointing documentation 中所述,您必须调整代码才能从检查点恢复状态。

      特别是您不能直接创建StreamingContext,而必须使用StreamingContext.getOrCreate 方法,该方法采用:

      • 检查点目录
      • 可用于设置上下文的函数 (Unit =&gt; StreamingContext)

      【讨论】:

      • 感谢 zero323。我能够做到。它现在正在记住以前的状态。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-04-27
      • 2021-05-15
      • 2019-04-02
      • 2016-02-07
      • 2015-05-15
      • 1970-01-01
      相关资源
      最近更新 更多