【问题标题】:Why do exceptions not stop Akka Stream flow为什么异常不会停止 Akka Stream 流
【发布时间】:2015-03-06 19:17:20
【问题描述】:

我有一个依赖 API 响应的流程。当响应不符合我的预期时,会引发异常。此策略非常适用于 Spray 和使用 specs2 的直接方法测试。

但是,当我尝试使用带有异常抛出模块的流时,流只会停止。

这是我的流程:

 Source(() => file)
      .via(csvToSeq)
      .via(getFromElastic)
      .via(futureExtrtactor)
      .via(findLocaionOfId)
      .foreach(v => v.map(v => println("foreached", v)))
      .onComplete(_ => system.shutdown())

我对此的策略是使用map 进行期货交易。

像这样:

 val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
      jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
        js.asJsObject("Couldn't convert").getFields("externalId").map({
          case JsString(str) => {
              (i + 1, i == 0, js)
            }
            else (i, false, js)
          }
          case _ => (i, false, x)
        })
      })
      }
    }))

这是一个位于完全不同位置的潜在异常引发器:

val encoded_url = URLEncoder.encode(url, "UTF-8")

好像我错过了什么,但看不到什么。感谢您的任何指点。

【问题讨论】:

  • 我认为您需要在这里更具体一点。当前正在发生的行为是什么,您更喜欢什么?如果转换步骤失败并且停止了流程的其余部分,那么您更喜欢什么?提供这种细节将有助于人们为您制定更好的答案。
  • 在您的onComplete 中,您忽略了使用_ 提供的Try 的值。您应该尝试匹配 SuccessFailure 并查看您得到的结果。如果失败,您应该拥有堆栈。
  • 程序永远不会终止,所以 oncomplete 不会触发

标签: scala stream akka akka-stream


【解决方案1】:

这听起来像是在Supervision for Akka Streams 实施后将得到解决的问题。 Akka Streams 仍处于“预实验阶段”,因此该功能尚未实现,但肯定会很快包含在内。

// 在撰写此评论时,当前版本为 1.0-M2(预览里程碑)。

【讨论】:

    【解决方案2】:

    我缺少的是mapAsync

    把上面的函数改成:

    val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].mapAsync({...})
    

    这样,future 被解包,异常会按预期停止程序。

    【讨论】:

    • 是的;当我读到你的问题时,我很惊讶地看到 Flow[Future];这似乎是多余的。 (尽管有一些特定的原因可以保证将 Future 向下传递给 Flow)。
    • 这实际上是一个非常好的观点蒂姆。我不清楚我如何打开未来,就是这样。也许我应该发布一个新的问题/答案集
    猜你喜欢
    • 1970-01-01
    • 2011-04-21
    • 2011-05-09
    • 2016-06-08
    • 1970-01-01
    • 2017-04-29
    • 1970-01-01
    • 2011-09-04
    • 1970-01-01
    相关资源
    最近更新 更多