【发布时间】:2019-06-01 13:45:24
【问题描述】:
我在 mongo 中有一个人的集合,我想以流的形式遍历集合中的每个人,并为每个人调用一个执行 api 调用、更改模型和插入新集合的方法在 mongo 中。
看起来像这样:
def processPeople()(implicit m: Materializer): Future[Unit] = {
val peopleSource: Source[Person, Future[State]] = collection.find(json()).cursor[Person]().documentSource()
peopleSource.runWith(Sink.seq[Person]).map(people => {
people.foreach(person => {
changeModelAndInsertToNewCollection(person)
})
})
}
但这不起作用...更改模型的部分似乎起作用了,但是插入 mongo 不起作用。
该方法似乎也没有立即开始,在它开始之前有一些处理落后了一分钟......你看到问题了吗?
【问题讨论】:
-
使用
Sink.seq[Person]使其首先从集合中读取所有数据,然后才开始对获取的数据调用people.foreach。如果源集合中有大量数据,这将解释延迟。插入的问题一定出在changeModelAndInsertToNewCollection的实现上,你的例子中没有插入代码 -
@yahor 我的问题不是使用 runWith 我应该使用 runForeach ..
-
这只是一个提示。使用
runForeach不会产生任何背压,因此如果changeModelAndInsertToNewCollection是异步的,您很可能会将所有内容加载到内存中并在短时间内提交大量异步任务。可以使用mapAsync来控制背压 -
@yahor 谢谢哥们。我所做的是在声明我的 peopleSource 时添加了
.documentSource().throttle(50, 1.second),这给了我背压..
标签: mongodb scala akka akka-stream reactivemongo