【发布时间】:2015-03-06 19:17:20
【问题描述】:
我有一个依赖 API 响应的流程。当响应不符合我的预期时,会引发异常。此策略非常适用于 Spray 和使用 specs2 的直接方法测试。
但是,当我尝试使用带有异常抛出模块的流时,流只会停止。
这是我的流程:
Source(() => file)
.via(csvToSeq)
.via(getFromElastic)
.via(futureExtrtactor)
.via(findLocaionOfId)
.foreach(v => v.map(v => println("foreached", v)))
.onComplete(_ => system.shutdown())
我对此的策略是使用map 进行期货交易。
像这样:
val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
js.asJsObject("Couldn't convert").getFields("externalId").map({
case JsString(str) => {
(i + 1, i == 0, js)
}
else (i, false, js)
}
case _ => (i, false, x)
})
})
}
}))
这是一个位于完全不同位置的潜在异常引发器:
val encoded_url = URLEncoder.encode(url, "UTF-8")
好像我错过了什么,但看不到什么。感谢您的任何指点。
【问题讨论】:
-
我认为您需要在这里更具体一点。当前正在发生的行为是什么,您更喜欢什么?如果转换步骤失败并且停止了流程的其余部分,那么您更喜欢什么?提供这种细节将有助于人们为您制定更好的答案。
-
在您的
onComplete中,您忽略了使用_提供的Try的值。您应该尝试匹配Success和Failure并查看您得到的结果。如果失败,您应该拥有堆栈。 -
程序永远不会终止,所以 oncomplete 不会触发
标签: scala stream akka akka-stream