【问题标题】:How to properly configure Kafka Stream to ensure brokers resiliency?如何正确配置 Kafka Stream 以确保代理的弹性?
【发布时间】:2019-02-04 04:17:19
【问题描述】:

最近我试图确保我的 Kafka Streams(2.0.0 版本)应用程序在所有代理的未指定停机时间(停机时间更像几小时而不是几秒钟)。

它没有提供默认配置(retries on 0),因为在所有代理都被杀死后,我的所有流都将状态更改为ERROR 甚至DEAD 并停止工作:

INFO  o.a.k.s.p.internals.StreamThread - stream-thread [MyStream-0936f6a6-c9f4-4591-9b25-534abc65b8d1-StreamThread-24] State transition from RUNNING to PENDING_SHUTDOWN

然后:

INFO  o.a.k.s.p.internals.StreamThread - stream-thread [MyStream-0936f6a6-c9f4-4591-9b25-534abc65b8d1-StreamThread-24] State transition from PENDING_SHUTDOWN to DEAD

并且一些流很快就出现了错误:

stream-client [MyStream-88a8fe9a-d565-43e3-acb5-20cccc6b4a88] State transition from RUNNING to ERROR

因此,考虑到状态转换 (https://static.javadoc.io/org.apache.kafka/kafka-streams/2.0.0/org/apache/kafka/streams/KafkaStreams.State.html),除了重新启动应用程序之外,我无法以任何其他方式恢复。

我发现的是这个话题: Kafka Streams stops listening from topic and processing messages when broker goes down

它回答了我的问题,建议增加我的流retries 以及retry.backoff.ms 配置。这就是我所做的(将retries 增加到Int.MaxNumberretry.backoff.ms1000)并发现这种方法存在一些性能问题,并且我碰巧在日志中遇到了反复出现的错误:FETCH_SESSION_ID_NOT_FOUND 我确实找不到有关的信息。

除了增加重试次数之外,还有其他方法可以实现代理的弹性吗?我可以接受 Kafka Brokers 失败后丢失的一些消息,如果无法生成/使用消息,我真的不需要重试。我正在考虑在代理失败后手动重新启动流,但我不确定如何捕获“代理关闭异常”,您怎么看?

这是我的流配置:

buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.key.serde = ... Serdes$StringSerde
default.production.exception.handler = ... DefaultProductionExHandler
default.timestamp.extractor = ... FailOnInvalidTimestamp
default.value.serde = ... Serdes$StringSerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 8
partition.grouper = ... DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 1000
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-state
topology.optimization = all
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    增加重试次数是处理代理停机的正确方法。

    我不确定,为什么增加重试次数会对性能产生影响。你能详细说明一下吗?

    除此之外,没有什么比得上“中间商异常”了。您可以在KafkaStream 实例中注册未捕获的异常处理程序。它会通知您有关即将死去的线程,因此,您可以通过重新启动 KafkaStreams 客户端来对其做出反应。

    您也可以尝试更改production.exception.handler 并跳过一些生产错误。请注意,在这种情况下,您可能会丢失客户端中的所有缓冲消息。 (另请注意,您不能跳过处理程序的所有错误——一些错误被认为是致命的,KafkaStreams 将关闭,无论您的配置如何。)

    【讨论】:

    • 我在 Kafka "FETCH_SESSION_ID_NOT_FOUND" 中不断收到此错误,想了解其含义。
    • 这应该是一个可重试的消费者错误,因此应该只记录为警告。
    猜你喜欢
    • 2019-07-10
    • 2018-12-17
    • 1970-01-01
    • 1970-01-01
    • 2021-10-31
    • 1970-01-01
    • 2019-11-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多