【发布时间】:2019-01-10 04:42:45
【问题描述】:
我有一个简单的方法,它使用 akka 流遍历 mongo 集合,对于每个元素,我调用一个方法 enrichDataFromGoogleAndInsert,它调用 google api 来丰富文档数据并将其插入新集合,
所以enrichDataFromGoogleAndInsert 是异步的。
def processVendors()(implicit m: Materializer): Future[Done] = {
val vendorsSource: Source[Vendor, Future[State]] =
collection.find(json())
.noCursorTimeout
.cursor[Vendor]()
.documentSource()
.throttle(50, 1.second)
vendorsSource
.runForeach(vendor =>
enrichDataFromGoogleAndInsert(vendor)
)
}
我从我的控制器运行此方法,我想知道如何累积错误,并确保脚本在enrichDataFromGoogleAndInsert 抛出某种错误时停止。
【问题讨论】:
-
我建议你先看一下akka stream docs,因为没有关于reactivemongo的特定内容
标签: scala akka-stream