【发布时间】:2017-12-24 23:42:39
【问题描述】:
新的 Kafka 版本 (0.11) 支持仅一次语义。
我有一个像这样在 java 中使用 kafka 事务代码的生产者设置。
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
我不太确定如何使用 sendOffsetsToTransaction 以及它的预期用例。 AFAIK,消费者组是消费者端的多线程读取功能。
javadoc 说
" 向消费者组协调器发送一个消耗的偏移量列表,并将这些偏移量标记为当前事务的一部分。这些偏移量只有在事务提交成功时才会被视为已消耗。需要时应使用此方法将消费和生产的消息一起批量处理,通常采用消费-转换-生产模式。”
produce 将如何维护消耗的偏移量列表?它有什么意义?
【问题讨论】:
标签: java multithreading apache-kafka kafka-producer-api