【问题标题】:Is there a way to dynamically stop Spark Structured Streaming?有没有办法动态停止 Spark Structured Streaming?
【发布时间】:2019-03-01 02:29:40
【问题描述】:

在我的场景中,我有几个数据集不时出现,我需要在我们的平台中摄取。摄取过程涉及几个转换步骤。其中之一是 Spark。特别是到目前为止,我使用的是 spark 结构化流。基础设施还涉及 kafka,spark 结构化流从中读取数据。

我想知道是否有一种方法可以检测到一段时间内没有其他内容可以从某个主题中消耗以决定停止工作。那就是我想在消耗该特定数据集然后停止它所需的时间内运行它。出于特定原因,我们决定不使用 spark 的批处理版本。

因此是否有任何超时或可以用来检测没有更多数据传入并且所有内容都已处理的东西。

谢谢

【问题讨论】:

  • Triger.Once的问题在于,它会在处理之前尝试一次加载集群中的所有数据,基本上相当于使用spark批处理模式。我们希望在处理小批量数据后立即提供结果
  • 我不确定 kafkaConsumer.pollTimeoutMs 到底做了什么?
  • 为什么要停止工作?是否要停止集群以节省资金?
  • (1) 是的钱,(2) 统计数据(管理层希望统计每个数据集需要多长时间才能被完全摄取,确定管道的每个步骤需要多长时间),(3) 调度问题:我们的管道是 3/4 蒸,1/4 批次。在启动关闭管道的后期批处理之前,我们要确保所有流式处理部分都结束了。我们可以将后期部分变成流媒体,但这需要做很多工作,我们现在不想做
  • @MaatDeamon 在这种情况下你最终做了什么?

标签: apache-spark apache-kafka spark-streaming spark-structured-streaming


【解决方案1】:

Structured Streaming Monitoring Options

您可以使用 query.lastProgress 来获取时间戳并围绕它构建逻辑。不要忘记将您的检查点保存到持久、持久、可用的存储中。

【讨论】:

  • 感谢您的输入,只是为了记录,方法query.lastProgress,在多线程上下文中被调用不是吗?这意味着如果流式查询正在运行,则可能会有 AwaitTermination,这意味着除非查询终止,否则不会调用之后的所有其他内容。因此,我一直想知道如何调用此方法。只是想确认一下。
  • 不确定如何使用 query.lastProgress 和 query.awaitTermination 您可能需要考虑异步 StreamingQueryListener 对象。
  • 从 Spark the Definitive Guide 查看这段代码的结尾。它展示了如何将流的状态写入 Kafka。凉爽的! code
  • 是哪一章?
  • 第 23 章 - 生产中的结构流式处理
【解决方案2】:

总结几点建议:

  1. 正如@Michael West 指出的,有监听器来跟踪进度
  2. 据我所知,结构化流式传输doesn't yet support graceful shutdown

因此,一种选择是定期检查查询活动,动态根据可配置的状态关闭(当您确定不能/不应该取得进一步进展时):

// where you configure your spark job...
spark.streams.addListener(shutdownListener(spark))

// your job code starts here by calling "start()" on the stream...

// periodically await termination, checking for your shutdown state
while(!spark.sparkContext.isStopped) {
  if (shutdown) {
    println(s"Shutting down since first batch has completed...")
    spark.streams.active.foreach(_.stop())
    spark.stop()
  } else {
    // wait 10 seconds before checking again if work is complete
    spark.streams.awaitAnyTermination(10000)
  }
}

您的监听器可以通过多种方式动态关闭。例如,如果您只等待单个批次,则只需在第一次更新后关闭:

var shutdown = false
def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
  override def onQueryStarted(_: QueryStartedEvent): Unit = println("Query started: " + queryStarted.id)
  override def onQueryTerminated(_: QueryTerminatedEvent): Unit = println("Query terminated! " + queryTerminated.id)
  override def onQueryProgress(_: QueryProgressEvent): Unit = shutdown = true
}

或者,如果您需要在更复杂的状态更改后关闭,您可以解析 queryProgress.progress 的 json 正文以确定是否在给定的 onQueryUpdate 事件触发时关闭。

【讨论】:

  • 您能详细说明一下吗?例如,如果我只想在 3 分钟触发器上运行我的结构化流式传输作业总共 15 分钟以处理新数据,然后在每天 n # 分钟后动态关闭。这可能吗?
  • 上面我有一个简单的布尔状态,仅用于示例目的,var shutdown,但这可能是更复杂的逻辑。例如,您可以添加另一个状态:var startTime = System.currentTimeMillis()。然后,由于您只需要 15 分钟的绝对时间限制,您甚至可能不需要侦听器检查查询何时完成。更简单的方法是使用上面的while 语句检查SystemstartTime 以来的时间是否超过15 分钟。 在这种特定情况下,当您需要优雅仅在查询完成后关闭以便不会丢失数据时,侦听器很有价值。
【解决方案3】:

你大概可以用这个:-

def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
    while (query.isActive) {
      try{
        if(query.lastProgress.numInputRows < 10){
          query.awaitTermination(1000)
        }
      }
      catch
      {
        case e:NullPointerException => println("First Batch")
      }
      Thread.sleep(500)
    }
  }

您可以创建一个 numInputRows 变量。

【讨论】:

  • 能否请您也添加一个python版本?
猜你喜欢
  • 2021-09-11
  • 2020-03-19
  • 2018-09-14
  • 1970-01-01
  • 2023-01-20
  • 2021-02-12
  • 2019-08-04
  • 2019-10-21
  • 2020-09-12
相关资源
最近更新 更多