【问题标题】:What is the correct way to start/stop spark streaming jobs in yarn?在纱线中启动/停止火花流作业的正确方法是什么?
【发布时间】:2015-10-19 11:34:15
【问题描述】:

我已经尝试了好几个小时并在谷歌上搜索了很多小时,但都没有运气。

我有一个在本地 spark 集群中运行良好的 spark 流应用程序。现在我需要将它部署在 cloudera 5.4.4 上。我需要能够启动它,让它在后台持续运行,并且能够停止它。

我试过了:

$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs

但它只是无休止地打印这些行。

15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)

问题一:由于是流媒体应用,需要持续运行。那么如何在“后台”模式下运行它呢?我能找到的在纱线上提交火花作业的所有示例似乎都假设应用程序将完成一些工作并终止,因此您希望在前台运行它。但流媒体并非如此。

接下来...此时应用程序似乎无法正常运行。我认为这可能是我的错误或配置错误,因此我尝试查看日志以了解发生了什么:

$ yarn logs -applicationId application_1438092860895_012

但它告诉我:

/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.

那么问题2:如果应用程序正在运行,为什么它没有日志文件?

所以最终我不得不杀死它:

$ yarn application -kill application_1438092860895_012

这带来了第 3 个问题:假设我最终可以启动应用并在后台运行,“yarn application -kill”是停止它的首选方式吗?

【问题讨论】:

    标签: hadoop apache-spark spark-streaming hadoop-yarn cloudera


    【解决方案1】:
    1. 您可以关闭spark-submit 控制台。写出 RUNNING 状态时,该作业已在后台运行。
    2. 日志在应用程序完成后立即可见。在运行时,所有日志都可以在本地工作节点上直接访问(您可以在 YARN 资源管理器 Web UI 中查看),并在作业完成后聚合到 HDFS
    3. yarn application -kill 可能是停止 Spark 流应用程序的最佳方法,但它并不完美。最好做一些优雅的关闭来停止所有流接收器并停止流上下文,但我个人不知道该怎么做。

    【讨论】:

    • 我和 Keven 有同样的问题,但是你的答案 1 对我来说似乎不起作用。我有一个 python 流应用程序。当我将它提交到我的独立 Spark 时,它会打印出信息日志并打印“app-20160403171906-0003/0 is now RUNNING”,但我无法退出提交。
    【解决方案2】:
    1. 您的数据源是什么?如果它是可靠的,就像 Kafka 直接接收器,那么 yarn kill 关闭应该没问题。当您的应用程序重新启动时,它将从最后一个完整的批次偏移中读取。如果数据源不可靠,或者如果您想自己处理正常关闭,则必须在流上下文上实现某种外部挂钩。我遇到了同样的问题,最后我实现了一个小技巧,在 webui 中添加一个新标签作为停止按钮。

    【讨论】:

      【解决方案3】:

      我终于找到了一种安全关闭火花流作业的方法。

      1. 编写一个套接字服务器线程等待停止流上下文
      包 xxx.xxx.xxx 导入 java.io.{BufferedReader, InputStreamReader} 导入 java.net.{ServerSocket, Socket} 导入 org.apache.spark.streaming.StreamingContext 对象 KillServer { 类 NetworkService(端口:Int,ssc:StreamingContext)扩展 Runnable { val serverSocket = new ServerSocket(端口) 定义运行(){ Thread.currentThread().setName("壮地 | 等待优雅停在端口" + 端口) 而(真){ val socket = serverSocket.accept() (new Handler(socket, ssc)).run() } } } 类 Handler(socket: Socket, ssc: StreamingContext) 扩展 Runnable { 定义运行(){ val reader = new InputStreamReader(socket.getInputStream) val br = 新的 BufferedReader(阅读器) if (br.readLine() == "kill") { ssc.stop(真,真) } br.close(); } } def run(port:Int, ssc: StreamingContext): Unit ={ (new NetworkService(port, ssc)).run } }
      1. 在您开始流式传输上下文的 main 方法中,添加以下代码

        ssc.start() KillServer.run(11212, ssc) ssc.awaitTermination()
      2. 编写 spark-submit 将作业提交到 yarn,并直接输出到您稍后将使用的文件

      火花提交--class“com.Mainclass”\ --conf "spark.streaming.stopGracefullyOnShutdown=true" \ --master yarn-cluster --queue "root" \ --部署模式集群\ --executor-cores 4 --num-executors 8 --executor-memory 3G \ hdfs:///xxx.jar > 输出 2>&1 &
      1. 最后,安全关闭 Spark 流式传输作业,不会丢失数据或计算结果不持久!!! (用于优雅停止流式传输上下文的服务器套接字正在驱动程序上运行,因此您 grep 步骤 3 的输出以获取驱动程序地址,并使用 echo nc 发送套接字终止命令)
      #!/bin/bash 驱动程序=`猫输出 | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'` 回声“杀” |数控$驱动程序11212 driverid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po '应用程序_\d+_\d+'` 纱线应用程序-kill $driverid

      【讨论】:

      • 虽然这可能有效,但我后来了解到“yarn application -kill”会向您的应用程序发送一个信号,您可以处理并优雅地关闭它。例如在 scala 中: sys.ShutdownHookThread { LOGGER.info("Stopping spark context...") ssc.stop(stopSparkContext = true, stopGracefully = true) LOGGER.info("Stopped") }
      • 看起来 sys.ShutdownHookThread 方法在 Spark 1.5 中停止工作。我可以验证它在 Spark 1.6.1 中不起作用。
      【解决方案4】:

      最后一个难题是如何优雅地停止部署在 YARN 上的 Spark Streaming 应用程序。停止(或更确切地说是杀死)YARN 应用程序的标准方法是使用命令yarn application -kill [applicationId]。此命令会停止 Spark Streaming 应用程序,但这可能发生在批处理的中间。因此,如果作业从 Kafka 读取数据,将处理结果保存到 HDFS 并最终提交 Kafka 偏移量,那么当作业在提交偏移量之前停止时,HDFS 上应该会出现重复数据。

      解决正常关闭问题的第一个尝试是在关闭挂钩中调用 Spark 流上下文停止方法。

      sys.addShutdownHook {
          streamingContext.stop(stopSparkContext = true, stopGracefully = true)
      }
      

      令人失望的是,调用关闭钩子太晚而无法完成已启动的批处理,并且 Spark 应用程序几乎立即被终止。此外,根本无法保证 JVM 会调用关闭挂钩。

      在撰写这篇博文时,唯一确认的在 YARN 上正常关闭 Spark Streaming 应用程序的方法是以某种方式通知应用程序计划关闭,然后以编程方式停止流式传输上下文(但不是通过关闭挂钩)。如果通知的应用程序在定义的超时后没有停止,则命令yarn application -kill 应仅用作最后的手段。

      可以使用 HDFS 上的标记文件(最简单的方法)或使用在驱动程序上公开的简单 Socket/HTTP 端点(复杂的方法)通知应用程序计划关闭。

      因为我喜欢 KISS 原理,下面你可以找到使用标记文件启动/停止 Spark Streaming 应用程序的 shell 脚本伪代码:

      start() {
          hdfs dfs -touchz /path/to/marker/my_job_unique_name
          spark-submit ...
      }
      
      stop() {
          hdfs dfs -rm /path/to/marker/my_job_unique_name
          force_kill=true
          application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
          for i in `seq 1 10`; do
              application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
              if [ -n "$application_status" ]; then
                  sleep 60s
              else
                  force_kill=false
                  break
              fi
          done
          $force_kill && yarn application -kill ${application_id}
      }
      

      在 Spark Streaming 应用程序中,后台线程应该监控标记文件,当文件消失时停止上下文调用

      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
      

      也可以参考http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-12-11
        • 2021-12-13
        • 2017-08-27
        • 1970-01-01
        • 2018-10-20
        • 2017-06-11
        • 2020-05-29
        • 1970-01-01
        相关资源
        最近更新 更多