【问题标题】:Akka, single mailbox multiple (identical) actors?Akka,单个邮箱多个(相同的)演员?
【发布时间】:2018-04-04 13:42:59
【问题描述】:

我正在尝试对 Akka 进行一个非常基本的使用,同时对浏览所有大量文档并不感到兴奋。

我有一个正在工作的演员。它是无状态的,但 CPU 密集型和长时间运行。我想要这些东西的小池,这样我就可以同时处理多条消息(我有 8 个内核,所以其中 8 个似乎是一个可能的选择)。

我的问题是,我什至不知道从文档的哪个位置开始查找。我想我会很高兴被“给鱼”(告诉我怎么做),但同时,知道在文档中的哪里看(教我钓鱼)将非常有价值,可能更是如此.


在尝试到目前为止的建议时,我首先使用了路由器,因为它似乎最简单。但是,正如此示例所示,我认为它不能解决我的问题。使用下面的代码,一些消息会进入一个未成为第一个可用actor的邮箱。

package routeex

import java.util.concurrent.ThreadLocalRandom

import akka.actor.{Actor, ActorSystem, Props, Terminated}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router, SmallestMailboxRoutingLogic}

case class Work(message: String, dur: Int = 0)

object Worker {
  private val startTime = System.currentTimeMillis()
  private var nextId = 1
  def getMyId: Int = {
    val id = nextId
    nextId += 1
    id
  }
  def timestamp: String = f"${System.currentTimeMillis() - startTime}%6.3f"
}

class Worker extends Actor {
  val myId = Worker.getMyId
  println(s"making actor, id is $myId")

  override def receive: Receive = {
    case Work("slow", delay) => {
      println(s"${Worker.timestamp} Actor $myId going slow for $delay...")
      Thread.sleep(delay)
      println(s"${Worker.timestamp} Actor $myId awake again...")
    }
    case Work("report", _) => println(s"${Worker.timestamp} Actor $myId reporting for duty!")
    case m => println(s"${Worker.timestamp} Actor $myId got unexpected message $m")
  }
}

class Master extends Actor {
  var router = {
    val routees = Vector.fill(2) {
      val r = context.actorOf(Props[Worker])
      context watch r
      ActorRefRoutee(r)
    }
//    Router(RoundRobinRoutingLogic(), routees)
    Router(SmallestMailboxRoutingLogic(), routees)
  }

  def receive = {
    case w: Work =>
      router.route(w, sender())
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val r = context.actorOf(Props[Worker])
      context watch r
      router = router.addRoutee(r)
  }
}

object TryIt {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem.create("System")
    val m = system.actorOf(Props[Master])

    m ! Work("slow", 10000)
    m ! Work("slow", 1000)
    m ! Work("slow", 1000)
    m ! Work("slow", 1000)
    m ! Work("report")
    m ! Work("report")
    m ! Work("report")
    m ! Work("report")

    Thread.sleep(30000)
    system.terminate()
  }
}

【问题讨论】:

    标签: akka


    【解决方案1】:

    【讨论】:

    • 这看起来可行。我可以确认一下吗:我希望“第一个可用”演员拉下一条消息。我在您喜欢的便笺中看到的最接近的行为看起来像是基于“最小邮箱”的路由,但仅仅因为邮箱是空的与空闲并因此准备好工作的参与者无关。我/我会得到那种行为吗?我通常会使用 ExecutorService(如在 Java 中)来执行此操作,但我正在尝试更改范例;)
    • 使用 SmallestMailboxPool,它将按以下顺序选择一个路由:1)选择任何空邮箱的空闲路由(不处理消息) 2)选择任何空邮箱的路由 3)选择具有最少待处理消息的路由在邮箱中。所以它会选择一个可用的演员,如果有的话,只有当没有时才会选择邮箱最小的那个。
    • 我添加了一个代码示例,我相信它说明了路由器(虽然简单,而且“不错”)并没有给我想要的行为。似乎有些消息被放入了尚不可用的actor的邮箱中,并且没有首先可用。
    • 没错,在你发送所有消息的那一刻,routee 都是一样的,每个都得到一半的消息。如果任务逐步到达,它可能会起作用,但如果它们同时到达,则效率不高。那么我想最好的办法是让主管演员分配工作,并让工人在准备好时通知他们。
    【解决方案2】:

    我希望“第一个可用的”演员拉下一条消息。

    根据您上面的评论,路由器不是您想要的。相反,请考虑使用“工作拉动”模式。这种模式的要点是:

    • 主演员协调多个工作演员之间的工作单元。
    • worker 将自己注册到 master,这意味着可以动态添加或删除 worker。
    • 当 master 收到要完成的工作时,master 通知 worker 工作可用。工人准备好后拉出工作单元,用各自的工作单元做需要做的事情,然后在完成后向主人提出更多的工作。

    Akka documentation 中列出的以下链接描述了这种模式:

    您可以根据自己的需要调整Pollmeier's implementation

    【讨论】:

    • 有趣的东西。 Pollmeier 说:“这篇文章距今已有两年了——请将其视为了解分布式工作模式的不错的阅读材料。[...]”因为这看起来在单个队列中非常简单(邮箱) 多角色类型场景,可能很容易(如果与 Akka 本身正交)使用简单的工作池实现,我应该去做我自己的事情吗?它也可以使用 Promise/Future 方法相当干净地完成(尽管这两个提议最终都会创建更多线程)。
    猜你喜欢
    • 2019-11-24
    • 1970-01-01
    • 2018-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-22
    • 1970-01-01
    • 2011-03-10
    相关资源
    最近更新 更多