【问题标题】:ActorSystem shutdown in akka streamAkka 流中的 ActorSystem 关闭
【发布时间】:2018-12-11 08:48:16
【问题描述】:

我有一个 akka 流,它不断地消耗来自 kafka 主题的数据。 我从不关闭actorsystem,我不希望我的应用程序关闭是正确的方法吗?处理actorySystem关闭的正确方法是什么?

  implicit val actorSystem: ActorSystem = ActorSystem("mytest")
  implicit val materializer: ActorMaterializer =
    ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val actorConfig = actorSystem.settings.config.getConfig("akka.kafka.consumer")

  val consumerSettings =
    ConsumerSettings(actorConfig, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(config.getString("kafka.hosts"))
      .withGroupId("mytestgrp")


  val flow = Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics(config.getString("kafka.topic")))
    .grouped(500)
    .map(Pipeline.process)
    .withAttributes(supervisionStrategy(decider))

  flow.runWith(Sink.ignore)

【问题讨论】:

    标签: apache-kafka akka-stream


    【解决方案1】:

    当流完成时,您可以关闭演员系统

    flow.runWith(Sink.ignore).onComplete {
        case _ => actorSystem.shutdown
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-02-21
      • 2021-01-27
      • 2012-03-08
      • 2017-03-16
      • 1970-01-01
      • 1970-01-01
      • 2021-07-22
      相关资源
      最近更新 更多