提交和偏移量

本文介绍kafka消费者非常重要的内容-提交和偏移量,介绍前,我们先弄清楚两个基本概念和面临的问题:
一:基本概念

  1. 提交:把更新分区当前位置的操作叫做提交。
  2. 消费者偏移量:保存已经处理的每个分区的偏移量。

二:面临的问题

  1. 消息被重复处理:提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
    揭秘kafka消费者二(提交和偏移量)

  2. 消息丢失:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会丢失。
    揭秘kafka消费者二(提交和偏移量)
    所以,处理偏移量的方式对客户端会有很大的影响。KafkaConsumer API提供了很多种方式来提交偏移量。

自动提交

一:基本概念:

  1. 自动提交:消费者自动提交偏移量。

二:关键参数:

  1. enable.auto.commit:true:自动提交,false:不自动提交。
  2. auto.commit.interval.ms:提交时间间隔,默认5秒。

三:存在的问题:

  1. 重复处理消息。

提交当前偏移量

一:基本概念:

  1. 提交当前偏移量:提交由poll()方法返回的最新偏移量,提交成功马上返回,如果失败则抛出异常。

二:方法:

  1. commitSync():提交由poll()方法返回的最新偏移量。

三:存在的问题:

  1. 可能重复处理消息。
  2. 在broker对提交请求作出回应之前,应用程序会一直阻塞,这回限制应用程序吞吐量。

四:代码:

  1. 打印记录内容。
  2. 处理完当前批次的消息,再轮询更多消息前,调用commitSync()方法提交当前批次最新的偏移量。
  3. 只要没有发生不可恢复异常,commitSync()方法会一直尝试重试直至提交成功,如果提交失败,则记录错误日志。
    揭秘kafka消费者二(提交和偏移量)

异步提交

一:代码:

  1. 提交最后一个偏移量,然后继续做其他事情。不会进行重试,重试可能会覆盖最新的偏移量。

揭秘kafka消费者二(提交和偏移量)

  1. 通过回调函数记录提交错误日志或生成度量指标。
    揭秘kafka消费者二(提交和偏移量)

同步和异步组合提交

一:代码:

  1. 如果一切正常,我们使用commitAsync()方法提交,这样速度比较快,而且这次提交失败,下一次提交很可能会成功。
  2. 如果关闭消费者,使用commitSync()方法会一直重试,直到提交成功或发生无法恢复的错误。
    揭秘kafka消费者二(提交和偏移量)

提交特定偏移量

一:面临的问题:

  1. poll方法返回的数据量比较大,为了避免重复处理整批消息,想在批次处理中间发送偏移量。

二:解决方案:

  1. commitAsync()方法和commitSync()方法支持提交分区和特定偏移量map

三:代码:

  1. 记录偏移量的map。
  2. 业务处理。
  3. 读取记录之后,试用期望处理的一个消息的偏移量更新map里的偏移量。
  4. 每处理1000条记录就提交一次偏移量。
  5. 调用commitAsync()提交偏移量。
    揭秘kafka消费者二(提交和偏移量)
    以上就是我对kafka消费者提交偏移量的理解,哪里说的不对,还请赐教!

相关文章:

  • 2021-11-18
  • 2019-03-24
  • 2021-11-14
  • 2021-11-18
  • 2021-05-04
  • 2021-11-28
猜你喜欢
  • 2021-12-13
  • 2022-01-16
  • 2021-05-09
  • 2021-11-18
  • 2021-10-14
  • 2021-09-08
  • 2021-06-10
相关资源
相似解决方案