【问题标题】:Stateful Structured Spark Streaming: Timeout is not getting triggered有状态结构化 Spark Streaming:超时未触发
【发布时间】:2020-07-14 23:07:03
【问题描述】:

我已将超时持续时间设置为“2 分钟”,如下所示:

  def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
                          oldState: GroupState[MyState]): OutputRow = {

    println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
    var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)

    if (oldState.hasTimedOut) {
      println("@@@@@ oldState has timed out @@@@")
      // Logic to Write OutputRow
      OutputRow("some values here...")
    } else {
      for (input <- inputs) {
        state = updateWithEvent(state, input)
        oldState.update(state)
        oldState.setTimeoutDuration("2 minutes")
      }
      OutputRow(null, null, null)
    }

  }

我还在 'mapGroupsWithState' 中指定了 ProcessingTimeTimeout,如下所示...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

但 'hasTimedOut' 从来都不是真的,所以我没有得到任何输出!我做错了什么?

【问题讨论】:

    标签: scala apache-spark databricks spark-structured-streaming


    【解决方案1】:

    它似乎只有在输入数据不断流动的情况下才有效。我已经停止了输入作业,因为我有足够的数据,但似乎只有在连续输入数据的情况下超时才有效。不知道为什么它是这样设计的。使编写单元/集成测试变得更加困难,但我确信它的设计方式是有原因的。谢谢。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-29
      • 1970-01-01
      • 2014-09-11
      • 2023-01-20
      • 1970-01-01
      • 2022-08-12
      • 2017-01-11
      相关资源
      最近更新 更多