【问题标题】:Scala future and its callback works in the same execution contextScala 未来及其回调在相同的执行上下文中工作
【发布时间】:2015-09-30 18:39:02
【问题描述】:

我通过 Akka 演员调用 def activateReward 并且执行 OracleClient.rewardActivate(user) 有时非常慢(数据库不在我的职责范围内,属于另一家公司)。

当数据库运行缓慢时,线程池耗尽,无法有效分配更多线程来运行回调future.onComplete,因为回调和futures 在相同的执行上下文中工作。

请告知如何从分配给期货 OracleClient.rewardActivate(user) 的线程异步执行回调中的代码

class RewardActivatorHelper {

  private implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)
    def execute(runnable: Runnable) {threadPool.submit(runnable)}
    def reportFailure(t: Throwable) {throw t}
  }

  case class FutureResult(spStart:Long, spFinish:Long)

  def activateReward(msg:Msg, time:Long):Unit = {
    msg.users.foreach {
      user =>
        val future:Future[FutureResult] = Future {
          val (spStart, spFinish) = OracleClient.rewardActivate(user)
          FutureResult(spStart, spFinish)
        }

        future.onComplete {
          case Success(futureResult:FutureResult) =>
            futureResult match {
              case res:FutureResult => Logger.writeToLog(Logger.LogLevel.DEBUG,s"started:${res.spStart}finished:${res.spFinish}")
              case _ => Logger.writeToLog(Logger.LogLevel.DEBUG, "some error")
            }

          case Failure(e:Throwable) => Logger.writeToLog(Logger.LogLevel.DEBUG, e.getMessage)    
        }
    }
  }
}

【问题讨论】:

    标签: multithreading scala concurrency callback future


    【解决方案1】:

    您可以通过以下方式为onComplete 回调显式指定执行上下文,而不是隐式指定:

    import java.util.concurrent.Executors
    import scala.concurrent.duration.Duration
    
    object Example extends App {
      import scala.concurrent._
    
      private implicit val ec = new ExecutionContext {
        val threadPool = Executors.newFixedThreadPool(1000)
        def execute(runnable: Runnable) {threadPool.submit(runnable)}
        def reportFailure(t: Throwable) {throw t}
      }
    
      val f = Future {
        println("from future")
      }
    
      f.onComplete { _ =>
        println("I'm done.")
      }(scala.concurrent.ExecutionContext.Implicits.global)
    
      Await.result(f, Duration.Inf)
    }
    

    这当然不能解决数据库跟不上的根本问题,但无论如何都可能很高兴。

    澄清一下:我让onComplete 回调由标准global 执行上下文处理。您可能想创建一个单独的。

    【讨论】:

      猜你喜欢
      • 2016-02-16
      • 2018-02-14
      • 2013-11-16
      • 1970-01-01
      • 1970-01-01
      • 2017-06-24
      • 1970-01-01
      • 2015-08-28
      • 2015-01-06
      相关资源
      最近更新 更多