【问题标题】:Kafka Stream and Consumer Group Weird BehaviorKafka 流和消费者组的奇怪行为
【发布时间】:2018-03-13 22:35:20
【问题描述】:

我有两个高级问题被分解成更多单独的问题,这两个高级问题都涉及 Apache Kafka Streams API 正在创建和使用的消费者组。

首先是kafka-consumer-group.sh 脚本的输出。我得到了奇怪的输出,虽然他们似乎连接到特定的组/主题/分区,但并没有真正告诉我特定的消费者在哪里:

TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG
STANDARD_DATA                  9          11              11              0          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer-4fd9dc15-d8a7-4598-85a9-3761ae6a747b/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer
STANDARD_DATA                  0          4               11              7          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer-28e1c7bf-860d-44d6-bf58-5e0ff875587c/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer
STANDARD_DATA                  4          -               10              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer
STANDARD_DATA                  5          -               10              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer-a81f1399-1fc4-4579-b24f-fa8fee01fabf/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer
STANDARD_DATA                  3          -               12              -          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer-6a83bfcc-2c6e-4e9d-a819-029ac8c6ae17/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer
STANDARD_DATA                  8          12              12              0          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer-6d46bed3-70c4-4c7f-8e53-f9591192bc3f/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer
STANDARD_DATA                  7          -               11              -          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer-5313315b-ded9-4fe7-ac9d-d8d5b20dd5b9/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer
STANDARD_DATA                  2          10              10              0          myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer-c08a648f-548e-47a8-8bc5-7b6fa3bc1fb5/1.1.1.1                  myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer
STANDARD_DATA                  1          2               10              8          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer-08d99679-d430-4e9f-a3b9-11e558ca34a4/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer
STANDARD_DATA                  6          -               12              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer-666040f8-d4d0-49e9-9db6-c6efee49ebe1/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer
  1. 当我可以直接查询 Kafka 的 API 以区分它们实际上已被捕获时,为什么某些 CURRENT-OFFSETS(第 3 列)和 LAG(第 4 列)显示为“-”?

(通过golang API查询)

4                      myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3    OFFSET: 10        LOG-END: 10                LAG: 0
  1. 另外,为什么该偏移量一般不会像日志中所表示的那样显示(又名,它应该被赶上)?

我的第二个高级问题是关于流的问题。我们有一个流处理工作,即在随机时间(主要是在重新启动期间)重置为特定主题中可用的最早偏移量。在整个代码中没有“重置”,也没有触及 OFFSET_RESET。我还可以确认我们没有使用“exactly-once”,所以我不确定这些偏移重置究竟在哪里发挥作用。

再一次,它基本上是:

流处理正在处理数据,有些事情~发生~然后我们的偏移量回到地面 0,再次处理。在决定重置之前,这可能会持续数天到数周,因此正在提交偏移量。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    关于kafka-consumer-groups.sh 的输出: CURRENT-OFFSET 中的- 表示该分区没有提交的偏移量。这意味着,也无法计算滞后时间(因此,您也会在此处获得 -)。

    如果我正确阅读了您的语句,如果您使用 golang 查询偏移量,则显示分区 4 位于偏移量 10,与 kafka-consumer-groups.sh 显示的相反——不知道为什么会这样......

    关于重置的偏移量:您可能需要增加代理配置offsets.retention.minutes——默认为 24​​ 小时(参见https://docs.confluent.io/current/streams/faq.html#why-is-my-application-re-processing-data-from-the-beginning)。

    另请注意,Streams API 使用“最早”的默认重置策略(与默认使用“最新”的消费者 API 形成对比)。您可以通过 StreamsConfig: https://docs.confluent.io/current/streams/developer-guide.html#non-streams-configuration-parameters 更改 Streams API 中的重置策略

    【讨论】:

    • 这绝对是完美的。我将调整这些设置,看看它是否能解决问题。关于 offsets.retention.minutes 通常有最佳实践吗?我们的应用程序有一段时间的空闲时间,因此通过修复重置策略我不确定我们是否真的需要提高它。想法?
    • sorry - 问题:如果消费者坐在那里听消费者组主题并且在 offsets.retention.minutes 时间段之后没有消息进入,那么一旦有新消息到来,消费者是否仍会重置在?例如,如果偏移量在 55 处停留了 30 小时,然后数字 56 进入......如果消费者是消费者组的一部分,它是否会重新连接并在偏移量 57 处自动消费(因为它重置为最新的偏移量?)
    • 如果您使用 auto.offset.reset=latest 失去偏移量,理论上可能会发生,您会跳过一些记录并且不处理它们。因此,即使您使用“最新”,也建议增加偏移保留时间。
    • 这就是我的想法 Matthias J. Sax。只是想澄清一下,以确保我不会试图破解我的方式=)。
    猜你喜欢
    • 1970-01-01
    • 2020-04-19
    • 2020-09-19
    • 2017-01-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多