[Posted]: 2021-07-21 10:16:03
[Question]:
I have a Streams application that throws the following exception:
Exception in thread "<StreamsApp>-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=topic1, partition=0, offset=1
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {...} timestamp 1530812658459) to topic topic2 due to Failed to update metadata after 60000 ms.
You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
In the Streams application, I have the following configuration:
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 5);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 300000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 300000);
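The error says metadata could not be fetched within 60000 ms. As far as I know, that wait inside `KafkaProducer.send()` is bounded by the producer setting `max.block.ms` (default 60000), not by `retries` or `retry.backoff.ms`, so the three overrides above may not affect this particular timeout. Also, to my understanding, the producer uses `retry.backoff.ms` as the minimum delay between metadata refresh attempts, so a 300000 ms backoff might by itself keep a refresh from completing inside a 60000 ms window. The sketch below only shows how such overrides are keyed with the `producer.` prefix. It uses plain `java.util.Properties` and string keys so it compiles without the Kafka jars; `producerPrefix` here is a local helper that mirrors `StreamsConfig.producerPrefix`, and the numeric values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;
import java.util.TreeSet;

public class ProducerOverridesSketch {

    // Local helper mirroring StreamsConfig.producerPrefix(name), which returns "producer." + name.
    static String producerPrefix(String name) {
        return "producer." + name;
    }

    static Properties buildProps() {
        Properties props = new Properties();
        // Kept from the question.
        props.setProperty(producerPrefix("retries"), "5");
        props.setProperty(producerPrefix("request.timeout.ms"), "300000");
        // Illustrative values (assumptions, not recommendations):
        // max.block.ms bounds how long send() may block waiting for metadata (default 60000);
        // a shorter retry.backoff.ms leaves room for several metadata refresh attempts in that window.
        props.setProperty(producerPrefix("max.block.ms"), "300000");
        props.setProperty(producerPrefix("retry.backoff.ms"), "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        // Print the resulting keys in sorted order, e.g. producer.max.block.ms=300000.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

In the real application the same key would normally be written as `StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)`. A longer block time only postpones the failure if the broker stays unreachable.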
I checked the Kafka broker logs (I have a single Kafka broker) and found the following related entries:
INFO [GroupCoordinator 1001]: Member <StreamsApp>-StreamThread-1-consumer-49d0a5b3-be2a-4b5c-a4ab-ced7a2484a02 in group <StreamsApp> has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-03 14:39:23,893] INFO [GroupCoordinator 1001]: Preparing to rebalance group <StreamsApp> with old generation 1 (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)
[2018-07-03 14:39:23,893] INFO [GroupCoordinator 1001]: Group <StreamsApp> with generation 2 is now empty (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)
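I read somewhere that this happens when the consumer does not call poll() for a long time, so the consumer coordinator kicks it out of the group, and that the new consumer now uses heartbeats as its failure-detection protocol. I'm not sure this is the cause, since I'm using Kafka 1.1.0 and Kafka Streams 1.1.0.
How can I avoid this failure? Right now I have to restart the Streams application every time it happens.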
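For context on the poll()/heartbeat point: since KIP-62 (Kafka 0.10.1) the Java consumer sends heartbeats from a background thread, so two separate limits decide when the coordinator drops a member: `session.timeout.ms` (no heartbeat received in time) and `max.poll.interval.ms` (no `poll()` call in time). In a Streams app both can be overridden with the `consumer.` prefix that `StreamsConfig.consumerPrefix` produces. The sketch uses plain `java.util.Properties`; `consumerPrefix` and `heartbeatWithinSession` are hypothetical local helpers, and the values are illustrative assumptions. Tuning them changes when the coordinator gives up on a silent member; it would not by itself fix the producer metadata timeout in the stack trace.

```java
import java.util.Properties;

public class ConsumerLivenessSketch {

    // Hypothetical helper mirroring StreamsConfig.consumerPrefix(name): "consumer." + name.
    static String consumerPrefix(String name) {
        return "consumer." + name;
    }

    static Properties buildProps() {
        Properties props = new Properties();
        // Heartbeat-based failure detection (heartbeats sent from a background thread).
        props.setProperty(consumerPrefix("session.timeout.ms"), "30000");
        props.setProperty(consumerPrefix("heartbeat.interval.ms"), "10000");
        // Poll-based liveness: maximum gap between poll() calls before the member leaves the group.
        props.setProperty(consumerPrefix("max.poll.interval.ms"), "300000");
        return props;
    }

    // Hypothetical sanity check: the consumer docs suggest heartbeat.interval.ms
    // be no higher than one third of session.timeout.ms.
    static boolean heartbeatWithinSession(Properties props) {
        long session = Long.parseLong(props.getProperty(consumerPrefix("session.timeout.ms")));
        long heartbeat = Long.parseLong(props.getProperty(consumerPrefix("heartbeat.interval.ms")));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println("max.poll.interval.ms=" + props.getProperty(consumerPrefix("max.poll.interval.ms")));
        System.out.println("heartbeat <= session/3: " + heartbeatWithinSession(props));
    }
}
```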
UPDATE-1:
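I am trying to handle this StreamsException by wrapping main in a try-catch block, but the exception is never caught. What could be the reason, and how can I catch it and exit the application? Currently the Streams app just stops and does nothing after this exception.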
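A likely reason: `KafkaStreams.start()` only starts the stream threads and returns, and the `StreamsException` is thrown later on `StreamThread-1`, so a try-catch around code in `main` never sees it. The stdlib-only sketch below reproduces that behavior with a plain `Thread` (the class and thread names are hypothetical, and `KafkaStreams` itself is not used): a try-catch on the calling thread does not fire, while an uncaught-exception handler registered on the thread does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CrossThreadExceptionDemo {

    // Tries to catch a worker thread's exception with try/catch on the
    // calling thread; returns whether the catch block ran (it does not).
    static boolean caughtByTryCatch() {
        boolean caught = false;
        Thread worker = new Thread(() -> {
            throw new RuntimeException("boom");
        }, "worker-1");
        // No-op handler only to keep the default stack trace off stderr.
        worker.setUncaughtExceptionHandler((t, e) -> { });
        try {
            worker.start(); // returns immediately, much like KafkaStreams.start()
            worker.join();  // wait until the worker has died
        } catch (RuntimeException e) {
            caught = true;  // never reached: the exception stays on the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return caught;
    }

    // Registers an uncaught-exception handler, which does see the exception.
    static String seenByHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch died = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            throw new RuntimeException("boom");
        }, "worker-2");
        worker.setUncaughtExceptionHandler((t, e) -> {
            seen.set(t.getName() + ": " + e.getMessage());
            died.countDown();
        });
        worker.start();
        try {
            died.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caught by try/catch: " + caughtByTryCatch());
        System.out.println("seen by handler: " + seenByHandler());
    }
}
```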
UPDATE-2:
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    log.error("EXITING");
    log.error(throwable.getMessage());
    streams.close(5, TimeUnit.SECONDS);
    latch.countDown();
    System.exit(-1);
});
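Now the exception is handled and logged. However, the Streams application does not exit: it stays in the terminal in a frozen state, and Ctrl+C does not kill it. I have to look up the process's PID and run kill on it.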
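One possible explanation, hedged because the full `main` is not shown: `System.exit()` runs the JVM shutdown hooks and blocks until they finish. If the application also registers the common shutdown hook that calls `streams.close()` without a timeout and then `latch.countDown()`, that `close()` waits for the stream threads to stop, while the stream thread is itself blocked inside `System.exit()` in the handler, so neither side can finish. The `Runtime.exit()` Javadoc states that a further exit request made while shutdown hooks are running blocks indefinitely, which would also fit Ctrl+C having no effect. A common way around this is to keep the handler minimal (record the error, count down the latch) and let the main thread call `close()` and `System.exit()`. The sketch below models that pattern with a plain `Thread` standing in for the `StreamThread`; the class and method names are hypothetical, and the real `KafkaStreams` calls appear only in comments.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownFromMainSketch {

    // Starts a worker that dies with an exception, waits for the handler's
    // signal on the main thread, and returns the exit code main would use.
    static int runAndChooseExitCode() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        // Stand-in for a StreamThread started by KafkaStreams.start().
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated StreamsException");
        }, "StreamThread-1");

        // Counterpart of streams.setUncaughtExceptionHandler(...): only record
        // the error and signal; no close() and no System.exit() on this thread.
        worker.setUncaughtExceptionHandler((thread, throwable) -> {
            failure.set(throwable);
            latch.countDown();
        });

        worker.start();
        try {
            latch.await();
            // Real app: streams.close(5, TimeUnit.SECONDS) would go here,
            // on the main thread rather than inside the handler.
            worker.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get() == null ? 0 : 1;
    }

    public static void main(String[] args) {
        int code = runAndChooseExitCode();
        System.out.println("exit code: " + code);
        // Real app: System.exit(code) here, after close() has returned.
    }
}
```

The design choice is that only the main thread ever blocks on shutdown, so `close()` never waits on the thread that is running it or on a thread stuck in `System.exit()`.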
Tags: apache-kafka apache-kafka-streams