【问题标题】:What is the best approach to write to two kafka topics in a single transaction in Spring batch job在 Spring 批处理作业中的单个事务中写入两个 kafka 主题的最佳方法是什么
【发布时间】:2021-04-13 04:36:21
【问题描述】:

当前实施

我有一个写入 kafka 主题的春季批处理作业。 我从数据库中读取记录,对其进行转换并写入 kafka 主题。

现有工作的新变化

我想连同主要主题一起写一个审计主题。

对于从数据库中读取的每条记录,我正在向主要主题写入一条说 Class Abc 类型的消息,对于同一条记录,我想将另一个实体类类型的消息写入审计主题。

问题陈述

目前,我正在使用不同的 KakfaTemplate 来写入两个主题,但问题是如何处理如果在写入主要主题后作业失败并且它从不写入审计主题。如何回滚事务(我目前的实现中没有实现事务)。

我需要更改我的应用程序的整个实现吗?我应该在一个事务中同时写这两个主题,还是有针对我当前实现的任何解决方案?

事务管理器

@Override
protected JobRepository createJobRepo(){
JobRepositoryFactoryBean fac = new JobRepositoryFactoryBean;
fac.setDataSource(ds);
fac.setTransactionManger(transactionManger);
fac.set();
return fac.getObject();

【问题讨论】:

    标签: java apache-kafka spring-batch


    【解决方案1】:

    从长远来看,更改实施将使您的生活更轻松。您所描述的问题被称为事务发件箱模式,并且有许多被广泛接受的实现。

    批处理作业适合 Kafka 连接器(Debezium 是一种更复杂、更灵活的解决方案)。连接器以原子方式处理缩放、协调、偏移处理和并发,否则您必须使用 select for update 等方式自行实现。

    我首选的解决方案是简化问题。并将其分为两部分。

    使用连接器将记录写入主题。 使用具有精确一次语义的 SMT(无状态单消息转换)的 kafka 流应用程序将转换后的消息生成到审计日志。 这样,只有在生成原始主题中的消息时,才会在 adit 日志中显示消息。事务的复杂性已经解决了。

    kafka 连接器 (Debezium) 将处理重试、故障转移、偏移等。

    另一种较旧的方法是事务发件箱,可以使用Debezium TX-Outbox

    【讨论】:

      【解决方案2】:

      要正确实现这一点,您需要使用 JTA 事务管理器配置 Spring Batch,该事务管理器协调 DatasourceTransactionManager(用于 Spring Batch 的技术元数据)和 KafkaTransactionManager(用于您的业务数据)。

      在 Spring 批处理作业的单个事务中写入两个 kafka 主题的最佳方法是什么

      如果您在此处使用上一个问题所建议的内容:https://stackoverflow.com/a/65287130/5019386,则两个编写器将在 Spring Batch 驱动的同一事务中执行。

      【讨论】:

      • 我能够实现参考链接中的代码,并且我能够编写这两个主题。顺便提一下,我对这两个主题都使用了不同的 KafkaTemplate,因为 SSL 部分的 producerfactory 属性值不同,希望这不是问题。所以回到你的建议,用我当前的代码,我只需要配置 DatasourceTransactionmanger 和 KafkaTransactionManger 来解决我的问题?
      • 我已经在我的类的 JobRepositoryFactoryBean 中设置了 platformTransactionManger,并用 @Configuration 注释并扩展了 DefaultBatchConfigurer。元数据已经存储在 spring 批处理表中。我不知道它是如何在单个事务中管理 kafka 编写器的,并且在第二个编写器失败的情况下会回滚。现在没有测试这个测试用例。请看我用代码段编辑的帖子。
      • 我只尝试了我已经拥有的 transactionManager,我看到了一个工作正在写入主要主题但未能写入审计主题的情况。我仍然不知道如何回滚事务。
      • I tried with just the transactionManager I already have:您没有指定您已经拥有哪个事务管理器,但是正如我在答案中提到的,您需要使用一个JTA事务管理器来协调kafka和数据库之间的分布式事务。
      • 我的工作中有来自自动配置的 DataSourceTransactionManager。我正在尝试为两个主题中的每个主题设置两个 kafkaTransactionManger,然后将它们链接到 ChainedKafkaTransactionManager 中。可以将其传递给 Writer 类中的 @Transactional,我们正在向两个作者写入,但我收到错误,因为只需要 1 个事务管理器。
      猜你喜欢
      • 2019-07-18
      • 2018-05-27
      • 2018-07-15
      • 2016-10-27
      • 2020-12-16
      • 2017-06-22
      • 2011-02-26
      • 2016-12-12
      • 1970-01-01
      相关资源
      最近更新 更多