【问题标题】:complete a future within other future在另一个未来中完成一个未来
【发布时间】:2016-09-20 13:41:18
【问题描述】:

我有一个外部的未来操作,我可以覆盖它的 onOperation 完成方法。我想通过关闭一个Promise来包装它并完成它。但是我无法在其他未来完成那个未来。例如:

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
def foo():Future[String] = {
  val p = Promise[String]()
  channel.addListener(new ChannelFutureListener[IoReadFuture] {
    override def operationComplete(future: IoReadFuture): Unit = {
     p.success("hello")}
  }
  p.future
}

val x = foo()
x: scala.concurrent.Future[String] = List()
x.onComplete{
  case Success(msg)  => println(s"$msg world")
  case Failure(e) => println(e.getMessage)
}
res1: Unit = ()

有没有惯用的方法来做到这一点不阻塞

【问题讨论】:

  • 对不起,我没看懂问题
  • 那是因为你正在导致当前线程休眠。
  • @SarveshKumarSingh 这是一个错误的说法。 sleep 在Future{:=> } 块内。这将在不同的线程上执行
  • 请看我的更新

标签: scala concurrency


【解决方案1】:

我认为您正在尝试创建一个以delay 作为输入并返回delayed future 的函数。

如果您想这样做,可以使用AwaitPromise 以非睡眠方式进行。

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._


// returns a future delayed by `delay` seconds
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = {
  // we will use this promise for wait only
  val waitProxyPromise = Promise[Int]()
  val delayedFuture = Await.ready(waitProxyPromise.future, delay.second).map({
    case _ => value
  })
  delayedFuture
}

val helloFuture = getDelayedFutureOfStringValue(2, "hello")

上面的 amy 看起来是一个不错的实现,但实际上并非如此。 Await 实际上阻塞了线程。可悲的是......在惯用的 Scala 中以完全非阻塞的方式获得延迟的Futue 并不容易。

您可以使用 Java 中的 Timer 实用程序获得一个不错的非阻塞和非睡眠延迟未来,

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import java.util.{Timer, TimerTask}

// delay is number of seconds
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = {
  val promise = Promise[String]()
  val timer = new Timer()
  val timerTask = new TimerTask {
    override def run(): Unit = promise.success(value)
  }
  timer.schedule(timerTask, delay * 1000)
  promise.future
}

val niceHelloFuture = getDelayedFutureOfStringValue(2, "hello")

如果你已经有了一个未来并且你想用它来组成一个依赖的未来,那么这很容易。

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// I assume you IoReadFuture is either similar to Future or wraps the actual Future   

def foo(value: String):Future[String] = {
  val p = Promise[String]()
  channel.addListener(new ChannelFutureListener[IoReadFuture] {
    override def operationComplete(ioFuture: IoReadFuture): Unit = {
      ioFuture.future.onComplete(_ => p.success(value))
    }
  }
  p.future
}

【讨论】:

    【解决方案2】:

    actor调度器的scheduleOne是非阻塞方式等待一些代码执行

    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext
    import ExecutionContext.Implicits.global
    
    val p = Promise(doSomething())
    val system = akka.actor.ActorSystem("system")
    system.scheduler.scheduleOne(deplay seconds)(p.future)
    val f = p.future
    f.flatMap { println(s"${_}") } 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-07-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多