【发布时间】:2020-01-22 11:28:21
【问题描述】:
您好,我正在使用 Kafka MapRStream 接收来自 Mapr Streams 主题的事件。
我正在尝试增加我的消费者的批量大小,但我在一批中收到的消息不超过30条!! p>
单个事件的大小约为 5000 字节。如果事件较小,我会在一批中获得更多。
这是我对 Consumer 的配置:
public static void main( String[] args ) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchSize");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 26214400);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024 * 1024);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
long totalCount = 0;
long start = System.currentTimeMillis();
long countTimesNoMessages = 0;
while (countTimesNoMessages < 10) {
ConsumerRecords<String, String> records = consumer.poll(1000);
totalCount += records.count();
System.out.println(records.count());
if (records.count() == 0) {
countTimesNoMessages++;
}
}
long end = System.currentTimeMillis();
System.out.println((end - start) + " for " + totalCount + " messages");
}
【问题讨论】:
-
每秒有多少条消息进入主题?
-
主题满是消息,大约每秒500条,已经包含100万条
标签: java kafka-consumer-api mapr batchsize