【问题标题】:how to use ActorSink.actorRefWithBackpressure in akka stream using akka typed如何在使用 akka 类型的 akka 流中使用 ActorSink.actorRefWithBackpressure
【发布时间】:2020-06-18 12:47:35
【问题描述】:

我正在尝试使用 akka typed 来学习 akka 流,当涉及到 akka typed 时,文档有点抽象

Sink.actorRefWithBackpressure 示例非常简单易懂,其中 ActorSink.actorRefWithBackpressure 例子是抽象的

在第一个示例中,我们有 AckingReceiver 演员,它完成了所需的工作,但在第二个示例中

没有像AckingReceiver 中那样实现案例类

val actor: ActorRef[Protocol] = targetActor()

我在某些地方看到过这段代码,但我也无法理解它

def targetActor(): ActorRef[Protocol] = ???

我们如何提供处理案例类的目标参与者的实现 任何帮助将不胜感激

【问题讨论】:

    标签: scala akka akka-stream akka-typed


    【解决方案1】:

    ActorRef[Protocol] 和其他任何类型的演员一样都是类型化演员。在 typed 中获取 ActorSystem 之外的 ActorRef 比在经典中更复杂,这可能是文档省略的原因(因为它对于解释如何使用 ActorSink.actorRefWithBackpressure 并不重要)。

    通常,您会设置一个键入的 ActorSystem 并要求 ActorSystem 输入一个 ActorRef

    import akka.actor.typed.ActorRef
    import akka.actor.typed.scaladsl._
    
    object MainSystem {
      sealed trait Command
      case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs])
    
      sealed trait Reply
      case class ProtocolActorIs(actor: ActorRef[Protocol])
    
      def apply(): Behavior[Command] =
        Behaviors.receive { (context, msg) =>
          case ObtainProtocolActor(replyTo) =>
            val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
              // Define the protocol actor
              Behaviors.receive[Protocol] { (context, msg) =>
                case Init(ackTo) =>
                  println(s"Actor ${context.self.path} initializing")
                  ackTo ! Ack
                  Behaviors.same
                case Message(ackTo, msg) =>
                  println(s"Actor ${context.self.path} received $msg")
                  ackTo ! Ack
                  Behaviors.same
                case Complete =>
                  context.stop()  // Delayed until the message is processed
                  ackTo ! Ack
                  Behaviors.same
                case Fail(ex) =>
                  println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
                  Behaviors.same
              })
            context.watch(protocolActor)
            replyTo ! ProtocolActorIs(protocolActor)
        }.receiveSignal {
          case (context, Terminated(ref)) =>
            println(s"Actor ${ref.path} terminated")
        }
    }
    
    val actorSystem = ActorSystem(MainSystem(), "main")
    
    def targetActor(): ActorRef[Protocol] = Await.result(
      actorSystem.ask(MainSystem.ObtainProtocolActor(_)).map(_.replyTo),
      15.second
    )
    

    这可能显示了经典和打字之间最大的两个实际但可能不明显的区别:

    • 输入的ActorSystem 是一个演员(在这个例子中,ActorRef[Protocol] 实际上可以是ActorSystem,尽管你不太可能真的想要这样做)
    • 要价模式发生了相当大的变化

    【讨论】:

      猜你喜欢
      • 2020-05-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-06
      • 2017-10-23
      • 2018-02-06
      • 1970-01-01
      相关资源
      最近更新 更多