【问题标题】:Correct usage of scalaz Future for async execution正确使用 scalaz Future 进行异步执行
【发布时间】:2013-10-08 17:36:00
【问题描述】:

我想我不完全了解 scalaz Futures 的工作原理。我正在尝试将一个项目从 scala futures 移植到 scalaz 实现,但问题是 scalaz Future 的性能较低。最简单的示例是使用 Spray 在身份验证请求中加载配置文件。

函数本身:

def loadProfile[A: CollectionProvider: JsonFormat](id: String) = future {
  remote.findOne[A]("id" :> id) match {
    case Some(profile) ⇒ \/-(profile)
    case None          ⇒ -\/(ProfileNotFoundRejection(id))
  }
}

scalaz 版本仅在一个符号上有所不同,我从scalaz.concurrent 调用Future.apply。 现在加载一些 html 页面的 Spray 路由:

get {
  path("profile" / "id" ~ Segment) { id ⇒
    onSuccess(loadProfile[User](id)) {
      case \/-(profile) ⇒ complete(html.page(profile))
      case -\/(pnfr)    ⇒ reject(pnfr)
    }
  }
}

loadProfile 一样,scalaz 版本的不同之处仅在于方法调用:

get {
  path("profile" / "id" ~ Segment) { id ⇒
    ctx => loadProfile[User](id).runAsync {
      case \/-(profile) ⇒ ctx.complete(html.page(profile))
      case -\/(pnfr)    ⇒ ctx.reject(pnfr)
    }
  }
}

但是 scala Future 版本的请求在(大约)143ms 内完成,而 scalaz 版本在 260ms 内完成。所以我不太关心这个特定的请求,而是一般的异步执行和服务的可扩展性,正如我在scalaz Future中理解的那样,我必须手动将执行分叉到一个单独的线程,所以它按顺序执行? scalaz 未来的用法有什么好的介绍/教程吗?

【问题讨论】:

  • 抱歉,按返回太快了。也许它与用于执行计算的底层 ExecutorService 有关? Scalaz 的Future.apply 将 ExecutorService 作为隐式参数。默认情况下,它使用ExecutorService.newFixedThreadPool(...),使用您机器上可用的处理器数量。在 Spray 路由中创建的Futures 默认使用 Akka 调度程序,这是一个 Fork-Join 池。

标签: scala asynchronous future scalaz spray


【解决方案1】:

scala 和 scalaz 期货非常不同:

斯卡拉

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits._

// creating two slow futures:
val f: Future[Unit] = Future { println("f " + Thread.currentThread().getName()); Thread.sleep(10000);  }
val g: Future[Unit] = Future { println("g " + Thread.currentThread().getName()); Thread.sleep(10000);  }

// and after moment asking for success
f onSuccess { case _ => println("f s1 " + Thread.currentThread().getName()) }
g onSuccess { case _ => println("g s1 " + Thread.currentThread().getName()) }
f onSuccess { case _ => println("f s2 " + Thread.currentThread().getName()) }
g onSuccess { case _ => println("g s2 " + Thread.currentThread().getName()) }

我们在创建fg 后立即得到一个输出

f ForkJoinPool-1-worker-5
g ForkJoinPool-1-worker-3

约 10 秒后休息输出

f s1 ForkJoinPool-1-worker-5
g s1 ForkJoinPool-1-worker-5
f s2 ForkJoinPool-1-worker-5
g s2 ForkJoinPool-1-worker-5

斯卡拉兹

import scalaz.concurrent._ // z!
import scala.concurrent.ExecutionContext.Implicits._

// creating two slow futures:
val f: Future[Unit] = Future { println("f " + Thread.currentThread().getName()); Thread.sleep(10000);  }
val g: Future[Unit] = Future { println("g " + Thread.currentThread().getName()); Thread.sleep(10000);  }

创建fg 后,什么也没有发生。我们有:

f: scalaz.concurrent.Future[Unit] = Async(<function1>)
g: scalaz.concurrent.Future[Unit] = Async(<function1>)

但是在运行它们之后我们会看到不同之处:

f runAsync { _ => println("f s1 " + Thread.currentThread().getName()) }
g runAsync { _ => println("g s1 " + Thread.currentThread().getName()) }
f runAsync { _ => println("f s2 " + Thread.currentThread().getName()) }
g runAsync { _ => println("g s2 " + Thread.currentThread().getName()) }

我们得到结果:

f pool-4-thread-2
g pool-4-thread-1
f pool-4-thread-4
g pool-4-thread-3

f s2 pool-4-thread-4
g s2 pool-4-thread-3
g s1 pool-4-thread-1
f s1 pool-4-thread-2

有两点值得一提:

  • Futures fg 再次执行。没有价值记忆。
  • runAsync 回调在与第一次计算相同的线程中执行。这是因为我们没有明确分叉。

很难说为什么它们在您的示例中表现不同。无论如何,大部分时间应该花在remove.findOne。您想在阻塞调用周围使用scala.concurrent.blocking 来帮助ExecutorService 避免陷入线程饥饿(在这两种情况下)。

【讨论】:

    猜你喜欢
    • 2019-05-31
    • 2020-08-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-13
    • 2015-11-08
    • 2017-10-17
    • 2020-04-11
    相关资源
    最近更新 更多