【问题标题】:Publish-Subscribe for Akka Actor FSM not workingAkka Actor FSM 的发布-订阅不起作用
【发布时间】:2015-10-23 08:45:28
【问题描述】:

我有这个特征的基本特征和子类。每个子类都使用自己的事件类订阅事件流。例如 ActorFSM1 关心 InitEventImpl1 所以它会订阅这个事件。 但是,当我发布这些特定事件时,没有演员会收到它们。

trait InitEvent
case class InitEventImpl1 extends InitEvent
case class InitEventImpl2 extends InitEvent

class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
}
class ActorFSM2 extends ActorFSM[InitEventImpl2] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl2] )
}

当我尝试如下发布它们时,没有人收到消息。我究竟做错了什么?

 val system = ActorSystem("system")
 val actor = system.actorOf(Props(new ActorFSM1()) )
 system.eventStream.publish(InitEventImpl1())

【问题讨论】:

  • 当您发布该事件时,ActorFSM1 实例处于什么状态?该状态是否将该消息类型作为Event 处理?

标签: scala akka actor publish-subscribe fsm


【解决方案1】:

不能保证在发布消息时actor已经初始化,也不能保证在publish之前调用了subscribe。这是一个经典的actor-construction 竞争条件。 ActorRef 的创建是同步的,但实际 Actor 的创建(当订阅发生时)是异步的,并且可能在 ActorRef 的创建之后发生。您可以通过向流中发送一堆消息来测试这一点,以查看 Actor 是否最终会看到其中的一些。

trait InitEvent
case class InitEventImpl1(id: Int) extends InitEvent

class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
  def receive: Receive = {
    case InitEventImpl1(id) => println(id)
  }
}

然后发布一堆消息,看看最终能不能得到一些:

val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
Seq.tabulate(100)(InitEventImpl1(_)).foreach(system.eventStream.publish)

如果Actor 绝对必须接收消息,我建议要么将消息直接发送到ActorRef,要么等待Actor 的消息表明它已准备好接收消息。

另一方面,您在此处使用泛型似乎对您没有任何帮助。看来您想要做的是在subscribe 调用和receive 中使用通用参数。这可以通过一些反射魔法来完成:

import akka.actor.Actor
import scala.reflect._

class Test[E <: InitEvent : ClassTag] extends Actor {
   context.system.eventStream.subscribe(self, classTag[E].runtimeClass)
   def receive: Receive = {
     case message: E => println(message)
   }
}

然后在创建ActorRef时指定消息类型

val myTypeActor = system.actorOf(Props(new Test[InitEventImpl1]))
myTypeActor ! InitEventImpl1 // Will be processed
myTypeActor ! InitEventImpl2 // Will not be processed

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-12-18
    • 2018-12-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多