【问题标题】:What is the best way to restart spark streaming application?重新启动火花流应用程序的最佳方法是什么?
【发布时间】:2017-01-18 13:52:08
【问题描述】:

我基本上想在我的驱动程序中编写一个事件回调,它将在该事件到达时重新启动 spark 流应用程序。 我的驱动程序通过从文件中读取配置来设置流和执行逻辑。 每当文件更改(添加新配置)时,驱动程序必须按顺序执行以下步骤,

  1. 重启,
  2. 读取配置文件(作为主要方法的一部分)和
  3. 设置流

实现这一目标的最佳方法是什么?

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-streaming apache-spark-2.0


    【解决方案1】:

    在某些情况下,您可能希望动态地重新加载流式上下文(例如重新加载流式操作)。 在这种情况下,您可以(Scala 示例):

    val sparkContext = new SparkContext()
    
    val stopEvent = false
    var streamingContext = Option.empty[StreamingContext]
    val shouldReload = false
    
    val processThread = new Thread {
      override def run(): Unit = {
        while (!stopEvent){
          if (streamingContext.isEmpty) {
    
            // new context
            streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))
    
            // create DStreams
              val lines = streamingContext.socketTextStream(...)
    
            // your transformations and actions
            // and decision to reload streaming context
            // ...
    
            streamingContext.get.start()
          } else {
            if (shouldReload) {
              streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
              streamingContext.get.awaitTermination()
              streamingContext = Option.empty[StreamingContext]
            } else {
              Thread.sleep(1000)
            }
          }
    
        }
        streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
        streamingContext.get.awaitTermination()
      }
    }
    
    // and start it  in separate thread
    processThread.start()
    processThread.join()
    

    或在 python 中:

    spark_context = SparkContext()
    
    stop_event = Event()
    spark_streaming_context = None
    should_reload = False
    
    def process(self):
        while not stop_event.is_set():
            if spark_streaming_context is None:
    
                # new context
                spark_streaming_context = StreamingContext(spark_context, 0.5)
    
                # create DStreams
                lines = spark_streaming_context.socketTextStream(...)  
    
                # your transformations and actions
                # and decision to reload streaming context
                # ...
    
                self.spark_streaming_context.start()
            else:
                # TODO move to config
                if should_reload:
                    spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
                    spark_streaming_context.awaitTermination()
                    spark_streaming_context = None
                else:
                    time.sleep(1)
        else:
            self.spark_streaming_context.stop(stopGraceFully=True)
            self.spark_streaming_context.awaitTermination()
    
    
    # and start it  in separate thread
    process_thread = threading.Thread(target=process)
    process_thread.start()
    process_thread.join()
    

    如果您想防止代码崩溃并从最后一个位置重新启动流上下文,请使用checkpointing 机制。 它允许您在失败后恢复您的作业状态。

    【讨论】:

    • 在 scala 中尝试了类似的方法,似乎效果很好
    【解决方案2】:

    重启Spark的最佳方式实际上是根据你的环境。但总是建议使用spark-submit控制台。

    您可以像任何其他linux 进程一样将spark-submit 进程置于后台,方法是将其置于shell 的后台。在您的情况下,spark-submit 作业实际上然后在YARN 上运行驱动程序,因此,它是一个已经通过YARN 在另一台机器上异步运行的进程。

    Cloudera blog

    【讨论】:

      【解决方案3】:

      我们最近探索的一种方法(在此处的 spark 聚会中)是通过使用 Zookeeper in Tandem with Spark 来实现这一点。简而言之,这使用 Apache Curator 来监视 Zookeeper 上的更改(ZK 配置的更改,这可以由您的外部事件触发),然后导致侦听器重新启动。

      引用的代码库是 here ,您会发现配置中的更改会导致 Watcher(一个 Spark 流应用程序)在正常关闭并重新加载更改后重新启动。希望这是一个指针!

      【讨论】:

        【解决方案4】:

        我目前正在解决这个问题,

        • 通过订阅 MQTT 主题收听外部事件

        • 在 MQTT 回调中,停止流上下文 ssc.stop(true,true),这将正常关闭流和底层 火花配置

        • 通过创建 spark conf 和 通过读取配置文件来设置流

        // Contents of startSparkApplication() method
        sparkConf = new SparkConf().setAppName("SparkAppName")
        ssc = new StreamingContext(sparkConf, Seconds(1))
        val myStream = MQTTUtils.createStream(ssc,...)   //provide other options
        myStream.print()
        ssc.start()
        

        应用程序被构建为 Spring boot 应用程序

        【讨论】:

          【解决方案5】:

          在 Scala 中,停止 sparkStreamingContext 可能涉及停止 SparkContext。我发现当一个receiver挂掉时,最好重启SparkCintext和SparkStreamingContext。

          我确信下面的代码可以写得更优雅,但它允许以编程方式重新启动 SparkContext 和 SparkStreamingContext。完成此操作后,您也可以通过编程方式重新启动接收器。

              package coname.utilobjects
          
          import com.typesafe.config.ConfigFactory
          import grizzled.slf4j.Logging
          import coname.conameMLException
          import org.apache.spark.SparkConf
          import org.apache.spark.sql.SparkSession
          import org.apache.spark.streaming.{Seconds, StreamingContext}
          
          import scala.collection.mutable
          
          
          object SparkConfProviderWithStreaming extends Logging
          {
            val sparkVariables: mutable.HashMap[String, Any] = new mutable.HashMap
          }
          
          
          
          trait SparkConfProviderWithStreaming extends Logging{
          
          
          
          
          
          
            private val keySSC = "SSC"
            private val keyConf = "conf"
            private val keySparkSession = "spark"
          
          
            lazy val   packagesversion=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.packagesversion")
            lazy val   sparkcassandraconnectionhost=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkcassandraconnectionhost")
            lazy val   sparkdrivermaxResultSize=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkdrivermaxResultSize")
            lazy val   sparknetworktimeout=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparknetworktimeout")
          
          
            @throws(classOf[conameMLException])
            def intitializeSpark(): Unit =
            {
              getSparkConf()
              getSparkStreamingContext()
              getSparkSession()
            }
          
            @throws(classOf[conameMLException])
            def getSparkConf(): SparkConf = {
              try {
                if (!SparkConfProviderWithStreaming.sparkVariables.get(keyConf).isDefined) {
                  logger.info("\n\nLoading new conf\n\n")
                  val conf = new SparkConf().setMaster("local[4]").setAppName("MLPCURLModelGenerationDataStream")
                  conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
                  conf.set("spark.cassandra.connection.host", sparkcassandraconnectionhost)
                  conf.set("spark.driver.maxResultSize", sparkdrivermaxResultSize)
                  conf.set("spark.network.timeout", sparknetworktimeout)
          
          
                  SparkConfProviderWithStreaming.sparkVariables.put(keyConf, conf)
                  logger.info("Loaded new conf")
                  getSparkConf()
                }
                else {
                  logger.info("Returning initialized conf")
                  SparkConfProviderWithStreaming.sparkVariables.get(keyConf).get.asInstanceOf[SparkConf]
                }
              }
              catch {
                case e: Exception =>
                  logger.error(e.getMessage, e)
                  throw new conameMLException(e.getMessage)
              }
          
            }
          
            @throws(classOf[conameMLException])
          def killSparkStreamingContext
            {
              try
              {
                if(SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined)
                  {
                    SparkConfProviderWithStreaming.sparkVariables -= keySSC
                    SparkConfProviderWithStreaming.sparkVariables -= keyConf
                  }
                SparkSession.clearActiveSession()
                SparkSession.clearDefaultSession()
          
              }
              catch {
                case e: Exception =>
                  logger.error(e.getMessage, e)
                  throw new conameMLException(e.getMessage)
              }
            }
          
            @throws(classOf[conameMLException])
            def getSparkStreamingContext(): StreamingContext = {
              try {
                if (!SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined) {
                  logger.info("\n\nLoading new streaming\n\n")
                  SparkConfProviderWithStreaming.sparkVariables.put(keySSC, new StreamingContext(getSparkConf(), Seconds(6)))
          
                  logger.info("Loaded streaming")
                  getSparkStreamingContext()
                }
                else {
                  SparkConfProviderWithStreaming.sparkVariables.get(keySSC).get.asInstanceOf[StreamingContext]
                }
              }
              catch {
                case e: Exception =>
                  logger.error(e.getMessage, e)
                  throw new conameMLException(e.getMessage)
              }
            }
          
            def getSparkSession():SparkSession=
            {
          
              if(!SparkSession.getActiveSession.isDefined)
              {
                SparkSession.builder.config(getSparkConf()).getOrCreate()
          
              }
              else
                {
                  SparkSession.getActiveSession.get
                }
            }
          
          }
          

          【讨论】:

            猜你喜欢
            • 2020-04-16
            • 1970-01-01
            • 2018-07-08
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2012-04-20
            • 2016-11-12
            • 2011-05-02
            相关资源
            最近更新 更多