【Question title】: How to check if a Kafka instance is up
【Posted】: 2022-02-12 22:58:16
【Question】:

I'm working on a Spring Boot project in which I consume messages from a Kafka topic using the spring-kafka dependency.

Here is my listener, which works fine:

@KafkaListener(topics = "${mytopic.name}", containerFactory = "kafkaEventListenerObjectContainerFactory")
public void listenTopic(ConsumerRecord<String, KafkaEvent> cr,
                        @Payload KafkaEvent kafkaEvent) throws JsonProcessingException {
    // my code here...
}

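My problem is that I want to check whether the Kafka instance is up. I'd like to develop a REST API that connects to my Kafka instance and, if the connection fails, sends an alert by email.

Is my idea right, or is there another, cleaner way to check whether Kafka is up?

Do you have any good ideas on how to achieve this? I plan to run the check every 15 minutes to get the status.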
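A minimal sketch of the periodic check described above, using only the JDK. The class name PeriodicKafkaCheck, the check supplier and the alert callback are hypothetical: the probe itself (a socket connect, an AdminClient call, ...) and the email sending are left to you. It alerts only when the status flips from up to down, so a long outage does not produce a new mail every 15 minutes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical helper: runs a connectivity probe at a fixed rate and calls the
// alert callback only when the status flips from UP to DOWN.
class PeriodicKafkaCheck implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final BooleanSupplier check;   // your probe: socket connect, AdminClient call, ...
    private final Consumer<String> alert;  // your notifier, e.g. a wrapper around a mail sender
    private volatile boolean lastUp = true;

    PeriodicKafkaCheck(BooleanSupplier check, Consumer<String> alert) {
        this.check = check;
        this.alert = alert;
    }

    // Runs the probe once and returns the current status.
    boolean runOnce() {
        boolean up;
        try {
            up = check.getAsBoolean();
        } catch (RuntimeException e) {
            up = false; // a probe that throws counts as DOWN
        }
        if (lastUp && !up) {
            try {
                alert.accept("Kafka connectivity check failed");
            } catch (RuntimeException e) {
                // swallowed: an exception escaping a scheduled task would cancel all later runs
            }
        }
        lastUp = up;
        return up;
    }

    // Calls runOnce() immediately and then every `period` units.
    ScheduledFuture<?> start(long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(this::runOnce, 0, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Wiring it up could look like new PeriodicKafkaCheck(probe, msg -> mailer.send(msg)).start(15, TimeUnit.MINUTES), where probe and mailer are your own (hypothetical) objects.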

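【Discussion】:

  • This shouldn't be your application's responsibility. You should use a proper monitoring system, such as Icinga or similar.
  • @FedericoklezCulloca Thanks for your reply. Yes, I know, but I want to know whether it's technically possible, because my app needs to know when no messages are arriving due to a problem on the Kafka side.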
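If you do want the application to check this itself, the cheapest probe is a plain TCP connect to the broker addresses from bootstrap.servers. A sketch using only java.net, with a hypothetical class name BrokerPortProbe. Note that it only proves that something accepts connections on host:port, not that the broker can actually serve requests; kafka-clients' AdminClient.describeCluster() is a stronger check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: TCP-level reachability check for Kafka broker addresses.
class BrokerPortProbe {

    // True if a TCP connection to host:port is accepted within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // connection refused, timed out, or host not resolvable
            return false;
        }
    }

    // True if at least one "host:port" entry of a bootstrap.servers value
    // (e.g. "kafka:9092,kafka2:9092") accepts a connection.
    static boolean isAnyReachable(String bootstrapServers, int timeoutMs) {
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0) {
                continue; // malformed entry without a port
            }
            try {
                int port = Integer.parseInt(hostPort.substring(colon + 1));
                if (isReachable(hostPort.substring(0, colon), port, timeoutMs)) {
                    return true;
                }
            } catch (IllegalArgumentException e) {
                // non-numeric or out-of-range port: try the next entry
            }
        }
        return false;
    }
}
```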

Tags: java spring spring-boot apache-kafka


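【Solution 1】:

From the command line, like this:

Step 3 (from the Kafka quickstart): create a topic. Let's create a topic named "topic_name" with a single partition and only one replica: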

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_name
We can now see that topic if we run the list topic command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
topic_name

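Note that these commands use the old --zookeeper option. Since Kafka 2.2, kafka-topics.sh also accepts --bootstrap-server localhost:9092 instead, and Kafka 3.0 removed --zookeeper from this tool.

Alternatively, instead of creating topics manually, you can configure your brokers to auto-create topics when a non-existent topic is published to.

See this reference:

https://kafka.apache.org/081/documentation.html

Or, from within the application: when you run it, the log output shows whether the consumer (re)joins its group and whether the consumer and producer are configured correctly.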

Like this:

Producer configuration:

2021-07-26 15:09:46.124  INFO 14802 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
acks = 1
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = 
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT

And this consumer configuration:

2021-07-26 15:09:48.140  INFO 14802 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-1, groupId=store-services] Subscribed to topic(s): updateRate   // <- topic name
2021-07-26 15:09:48.146  INFO 14802 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [kafka:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupId
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8

And on a successful join you should see:

2021-07-26 15:15:53.292  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Revoking previously assigned partitions []
2021-07-26 15:15:53.292  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:53.294  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2021-07-26 15:15:53.295  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Revoking previously assigned partitions []
2021-07-26 15:15:53.295  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.407  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:53.408  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.412  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.412  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:56.462  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Successfully joined group with generation 11
2021-07-26 15:15:56.462  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Successfully joined group with generation 11
2021-07-26 15:15:56.464  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Setting newly assigned partitions: addingOrderForBranch-0
2021-07-26 15:15:56.464  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Setting newly assigned partitions: updateRate-0
2021-07-26 15:15:56.474  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Setting offset for partition addingOrderForBranch-0 to the committed offset FetchPosition{offset=328, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2021-07-26 15:15:56.474  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Setting offset for partition updateRate-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
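If you want to detect the join programmatically instead of reading the console, here is a sketch that scans log lines for the "Successfully joined group" and "Setting newly assigned partitions" messages shown above. JoinLogScanner is a hypothetical name, and the patterns assume the Spring Boot default log format and the kafka-clients 2.x wording in this excerpt; other client versions word these messages differently, so parsing logs is brittle.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finds consumer groups that logged a successful join and the
// partitions assigned to them afterwards. Patterns match the kafka-clients 2.x
// wording in the log excerpt; other client versions may word it differently.
class JoinLogScanner {
    private static final Pattern JOINED = Pattern.compile(
            "\\[Consumer clientId=[^,]+, groupId=([^\\]]+)\\] Successfully joined group");
    private static final Pattern ASSIGNED = Pattern.compile(
            "\\[Consumer clientId=[^,]+, groupId=([^\\]]+)\\] Setting newly assigned partitions: ?(.*)$");

    // Returns groupId -> assigned partitions ("" if joined but nothing assigned yet).
    static Map<String, String> assignedPartitionsByGroup(List<String> lines) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher joined = JOINED.matcher(line);
            if (joined.find()) {
                result.put(joined.group(1), "");
                continue;
            }
            Matcher assigned = ASSIGNED.matcher(line);
            // only record assignments for groups that have already joined
            if (assigned.find() && result.containsKey(assigned.group(1))) {
                result.put(assigned.group(1), assigned.group(2).trim());
            }
        }
        return result;
    }
}
```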

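【Discussion】:

  • Thanks for your reply. Checking the logs only works when the application starts, but I want to be able to run this check at any time, for example by calling my REST API every 15 minutes to get the status of the connection to Kafka.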
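To get the on-demand check described in this comment without any framework, here is a sketch of a REST-style status endpoint built on the JDK's bundled com.sun.net.httpserver server (jdk.httpserver module). The path /kafka-status, the JSON shape and the class name KafkaStatusEndpoint are assumptions; the check is any probe you like, e.g. a TCP connect or an AdminClient call. An external scheduler (cron, a monitoring system) can then call it every 15 minutes and send the email.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.BooleanSupplier;

// Hypothetical endpoint: GET /kafka-status runs the supplied check on every request
// and answers 200 {"status":"UP"} or 503 {"status":"DOWN"}.
class KafkaStatusEndpoint {
    static HttpServer start(int port, BooleanSupplier kafkaCheck) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/kafka-status", exchange -> {
            boolean up;
            try {
                up = kafkaCheck.getAsBoolean();
            } catch (RuntimeException e) {
                up = false; // a probe that throws counts as DOWN
            }
            byte[] body = ("{\"status\":\"" + (up ? "UP" : "DOWN") + "\"}")
                    .getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(up ? 200 : 503, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body); // closing the body stream also ends the exchange
            }
        });
        server.start();
        return server;
    }
}
```

Returning 503 for DOWN lets simple HTTP monitors alert on the status code alone, without parsing the body.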

【Solution 2】:

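Spring provides an out-of-the-box solution for this: Spring Boot Actuator.

Add the spring-boot-starter-actuator dependency and some extra configuration to your application.properties or YAML file, e.g. management.server.port=8443. You can also omit that property, in which case the actuator uses your web server's port. You may also want to add further management.endpoint.xxx settings (see "Enabling Production-ready Features" in the Spring Boot documentation).

Then add a StreamsHealthIndicator class (your own HealthIndicator implementation) to your project. You can now call the actuator from outside, e.g. http://localhost:8443/actuator/health, and the response will include the result from your StreamsHealthIndicator. With management.endpoint.health.show-details=ALWAYS, the response also includes all the details reported by the HealthIndicator, such as threads, tasks, etc.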
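Once the actuator is in place, the 15-minute check can live outside the application. Here is a sketch of a poller using java.net.http (Java 11+); ActuatorHealthPoller is a hypothetical name and the timeouts are arbitrary. It relies on the actuator's default status mapping (DOWN and OUT_OF_SERVICE are returned as HTTP 503, UP as HTTP 200), so it only looks at the status code and needs no JSON parser.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical poller for a Spring Boot Actuator health endpoint.
class ActuatorHealthPoller {
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .version(HttpClient.Version.HTTP_1_1)
            .build();

    // True only if the endpoint answered HTTP 200 within the timeouts.
    boolean isUp(URI healthUri) {
        HttpRequest request = HttpRequest.newBuilder(healthUri)
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200;
        } catch (IOException e) {
            return false; // application unreachable or timed out: also worth an alert
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Call isUp(URI.create("http://localhost:8443/actuator/health")) from a scheduler and send your alert when it returns false; a false result also covers the case where the application itself is down.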

An example StreamsHealthIndicator can be found here: example

【Discussion】:
