【问题标题】:handle errors while iterating over a mongo collection as stream在将 mongo 集合作为流迭代时处理错误
【发布时间】:2019-01-10 04:42:45
【问题描述】:

我有一个简单的方法,它使用 akka 流遍历 mongo 集合,对于每个元素,我调用一个方法 enrichDataFromGoogleAndInsert,它调用 google api 来丰富文档数据并将其插入新集合, 所以enrichDataFromGoogleAndInsert 是异步的。

   def processVendors()(implicit m: Materializer): Future[Done] = {
    val vendorsSource: Source[Vendor, Future[State]] =
      collection.find(json())
      .noCursorTimeout
      .cursor[Vendor]()
      .documentSource()
      .throttle(50, 1.second)

    vendorsSource
    .runForeach(vendor => 
      enrichDataFromGoogleAndInsert(vendor)
    )
  }

我从我的控制器运行此方法,我想知道如何累积错误,并确保脚本在enrichDataFromGoogleAndInsert 抛出某种错误时停止。

【问题讨论】:

  • 我建议你先看一下akka stream docs,因为没有关于reactivemongo的特定内容

标签: scala akka-stream


【解决方案1】:
def processVendors()(implicit m: Materializer): Future[Done] = {
val vendorsSource: Source[Vendor, Future[State]] =
  collection.find(json())
    .noCursorTimeout
    .cursor[Vendor]()
    .documentSource()


vendorsSource
  .mapAsync(50)(vendor =>
    enrichDataFromGoogleAndInsert(vendor)
  )
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.ignore)
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-23
    • 2018-09-15
    • 1970-01-01
    • 2014-08-06
    • 1970-01-01
    • 1970-01-01
    • 2020-10-03
    相关资源
    最近更新 更多