【问题标题】:stopping a running akka streams graph willfully故意停止正在运行的 akka 流图
【发布时间】:2016-05-29 20:33:07
【问题描述】:

假设我有以下简单的图表。

class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] {

  val out = Outlet[A]("KafkaSource.out")

  override val shape = SourceShape.of(out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, kI.next)
        }
      })
    }
}

val g = GraphDSL.create(){ implicit b =>
  val source = b.add(new KafkaSource[Message](itr))
  val sink = b.add(Sink.foreach[Message](println))

  source ~> sink
  ClosedShape
}

我们正在运行它

RunnableGraph.fromGraph(g).run()

我希望通知 kafkaSource 停止(或人为完成)而不是推送下一个可用元素,以便下游连接的阶段也停止。

我该如何做到这一点?

场景是,我们在 kafka 中有数百万条消息,我们希望在每天晚上 9 点停止处理消息(例如),并假设我们正在通过干净关闭来停止正在运行的应用程序。

【问题讨论】:

    标签: scala akka typesafe akka-stream reactive-streams


    【解决方案1】:

    虽然可能不再与 phantomastray 相关,但对其他人有帮助:

    KillSwitch 允许从 外部。它由一个可以链接到图表的流元素组成 FlowShape 需要完成控制。 KillSwitch 特征允许 完成或失败的图表。 [来源:Akka Docs]

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多