【问题标题】:fork and join using Akka使用 Akka 分叉和加入
【发布时间】:2011-12-23 18:32:14
【问题描述】:

问题陈述:我有一个需要以并行方式处理的证券组合。在 Java 中,我使用线程池来处理每个安全性,并使用锁存器进行倒计时。完成后,我会进行一些合并等。

所以我向我的 SecurityProcessor(它是一个演员)发送消息,并等待所有的未来完成。最后,我使用 MergeHelper 进行后处理。 SecurityProcessor 接受一个安全,执行一些 i/o 和处理并回复一个安全

  val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures += (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get

这种设计是否正确,是否有更好/更酷的方法来使用 akka 实现这个常见问题?

【问题讨论】:

    标签: scala parallel-processing future akka actor


    【解决方案1】:
    val futureResult = Future.sequence(
                      portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
                    ) map { securities => MergeHelper.merge(portfolio, securities) }
    

    【讨论】:

    • 真的很喜欢这个建议并按预期工作,直到我不得不拆分它并添加一堆 Eventhandler.info 语句来调试问题:(
    • def debug[T](t: T): T = { EventHandler.info(t); t }
    • 很高兴你喜欢它,请在 Akka 邮件列表上分享你喜悦和/或痛苦的泪水!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-03-02
    • 1970-01-01
    • 2011-04-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-23
    相关资源
    最近更新 更多