【问题标题】:Kafka Streams App exits within seconds of startingKafka Streams App 在启动后几秒钟内退出
【发布时间】:2021-09-06 21:22:41
【问题描述】:

我希望我的应用程序继续运行并且根本不会停止,但我的应用程序会在几秒钟后立即退出,而不会给出任何类型的错误或警告。 我一直在试图找出原因,但到目前为止我还没有发现任何有用的东西。

我在构建和运行选项下给出了我的资源配置文件路径。

我正在使用配置工厂从资源 json 中读取文件。

我正在使用带有idea intelliJ 的scala 编程语言。 本地服务器中的 Kafka,sql server 作为源,mongodb 作为目标。 kafka 工具查看主题和数据。

【问题讨论】:

标签: scala apache-kafka apache-kafka-streams exit


【解决方案1】:

根据我的经验,没有登录 Kafka Streams(以及一般的 JVM 应用程序)的突然死亡是由原生 OOM 引起的。

Kafka Streams 默认使用 RocksDB 来存储流状态。默认设置有点占用内存。由于 RocksDB 是原生的,它不会受制于或使用你分配的堆,如果原生库分配内存失败,JVM 会死掉而不会抛出异常。

您可以通过运行以下命令来验证此假设: dmesg |尾 -n 10 或者一些这样的,在失败后立即。您应该会看到一个相当明确的报告,说明由于内存分配失败而牺牲了进程。

您可以通过提供 RocksDBConfigSetter 的实现来流式传输属性来调整 RocksDB 配置设置。减少测试环境中的主题分区数量会很有帮助,因为 RocksDB 表的数量会随着分区数量的增加而增加。

【讨论】:

    【解决方案2】:

    你能指定你使用哪个 kafka-client 吗? https://github.com/zio/zio-kafka 为我开箱即用:

    import zio._
    import zio.console._
    import zio.duration._
    import zio.kafka.consumer._
    import zio.kafka.serde._
    
    object Main extends App {
      def run(args: List[String]) = {
        val settings: ConsumerSettings =
          ConsumerSettings(List("localhost:9092"))
            .withGroupId("group")
            .withCloseTimeout(30.seconds)
    
        val subscription = Subscription.topics("topic")
    
        Consumer.consumeWith(settings, subscription, Serde.string, Serde.string) { case (key, value) =>
          putStrLn(s"Received message ${key}: ${value}")
            .ignore
          // Perform an effect with the received message
        }.exitCode
      }
    
    }
    

    【讨论】:

    • 我正在使用 confluent apache kafka
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-22
    • 2020-05-01
    • 1970-01-01
    • 1970-01-01
    • 2016-02-25
    相关资源
    最近更新 更多