【问题标题】:Kafka StreamsException TimeoutException: Expiring N record(s) materializing KTableKafka StreamsException TimeoutException:即将到期的 N 条记录实现 KTable
【发布时间】:2020-10-01 21:14:57
【问题描述】:

我的流应用程序只是从记录主题中具体化一个 KTable。主题中有 10 万条记录,没有问题。但是,在主题中有 1500 万条记录的情况下,一旦我们获得几百万条记录,实例就会崩溃,并出现如下异常:

线程“Companies-de1f21f9-b445-449e-a59b-5e0cecfa54d1-StreamThread-1” org.apache.kafka.streams.errors.StreamsException 中的异常:任务 [0_0] 中止发送,因为上一条记录捕获了错误(时间戳 1601327726515) 到主题 Companies-companies.read-changelog 由于 org.apache.kafka.common.errors.TimeoutException:Companies-companies.read-changelog-0:120001 的 40 条记录过期自批次创建]

Here is a Gist 提供我们正在运行的服务的详细示例。

让我感到困惑的是,我的流应用程序崩溃(如下)的错误是引用了一个超载的生产者,但是,这个服务只是实现了一个 KTable。

streamsBuilder
  .stream(egressTopic, Consumed.with(Serdes.String(), companySerde))
  .toTable(Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
    .withKeySerde(Serdes.String())
    .withValueSerde(companySerde));

我已经调整过的属性,试图让它在名义上运行:

  • batch.size10000
  • linger.ms1000
  • request.timeout.ms300000
  • max.block.ms300000
  • retry.backoff.ms1000
  • replication.factor3

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    每个表都由一个更改日志主题支持,以实现容错。因此,每次写入KTable 也是写入相应的更改日志主题。

    如果您输入的主题配置了日志压缩,您可以将您的程序重写为

    streamsBuilder.table(
        egressTopic,
        Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
            .withKeySerde(Serdes.String())
            .withValueSerde(companySerde)
    );
    

    此外,您启用topology.optimization="all":在这种情况下,输入主题将被重新使用更改日志来恢复状态,并且不会创建额外的更改日志主题。

    【讨论】:

    • 嗨,马蒂亚斯,谢谢! KTable 没有进入我们的最终实现,但我会留意确认这一点的机会。
    猜你喜欢
    • 2020-04-24
    • 2016-11-10
    • 1970-01-01
    • 2017-10-31
    • 1970-01-01
    • 2017-08-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多