【问题标题】:Akka ClusterSingletonProxy to a remote deployed singletonAkka ClusterSingletonProxy 到远程部署的单例
【发布时间】:2017-06-27 11:34:24
【问题描述】:

我正在尝试通过另一个参与者向部署在远程节点上的单例参与者发送消息。

这是等待 memberUp 事件的管理器,然后在该节点上部署 Worker 演员,然后向单例发送消息:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "worker",
        terminationMessage = End,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))))

      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"/user/singleton/worker",
        role = Some(s"worker")), "worker") ! "hello"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

这是工人:

object Worker extends App{
  ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

还有application.conf:

manager {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://mySys@127.0.0.1:2552"
      ]
      roles.1 = "manager"

    }
    remote.netty.tcp.port = 2552

  }
}

worker {
  akka {
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://mySys@127.0.0.1:2552"
      ]
      roles.1 = "worker"
    }
    remote.netty.tcp.port = 2554
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
  }
}

worker 已初始化(我可以在日志中看到 state change [Start -> Oldest] 消息),但从 manager 发送的消息永远不会到达 worker。当我在远程节点上部署单例时,它曾经工作正常,但现在我希望管理器部署它。

我也尝试将其部署为管理器的子节点(使用上下文而不是 context.system)并将单例路径更改为 user/manager/singleton/worker,但没有成功。

我正在使用 Akka 2.3.11

编辑: sbt 文件:

name := "MyProject"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies +=
    "com.typesafe.akka" %% "akka-actor" % "2.3.11",
    "com.typesafe.akka" %% "akka-cluster" % "2.3.11",
    "joda-time" % "joda-time" % "2.0",
    "com.typesafe.akka" %% "akka-contrib" % "2.3.11"

【问题讨论】:

  • 你能发布你的 build.sbt 吗?无需一起搜索所有库,更容易重现。
  • 我用 sbt 文件编辑了我的帖子
  • 谢谢,ClusterSingleton 的包在 2.5 中发生了变化,我不知道 2.3.11 是哪个
  • 您是否有任何理由要将集群单例远程部署到工作人员?
  • 不确定是什么问题。如果您问我为什么需要单例而不是普通演员,那是因为在我的实际应用程序中,我有另一个经理,我不想重新启动两个工人(我只想要集群中的一个工人)。如果您问我为什么不从新进程中重新启动actor,那是因为管理器还部署了普通actor,因此最好将逻辑仅放在一个地方-管理器。

标签: scala akka akka-cluster


【解决方案1】:

所以我尝试了创建ClusterSingletonManagers 的不同选项,我认为远程部署它们会破坏单例模式中的某些东西。我为此收集了一些指标:

  • 由于是远程部署,工作节点上ClusterSingletonManager 的路径为/remote/akka.tcp/mySys@127.0.0.1:2552/user/worker。我认为图书馆不能/不会处理这个问题,因为它需要/user/worker

  • 当尝试使用 ClusterSingletonProxy 从主节点发送消息时,登录 DEBUG 模式状态 No singleton available, stashing message hello workerTrying to identify singleton at akka.tcp://mySys@127.0.0.1:2552/user/worker/singleton(失败并重试)-> 它正在寻找单例错误的节点,因为没有可用的管理器,并且显然不知道单例在工作节点上。

直接在工作节点上创建ClusterSingletonManager 时,一切都按预期工作。

您对经理的命名也有问题。你的singletonNameworker 而你的经理本身(演员)没有任何名字。创建代理时,您使用路径/user/singleton/worker,但路径应如下所示:/user/{actorName}/{singletonName}。所以在我的代码中,我使用worker 作为actorName,singleton 作为singletonName

这是我的工作代码:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"/user/worker/singleton",
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

object Worker extends App{
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))

  sys.actorOf(ClusterSingletonManager.props(
    singletonProps = Props(classOf[Worker]),
    singletonName = "singleton",
    terminationMessage = PoisonPill,
    role = Some("worker")), name = "worker")
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

application.conf 和 build.sbt 保持不变。

编辑

通过使用工作节点上的实际路径引用ClusterSingletonProxy(计算它是网络路径)来使用它。我不确定我是否会推荐这个,因为我仍然不确定该库是否被设计为能够做到这一点,但它至少在这个最小的例子中有效:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      val ref = context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "singleton",
        terminationMessage = PoisonPill,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))), name = "worker")

      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"${ref.path.toStringWithoutAddress}/singleton", // /remote/akka.tcp/mySys@127.0.0.1:2552/user/worker/singleton
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

object Worker extends App{
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

【讨论】:

  • 谢谢 - 它确实有效。但是,它仅适用于 1 个演员。如果我尝试添加另一个经理和另一个工作人员,其中 manager2 部署了第二个单例 worker2,然后启动代理并发送消息,则消息不会到达 worker1(这是活动的单例)。奇怪的是,他们不想允许远程部署单例,但他们允许在 ClusterSingletonManager 道具上调用 withDeploy。不知道在新版本中是否相同。
猜你喜欢
  • 2019-07-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-02-02
相关资源
最近更新 更多