【问题标题】:Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef
【发布时间】:2015-08-27 09:32:44
【问题描述】:

我正在尝试使用Source.actorRef 方法来创建akka.stream.scaladsl.Source 对象。某种形式的东西

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

我的问题是:如何将数据发送到基于 ActorRef 的 Source 对象

我认为向源发送消息是某种形式

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

但是weatherSource 没有! 运算符或tell 方法。

documentation 没有过多描述如何使用 Source.actorRef,它只是说您可以...

提前感谢您的评论和回复。

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    你需要一个Flow:

      import akka.stream.OverflowStrategy.fail
      import akka.stream.scaladsl.Source
      import akka.stream.scaladsl.{Sink, Flow}
    
      case class Weather(zip : String, temp : Double, raining : Boolean)
    
      val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
    
      val sunnySource = weatherSource.filter(!_.raining)
    
      val ref = Flow[Weather]
        .to(Sink.ignore)
        .runWith(sunnySource)
    
      ref ! Weather("02139", 32.0, true)
    

    请记住,这都是实验性的,可能会改变!

    【讨论】:

    • 在 M5 中,看起来 Source.actorRef 不存在。你知道它搬到哪里了吗?
    • 看起来他们基本上将其更改为将Props 传递给源。更新的文档在这里doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/…
    • 1.0-RC3 是最新版本,Source.actorRef 仍然在那里:doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/…
    • Source(Props) 在 M5 中,现在在 RC3 中 Source.actorPublisher 是另外一回事:它用于创建由自定义 ActorPublisher 实现支持的 Source。
    • 嗯,我在尝试获取底层 ActorRef 时遇到了类似的问题,但我需要 ref 才能创建我的 Sink。这个Flow 可以被扔掉并创建另一个Flow 吗?
    【解决方案2】:

    @Noah 指出 akka-streams 的实验性质,他的回答可能不适用于 1.0 版本。我不得不按照this example给出的例子:

    implicit val materializer = ActorMaterializer()
    val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
    actorRef ! TweetInfo(...)
    val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
    

    【讨论】:

      【解决方案3】:

      ActorRef 的实例与所有“物化值”一样,只有在整个流被物化后才能访问,换句话说,当 RunnableGraph 正在运行时。

      // RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
      val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))
      
      // You get ActorRef instance as a materialized value
      val actorRef1: ActorRef = rg1.run()
      
      // Or even more correct way: to materialize both ActorRef and future to completion 
      // of the stream, so that we know when we are done:
      
      // RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
      // (ActorRef, Future[Done]) when you run the graph
      val rg2: RunnableGraph[(ActorRef, Future[Done])] =
        sunnySource.toMat(Sink.foreach(println))(Keep.both)
      
      // You get both ActorRef and Future[Done] instances as materialized values
      val (actorRef2, future) = rg2.run()
      
      actorRef2 ! Weather("90210", 72.0, false)
      actorRef2 ! Weather("02139", 32.0, true)
      actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
      future onComplete { /* ... */ }
      

      【讨论】:

      • 同时完成 ActorRef 和 future - 太棒了!谢谢!
      猜你喜欢
      • 1970-01-01
      • 2017-04-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-17
      • 1970-01-01
      • 2015-10-07
      相关资源
      最近更新 更多