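[Posted]: 2019-02-04 04:17:19
[Question]:
Recently I have been trying to make my Kafka Streams (version 2.0.0) application resilient to an unspecified period of downtime of all brokers (downtime on the order of hours rather than seconds).
The default configuration (retries = 0) does not provide this: once all brokers are killed, all my stream threads move to DEAD, the stream clients move to ERROR, and processing stops: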
INFO o.a.k.s.p.internals.StreamThread - stream-thread [MyStream-0936f6a6-c9f4-4591-9b25-534abc65b8d1-StreamThread-24] State transition from RUNNING to PENDING_SHUTDOWN
Then:
INFO o.a.k.s.p.internals.StreamThread - stream-thread [MyStream-0936f6a6-c9f4-4591-9b25-534abc65b8d1-StreamThread-24] State transition from PENDING_SHUTDOWN to DEAD
And shortly afterwards some streams end up in ERROR:
stream-client [MyStream-88a8fe9a-d565-43e3-acb5-20cccc6b4a88] State transition from RUNNING to ERROR
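So, given the allowed state transitions (https://static.javadoc.io/org.apache.kafka/kafka-streams/2.0.0/org/apache/kafka/streams/KafkaStreams.State.html), I cannot recover in any way other than restarting the application.
What I found is this thread: Kafka Streams stops listening from topic and processing messages when broker goes down
It answers my question by suggesting that I increase the retries and retry.backoff.ms settings of my streams. That is what I did (retries raised to the maximum integer value, retry.backoff.ms set to 1000), but this approach caused some performance problems, and I kept seeing a recurring error in the logs, FETCH_SESSION_ID_NOT_FOUND, about which I could not find any information.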
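For concreteness, the two overrides described above can be sketched as follows. This is only an illustration using plain `java.util.Properties` and the literal keys `retries` and `retry.backoff.ms`; the class name `RetryConfigSketch` is made up, and a real application would pass these properties, together with the rest of its settings, when constructing the streams instance.

```java
import java.util.Properties;

// Hypothetical helper class; only the two keys and values come from the question.
class RetryConfigSketch {

    // Builds just the two overrides; every other setting keeps its default.
    static Properties retryOverrides() {
        Properties props = new Properties();
        props.put("retries", Integer.MAX_VALUE); // retry (practically) forever
        props.put("retry.backoff.ms", 1000L);    // wait 1 second between attempts
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be compared with the logged config.
        retryOverrides().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```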
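Is there any way to achieve resilience to broker outages other than increasing the number of retries? I can accept losing some messages after the Kafka brokers fail, and I do not really need retries when messages cannot be produced or consumed. I am considering restarting the streams manually after a broker failure, but I am not sure how to catch a "broker down" exception. What do you think?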
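A minimal sketch of the manual-restart idea, under stated assumptions: `StreamsHandle` is a hypothetical stand-in interface, not a Kafka API, so that the example runs without Kafka on the classpath. In a real application `inErrorState()` would presumably wrap a check such as `streams.state() == KafkaStreams.State.ERROR` (or be driven by `KafkaStreams#setStateListener`), and the factory would build a brand-new `KafkaStreams` object each time, since a closed instance cannot be started again. The names `RestartSketch`, `supervise`, `maxRestarts` and `backoffMs` are all made up for illustration.

```java
import java.util.function.Supplier;

class RestartSketch {

    /** Hypothetical stand-in for a KafkaStreams instance; NOT a Kafka API. */
    interface StreamsHandle {
        void start();
        void close();
        boolean inErrorState(); // assumed to mean "client state is ERROR"
    }

    /**
     * Polls the current handle. When it reports an error, closes it, waits
     * backoffMs, and starts a fresh handle from the factory. Stops after
     * maxRestarts restarts or on interruption; returns the restart count.
     */
    static int supervise(Supplier<StreamsHandle> factory, int maxRestarts, long backoffMs) {
        StreamsHandle current = factory.get();
        current.start();
        int restarts = 0;
        while (restarts < maxRestarts) {
            try {
                Thread.sleep(backoffMs); // doubles as poll interval and restart backoff
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (current.inErrorState()) {
                current.close();         // the old instance is discarded
                current = factory.get(); // a new instance must be built
                current.start();
                restarts++;
            }
        }
        return restarts;
    }

    public static void main(String[] args) {
        // Fake handle that is always in error, to exercise the restart path.
        Supplier<StreamsHandle> alwaysFailing = () -> new StreamsHandle() {
            public void start() { System.out.println("start"); }
            public void close() { System.out.println("close"); }
            public boolean inErrorState() { return true; }
        };
        System.out.println("restarts: " + supervise(alwaysFailing, 2, 10L));
    }
}
```

Polling keeps the sketch simple; a listener-based variant would react sooner, but the restart itself should probably still happen on a separate thread rather than inside the callback.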
Here is my streams configuration:
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.key.serde = ... Serdes$StringSerde
default.production.exception.handler = ... DefaultProductionExHandler
default.timestamp.extractor = ... FailOnInvalidTimestamp
default.value.serde = ... Serdes$StringSerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 8
partition.grouper = ... DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 1000
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-state
topology.optimization = all
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
[Discussion]:
Tags: apache-kafka apache-kafka-streams