【发布时间】:2020-10-01 21:14:57
【问题描述】:
我的流应用程序只是从记录主题中具体化一个 KTable。主题中有 10 万条记录,没有问题。但是,在主题中有 1500 万条记录的情况下,一旦我们获得几百万条记录,实例就会崩溃,并出现如下异常:
线程“Companies-de1f21f9-b445-449e-a59b-5e0cecfa54d1-StreamThread-1” org.apache.kafka.streams.errors.StreamsException 中的异常:任务 [0_0] 中止发送,因为上一条记录捕获了错误(时间戳 1601327726515) 到主题 Companies-companies.read-changelog 由于 org.apache.kafka.common.errors.TimeoutException:Companies-companies.read-changelog-0:120001 的 40 条记录过期自批次创建]
Here is a Gist 提供我们正在运行的服务的详细示例。
让我感到困惑的是,我的流应用程序崩溃(如下)的错误是引用了一个超载的生产者,但是,这个服务只是实现了一个 KTable。
streamsBuilder
.stream(egressTopic, Consumed.with(Serdes.String(), companySerde))
.toTable(Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
.withKeySerde(Serdes.String())
.withValueSerde(companySerde));
我已经调整过的属性,试图让它在名义上运行:
-
batch.size10000 -
linger.ms1000 -
request.timeout.ms300000 -
max.block.ms300000 -
retry.backoff.ms1000 -
replication.factor3
【问题讨论】:
标签: java apache-kafka apache-kafka-streams