【发布时间】:2019-09-16 00:31:41
【问题描述】:
Spring Kafka,因此Spring Cloud Stream,允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能的实际应用:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:
@Transactional
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public PersonEvent process(PersonEvent data) {
logger.info("Received event={}", data);
Person person = new Person();
person.setName(data.getName());
if(shouldFail.get()) {
shouldFail.set(false);
throw new RuntimeException("Simulated network error");
} else {
//We fail every other request as a test
shouldFail.set(true);
}
logger.info("Saving person={}", person);
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
}
在这段摘录中,有一个从 Kafka 主题读取,在数据库中写入和另一个对另一个 Kafka 主题的写入,所有这些都是事务性的。
我想知道并想回答的是,这在技术上是如何实现和实施的。
由于数据源和 Kafka 不参与 XA 事务(2 阶段提交),那么实现如何保证本地事务可以从 Kafka 读取、提交到数据库并以事务方式将所有这些写入 Kafka?
【问题讨论】:
标签: apache-kafka spring-cloud spring-cloud-stream spring-kafka