正如 API 文档中所说:
这是一个同步提交,将阻塞直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)。
这意味着,commitSync 是一种阻塞方法。调用它会阻塞你的线程,直到它成功或失败。
例如,
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
对于for循环中的每次迭代,只有在consumer.commitSync()成功返回或中断并抛出异常后,您的代码才会移动到下一次迭代。
这是一个异步调用,不会阻塞。遇到的任何错误都将传递给回调(如果提供)或丢弃。
这意味着,commitAsync 是一种非阻塞方法。调用它不会阻塞你的线程。相反,它将继续处理以下指令,无论最终成功还是失败。
例如,类似于前面的例子,但这里我们使用commitAsync:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
对于 for 循环中的每次迭代,无论 consumer.commitAsync() 最终会发生什么,您的代码都将移至下一次迭代。而且,提交的结果将由您定义的回调函数处理。
权衡:延迟与数据一致性
- 如果你必须保证数据的一致性,选择
commitSync(),因为它会确保在做任何进一步的操作之前,你会知道偏移提交是成功还是失败。但由于它是同步和阻塞的,您将花费更多时间等待提交完成,从而导致高延迟。
- 如果您对某些数据不一致并希望有低延迟,请选择
commitAsync(),因为它不会等待完成。相反,它只会发送提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时,您的代码将继续执行。
这都是一般来说,实际行为将取决于您的实际代码以及您调用该方法的位置。