【问题标题】:Not able to poll / fetch all records from kafka topic无法从 kafka 主题轮询/获取所有记录
【发布时间】:2020-03-13 07:22:41
【问题描述】:
我正在尝试从特定主题轮询数据,例如 kafka 正在接收 100 条记录/秒
但大多数时候它不会获取所有记录。
我使用超时时间为 5000 毫秒,并且每隔 100ms 调用此方法
注意:我也在订阅特定主题
@Scheduled(fixedDelayString = "100")
public void pollRecords() {
ConsumerRecords<String, String> records =
leadConsumer.poll("5000");
如何从 kafka 获取所有数据?
【问题讨论】:
标签:
java
apache-kafka
kafka-consumer-api
spring-kafka
【解决方案1】:
从 poll() 返回的最大记录数由 max.poll.records 消费者配置参数指定。 (默认为 500)此外,还有另一个消费者配置参数限制了从服务器返回的最大数据量。 (fetch.max.bytes 和max.partition.fetch.bytes)
另一方面,在代理方面还有另一个大小限制,称为message.max.bytes。
所以你应该正确设置这些参数以获得更多的消息。
来自 Kafka 文档 (link):
max.poll.records:单次返回的最大记录数
调用 poll()。 (默认:500)
fetch.max.bytes:服务器应为获取请求返回的最大数据量。记录由分批获取
消费者,并且如果第一个非空的第一个记录批次
fetch的partition大于这个值,记录batch
仍然会被退回,以确保消费者可以取得进展。
因此,这不是绝对最大值。最大记录批量大小
代理接受的定义是通过 message.max.bytes (代理
config)或 max.message.bytes(主题配置)。请注意,消费者
并行执行多个提取。 (默认:52428800)
message.max.bytes: Kafka 允许的最大记录批量大小。如果增加并且有 0.10.2 岁以上的消费者,则
消费者的获取大小也必须增加,以便他们可以
获取这么大的记录批次。在最新的消息格式版本中,
为了提高效率,记录总是分组为批次。在以前
消息格式版本,未压缩的记录不分组
批次,此限制仅适用于其中的单个记录
case.This 可以使用主题级别 max.message.bytes 为每个主题设置
配置。 (默认:1000012)
max.partition.fetch.bytes:服务器将返回的每个分区的最大数据量。分批获取记录
由消费者。如果第一个记录批次中的第一个非空
fetch的partition大于这个限制,batch仍然会
被退回以确保消费者可以取得进展。最大值
代理接受的记录批量大小通过以下方式定义
message.max.bytes(代理配置)或 max.message.bytes(主题配置)。
有关限制消费者请求大小的信息,请参阅 fetch.max.bytes。 (默认:1048576)