【发布时间】:2018-04-04 13:42:59
【问题描述】:
我正在尝试对 Akka 进行一个非常基本的使用,同时对浏览所有大量文档并不感到兴奋。
我有一个正在工作的演员。它是无状态的,但 CPU 密集型和长时间运行。我想要这些东西的小池,这样我就可以同时处理多条消息(我有 8 个内核,所以其中 8 个似乎是一个可能的选择)。
我的问题是,我什至不知道从文档的哪个位置开始查找。我想我会很高兴被“给鱼”(告诉我怎么做),但同时,知道在文档中的哪里看(教我钓鱼)将非常有价值,可能更是如此.
在尝试到目前为止的建议时,我首先使用了路由器,因为它似乎最简单。但是,正如此示例所示,我认为它不能解决我的问题。使用下面的代码,一些消息会进入一个未成为第一个可用actor的邮箱。
package routeex
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{Actor, ActorSystem, Props, Terminated}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router, SmallestMailboxRoutingLogic}
case class Work(message: String, dur: Int = 0)
object Worker {
private val startTime = System.currentTimeMillis()
private var nextId = 1
def getMyId: Int = {
val id = nextId
nextId += 1
id
}
def timestamp: String = f"${System.currentTimeMillis() - startTime}%6.3f"
}
class Worker extends Actor {
val myId = Worker.getMyId
println(s"making actor, id is $myId")
override def receive: Receive = {
case Work("slow", delay) => {
println(s"${Worker.timestamp} Actor $myId going slow for $delay...")
Thread.sleep(delay)
println(s"${Worker.timestamp} Actor $myId awake again...")
}
case Work("report", _) => println(s"${Worker.timestamp} Actor $myId reporting for duty!")
case m => println(s"${Worker.timestamp} Actor $myId got unexpected message $m")
}
}
class Master extends Actor {
var router = {
val routees = Vector.fill(2) {
val r = context.actorOf(Props[Worker])
context watch r
ActorRefRoutee(r)
}
// Router(RoundRobinRoutingLogic(), routees)
Router(SmallestMailboxRoutingLogic(), routees)
}
def receive = {
case w: Work =>
router.route(w, sender())
case Terminated(a) =>
router = router.removeRoutee(a)
val r = context.actorOf(Props[Worker])
context watch r
router = router.addRoutee(r)
}
}
object TryIt {
def main(args: Array[String]): Unit = {
val system = ActorSystem.create("System")
val m = system.actorOf(Props[Master])
m ! Work("slow", 10000)
m ! Work("slow", 1000)
m ! Work("slow", 1000)
m ! Work("slow", 1000)
m ! Work("report")
m ! Work("report")
m ! Work("report")
m ! Work("report")
Thread.sleep(30000)
system.terminate()
}
}
【问题讨论】:
标签: akka