【问题标题】:scala.concurrent.Future wrapper for java.util.concurrent.Futurejava.util.concurrent.Future 的 scala.concurrent.Future 包装器
【发布时间】:2013-06-17 09:57:35
【问题描述】:

我将 Play Framework 2.1.1 与一个生成 java.util.concurrent.Future 结果的外部 java 库一起使用。我使用的是 scala 未来而不是 Akka,我认为从 Play 2.1 开始这是正确的做法。如何将 java.util.concurrent.Future 包装到 scala.concurrent.Future 中,同时保持代码非阻塞?

def geConnection() : Connection = {
  // blocking with get
  connectionPool.getConnectionAsync().get(30000, TimeUnit.MILLISECONDS)
}

上面的代码返回一个连接,但是使用了一个get,所以它变成了阻塞

def getConnectionFuture() : Future[Connection] = {
  future {
    // how to remove blocking get and return a scala future?
    connectionPool.getConnectionAsync().get(30000, TimeUnit.MILLISECONDS)
  }
}

理想情况下,我想要一个 scala 函数,它可以像上面的代码一样将连接作为未来返回,但不会通过 get 阻塞代码。我还需要在函数中添加什么以使其非阻塞。

任何指针都会很棒。

【问题讨论】:

  • 您使用的是哪个版本的 Scala?从 2.10.x 开始,Scala 采用了 Akka 的 Future 作为自己的。
  • Play 2.1.1 在幕后使用 Scala 2.10.0

标签: java scala concurrency playframework-2.1 future


【解决方案1】:
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{Future => SFuture}

你不能在不阻塞的情况下用SFuture 包装JFuture,因为SFuture (onComplete) 中有一个回调,而JFuture 中只有阻塞get

你所能做的就是创建额外的线程并用get阻止它,然后用get的结果完成Promise

val jfuture: JFuture[T] = ???
val promise = Promise[T]()
new Thread(new Runnable { def run() { promise.complete(Try{ jfuture.get }) }}).start
val future = promise.future

你可以在无限循环中检查isDone,但我认为它并不比阻塞更好。

【讨论】:

  • 同意。 Java 未来不支持回调的某种完成侦听器/观察器,这真是太遗憾了
  • @cmbaxter 如果你使用 Guava,有ListenableFuture
  • 所以我在考虑 java peice 的回调,当完成后,它的结果会在 scala 未来出现。环顾四周,有一些用 Java (technology.amis.nl/2009/02/19/…) 实现的回调示例,但不知道如何将其引入 Play 2.1。乐观地说,我希望有一个简单的包装器,但它看起来并不可行,而且 scala 函数中的 java 回调看起来是要走的路。
  • @PetrPudlák:它会在你的线程拉取中阻塞线程。如果您为此类期货配置了ExecutionContext,那很好,但默认ExecutionContext 包含as many 线程,因为您有处理器。
  • @senia,我开了一个question,为什么future { jfuture.get }不够用。
【解决方案2】:
Future {
  blocking {
    jfuture.get
  }
}

这让 ExecutionContext 知道你正在做的事情将被阻塞,让它有机会分配更多的线程。如果你不包含blocking { },那么你可能会用完线程。

【讨论】:

  • 您可能想提供有关此优点/缺点的详细信息。是否与上面讨论的完全相同(在 cmets 中,例如来自 @senia)
  • 很抱歉。添加注释来解释“阻塞”的使用。
  • 如果你做“阻塞”,那么你最终可能会有数万个线程处于高负载状态。小心——“阻止”不是魔法!
【解决方案3】:

scala-java8-compat 库提供了 java8 和 Scala Futures 之间的转换器。

具体可以使用FutureConverters.toScala(connectionPool.getConnectionAsync())java.util.concurrent.Future转换为scala.concurrent.Future

【讨论】:

  • 这个答案具有误导性。 scala-java8-compat 允许从java.util.concurrent.CompletionStage 转换为scala.concurrent.Future。没有“正确”的方法可以将 Java Future 转换为 Scala Future。
【解决方案4】:
     import java.util.concurrent.{Future => JFuture}
     import scala.concurrent.ExecutionContext.Implicits.global
     import scala.concurrent.Future
     import scala.util.Try

     object JFuture2SFuture {
        val jFuture: JFuture[Int] = ???
        val promise = Promise[Int]()
        Future { promise.complete(Try(jFuture.get)) } //it is non blocking 
        val sFuture:Future[Int] = promise.future

     }

【讨论】:

  • 评论说“它是非阻塞的”,但这看起来会阻塞执行线程。 Try.apply 是按名称调用,但这并不意味着它会异步执行。 Promise.complete 同样是阻塞的。这确实包装了 java 未来的.get。在 async scala Future 中,但它仍然会阻塞正在执行的线程,类似于 Future { Thread.sleep(10000); }
  • 是的,它会阻塞执行线程而不是调用线程。那是不阻塞调用线程的意图。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-01-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-09
相关资源
最近更新 更多