【问题标题】:How to handle kafka producer and database save transaction?如何处理 kafka 生产者和数据库保存事务?
【发布时间】:2022-03-31 07:26:32
【问题描述】:

我需要将一个对象发送到一个kafka主题,然后如果发布到该主题成功,则只将其保存到postgres db,请问如何做到这一点?

【问题讨论】:

  • 你能说得更具体些吗?这个问题太宽泛,无法提供有意义的帮助。
  • 这是一个使用不同工具解决的问题...即你真的应该考虑使用 Kafka Connect JDBC sink 而不是自己写这个

标签: spring postgresql apache-kafka


【解决方案1】:

你可以让生产者同步。

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Transactional
public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg).get();
    saveToDb(msg);
}

或者你可以添加回调

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
    
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            saveToDb();
        }
        @Override
        public void onFailure(Throwable ex) {
            
        }
    });
}

【讨论】:

  • 如果saveToDb执行失败,该事件已经发送到kafka,所以它不会像执行的那样工作。
猜你喜欢
  • 2020-07-14
  • 1970-01-01
  • 2019-01-15
  • 2017-09-07
  • 2019-01-16
  • 2018-02-08
  • 1970-01-01
  • 1970-01-01
  • 2019-12-31
相关资源
最近更新 更多