【问题标题】:Monitoring a closed graph Akka Stream监控封闭图 Akka Stream
【发布时间】:2016-09-12 07:51:53
【问题描述】:

如果我在 Akka Stream 中创建了RunningGraph,我怎么知道(从外部)

  1. 何时所有节点因完成而被取消?
  2. 所有节点何时因错误而停止?

【问题讨论】:

    标签: scala akka-stream


    【解决方案1】:

    我认为没有办法对任意图执行此操作,但如果您控制了您的图,您只需将监控接收器附加到每个可能失败或完成的节点的输出(这些是具有至少一个输出的节点),例如:

    import akka.actor.Status
    
    // obtain graph parts (this can be done inside the graph building as well)
    val source: Source[Int, NotUsed] = ...
    val flow: Flow[Int, String, NotUsed] = ...
    val sink: Sink[String, NotUsed] = ...
    
    // create monitoring actors
    val aggregate = actorSystem.actorOf(Props[Aggregate])
    val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate)))
    val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate)))
    
    // create the graph
    val graph = GraphDSL.create() { implicit b =>
       import GraphDSL._
    
       val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))),
       val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(())))
    
       val bc1 = b.add(Broadcast[Int](2))
       val bc2 = b.add(Broadcast[String](2))
    
       // main flow
       source ~> bc1 ~> flow ~> bc2 ~> sink
    
       // monitoring branches
       bc1 ~> sourceMonitor
       bc2 ~> flowMonitor
    
       ClosedShape
    }
    
    // run the graph
    RunnableGraph.fromGraph(graph).run()
    
    class Monitor(name: String, aggregate: ActorRef) extends Actor {
      override def receive: Receive = {
        case Status.Success(_) => aggregate ! s"$name completed successfully"
        case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}"
        case _ =>
      }
    }
    
    class Aggregate extends Actor {
      override def receive: Receive = {
        case s: String => println(s)
      }
    }
    

    也可以只创建一个监控参与者并在所有监控接收器中使用它,但在这种情况下,您将无法轻松区分失败的流。

    还有一个关于源和流的watchTermination() 方法,它允许实现一个在此时与流一起终止的未来。我认为GraphDSL 可能很难使用,但使用常规流方法可能看起来像这样:

    import akka.Done
    import akka.actor.Status
    import akka.pattern.pipe
    
    val monitor = actorSystem.actorOf(Props[Monitor])
    source
      .watchTermination()((f, _) => f pipeTo monitor) 
      .via(flow).watchTermination((f, _) => f pipeTo monitor)
      .to(sink)
      .run()
    
    class Monitor extends Actor {
      override def receive: Receive = {
        case Done => println("stream completed")
        case Status.Failure(e) => println(s"stream failed: ${e.getMessage}")
      }
    }
    

    您可以在将未来的值传递给参与者以区分流之前对其进行转换。

    【讨论】:

    • @vladimir-matveev sprintln 中缺失。 "$name -> s"$name
    • 谢谢,已修复!
    • 警告:watchTermination 只会告诉您上游何时完成或下游在图表中的该点取消,而不是所有阶段。
    猜你喜欢
    • 1970-01-01
    • 2017-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多