【问题标题】:Use of Future for concurrent search and result return使用 Future 进行并发搜索和结果返回
【发布时间】:2017-07-29 23:10:27
【问题描述】:

我正在创建一个通过单词列表运行的服务,并为每个单词检查数据库中是否存在相关操作(执行)。

我正在尝试同时使用 Futures 执行此操作,但我不确定我是否使用了最好的方法。

```

class CommandDiscoveryService (commandsDAO: CommandsDAO, text: String) {

  val words = text.split("\\s+")

  var results = new ListBuffer[Option[Execution]]()

  // Temporarily handle with concurrent searchs on the database
  // TODO Load all commands to memory and check the list ?? memcache or some other cache service
  if (words.size <= 6) {
    Logger.debug("Searching for executions with text " + text )
    findExecution()
  }

  def findExecution() = {
    val lb = new ListBuffer[Future[Seq[Execution]]]()
    for (word <- words) {
      lb += commandsDAO.findExecutionByName(word)
    }

    lb.foreach(Await.result(_, 1 seconds))

    import scala.concurrent.ExecutionContext.Implicits.global // FIXME LATER
    val res = lb.map {
      ftr => ftr.map{
        res => {
          if (res.size > 0 ) {
            Logger.debug("RES SIZE:" + res.size)
            res.map{ ex => results += Some(ex) }
          }
        }
      }
    }
  }

  def getExecution(): Option[Execution] = {
    if (results.size > 1 ) {
      Logger.debug("ERROR_TOMANYEXECS: Found more than one execution " + results.head)
      results.foreach{
        execs => Logger.debug("ERROR_TOMANYEXECS: " + execs)
      }

      None
    } else {
      if (results.size == 0 ) {
        Logger.debug("NOTHING FOUND IN RES")
        None
      } else {
        Logger.debug("FOUND RES " + results.head)
        results.head
      }
    }
  }

}

```

当我调用 getExecution 时,我需要已经获得搜索完成的值。我不确定对这个结果变量进行锁定是否会成为 Await on the Future[Seq[Execution]] 的解决方案。已不推荐。

PS:我正在使用 playframework 2.6.x 和 Slick 来运行它。

【问题讨论】:

  • 没有。 Await.result 被阻塞,因此不好
  • 但我需要等待结果。

标签: scala


【解决方案1】:

您的results ListBuffer 仅填充Some[Execution],从不填充None,因此在此处使用Option 毫无意义。我建议使用不可变集合并重新定义您的 findExecution 方法以返回 Future[List[Execution]]

val words = text.split("\\s+").toList

def findExecution: Future[List[Execution]] = {
  val executions = words.map(commandsDAO.findExecutionByName(_)) // List[Future[Seq[Execution]]]
  val filtered = executions.map(_.filter(_.nonEmpty)) // List[Future[Seq[Execution]]
  val flattened = Future.sequence(filtered).map(_.flatten) // Future[List[Execution]]
  flattened
}

findExecution 现在返回所有单词或名称的所有 Executions 中的单个 Future,除了没有任何 Executions 的单词。

当我调用 getExecution 时,我需要已经获得搜索完成的值

更好的方法是让getExecution 也返回一个Future

def getExecution: Future[Option[Exception]] = {
  val executions = findExecution // Future[List[Execution]]
  executions.map { e =>
    if (e.size > 1) {
      // ...
      None
    } else if (e.isEmpty) {
      // ...
      None
    } else { // e.size is one
      // ...
      Some(e.head)
    }
  }
}

上述方法避免了阻塞 Await 调用并适合异步 Play 和 Slick API。

【讨论】:

  • 我必须将 sequence(filtered) 更改为 sequence(filtered.toList) 才能编译,否则似乎可以工作。
  • java.sql.SQLException: Interrupted during connection acquire ... 我收到此错误,我可以假设 Future 没有等待完成吗?
猜你喜欢
  • 2015-06-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-02-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多