提交和偏移量
本文介绍kafka消费者非常重要的内容-提交和偏移量,介绍前,我们先弄清楚两个基本概念和面临的问题:
一:基本概念
- 提交:把更新分区当前位置的操作叫做提交。
- 消费者偏移量:保存已经处理的每个分区的偏移量。
二:面临的问题
-
消息被重复处理:提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
-
消息丢失:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会丢失。
所以,处理偏移量的方式对客户端会有很大的影响。KafkaConsumer API提供了很多种方式来提交偏移量。
自动提交
一:基本概念:
- 自动提交:消费者自动提交偏移量。
二:关键参数:
- enable.auto.commit:true:自动提交,false:不自动提交。
- auto.commit.interval.ms:提交时间间隔,默认5秒。
三:存在的问题:
- 重复处理消息。
提交当前偏移量
一:基本概念:
- 提交当前偏移量:提交由poll()方法返回的最新偏移量,提交成功马上返回,如果失败则抛出异常。
二:方法:
- commitSync():提交由poll()方法返回的最新偏移量。
三:存在的问题:
- 可能重复处理消息。
- 在broker对提交请求作出回应之前,应用程序会一直阻塞,这回限制应用程序吞吐量。
四:代码:
- 打印记录内容。
- 处理完当前批次的消息,再轮询更多消息前,调用commitSync()方法提交当前批次最新的偏移量。
- 只要没有发生不可恢复异常,commitSync()方法会一直尝试重试直至提交成功,如果提交失败,则记录错误日志。
异步提交
一:代码:
- 提交最后一个偏移量,然后继续做其他事情。不会进行重试,重试可能会覆盖最新的偏移量。
- 通过回调函数记录提交错误日志或生成度量指标。
同步和异步组合提交
一:代码:
- 如果一切正常,我们使用commitAsync()方法提交,这样速度比较快,而且这次提交失败,下一次提交很可能会成功。
- 如果关闭消费者,使用commitSync()方法会一直重试,直到提交成功或发生无法恢复的错误。
提交特定偏移量
一:面临的问题:
- poll方法返回的数据量比较大,为了避免重复处理整批消息,想在批次处理中间发送偏移量。
二:解决方案:
- commitAsync()方法和commitSync()方法支持提交分区和特定偏移量map
三:代码:
- 记录偏移量的map。
- 业务处理。
- 读取记录之后,试用期望处理的一个消息的偏移量更新map里的偏移量。
- 每处理1000条记录就提交一次偏移量。
- 调用commitAsync()提交偏移量。
以上就是我对kafka消费者提交偏移量的理解,哪里说的不对,还请赐教!