【问题标题】:Scala - Batched Stream from FuturesScala - 来自期货的批处理流
【发布时间】:2016-08-31 17:08:39
【问题描述】:

我有一个案例类 Thing 的实例,并且我有一堆查询要运行,它们会返回一个 Things 的集合,如下所示:

def queries: Seq[Future[Seq[Thing]]]

我需要从所有期货(如上)中收集所有 Things,并将它们分组为大小相同的 10,000 个集合,以便可以将它们序列化为 10,000 个 Things 的文件。

def serializeThings(Seq[Thing]): Future[Unit]

我希望它以这样一种方式实现,即在序列化之前我不等待所有查询运行。在第一个查询的 future 完成后返回 10,000 个Things,我想开始序列化。

如果我这样做:

Future.sequence(queries)

它将收集所有查询的结果,但我的理解是,在所有查询完成并且所有Things 必须立即放入内存之前,不会调用像map 这样的操作。

使用 Scala 集合和并发库实现批处理流管道的最佳方式是什么?

【问题讨论】:

  • 如果您想以 10,000 个批次处理 N 个期货,this 可能会有所帮助。此外,还有来自 James Ward here 的博格帖子。
  • @insan-e 我不想批量处理期货。我想批处理期货的结果,每个期货返回任意数量的Things

标签: scala collections future


【解决方案1】:

我想我成功了。该解决方案基于我之前的answer。它从Future[List[Thing]] 结果中收集结果,直到达到阈值BatchSize。然后它调用serializeThingsfuture,当它完成时,循环继续其余部分。

object BatchFutures extends App {

  case class Thing(id: Int)

  def getFuture(id: Int): Future[List[Thing]] = {
    Future.successful {
      List.fill(3)(Thing(id))
    }
  }

  def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful {
    //Thread.sleep(2000)
    println("processing: " + things)
  }

  val ids = (1 to 4).toList
  val BatchSize = 5

  val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) {
    case (acc, id) =>
      acc flatMap { processed =>
        getFuture(id) flatMap { res =>
          val all = processed ++ res
          val (batch, rest) = all.splitAt(5)

          if (batch.length == BatchSize) { // if futures filled the batch with needed amount
            serializeThings(batch) map { _ =>
              rest // process the rest
            }
          } else {
            Future.successful(all) //if we need more Things for a batch
          }
        }
      }
  }.flatMap { rest =>
    serializeThings(rest)
  }

  Await.result(future, Duration.Inf)

}

结果打印:

处理:列表(事物(1),事物(1),事物(1),事物(2),事物(2))
处理:列表(事物(2),事物(3),事物(3),事物(3),事物(4))
处理:List(Thing(4), Thing(4))

Things 的数量不能被BatchSize 整除时,我们必须再次调用serializeThings(最后flatMap)。我希望它有帮助! :)

【讨论】:

    【解决方案2】:

    在你做Future.sequence之前,先做你想做的事情,然后使用Future.sequence

    //this can be used for serializing
    def doSomething(): Unit = ???
    
    //do something with the failed future
    def doSomethingElse(): Unit = ???
    
    def doSomething(list: List[_]) = ???
    
    val list: List[Future[_]] = List.fill(10000)(Future(doSomething()))
    
    val newList = 
    list.par.map { f =>
      f.map { result =>
        doSomething()
      }.recover { case throwable =>
        doSomethingElse()
      }
    }
    
    Future.sequence(newList).map ( list => doSomething(list)) //wait till all are complete
    

    您可以使用Future.traverse 而不是newList

    Future.traverse(list)(f => f.map( x => doSomething()).recover {case th =>  doSomethingElse() }).map ( completeListOfValues => doSomething(completeListOfValues))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-09-30
      • 2020-01-30
      • 2020-08-21
      • 1970-01-01
      • 1970-01-01
      • 2016-01-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多