【问题标题】:Reactive Kafka : Exactly Once Processing with Transaction反应式 Kafka:使用事务处理一次
【发布时间】:2019-08-06 01:55:29
【问题描述】:

最初通过api调用触发

1. Service A produces m1 to topic1 (non transactional send)

2. Service B consumes topic1 and does some processing
(begin tx)

3. Service B produces m2 to topic2
(commit tx)

4. Service A consumes topic2
(begin tx)

这是我的生产者配置:

final Map<String, Object> props = Maps.newConcurrentMap();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);


props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

这是我的消费者配置:

final Map<String, Object> props = Maps.newHashMap();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

我读到了这个sample scenarios

我尝试关注,但遇到了一些问题:

这是我的生产者代码:

    @Override
public Mono<SenderResult<Void>> buy(Message msg) {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    return kafkaProducerTemplate.send(mytopic, msg);

}

我的消费者代码:

@Override
public void run(ApplicationArguments arg0) throws Exception {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);

    final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
            .concatMap(receiverRecordFlux -> receiverRecordFlux );

    flux.subscribe(record -> {
        final Message message = record.value();

    System.out.printf("received message: timestamp=%s key=%d value=%s\n",
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                message);

     transactionService.processAndSendToNextTopic(message)
                .doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
                .subscribe();

    });
}

在快乐的情况下测试生成和消费消息到单个分区主题时,我总是遇到以下错误。

Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

谁能告诉我如何正确开始交易,以及何时提交和关闭交易?

请注意,如果我不使用事务性 - receive() 而不是 receiveExactlyOnce,这一切都可以正常工作

【问题讨论】:

  • 这很像您的事务尚未提交并且您正在开始一个新的,您能否分享有关您的代码的更多详细信息?特别是关于 transactionService.processAndSendToNextTopic(message) ?顺便说一句,您显示的配置是否适用于服务 A 和服务 B ?因为服务 B 消费者不需要读取已提交的隔离级别(因为它确实读取了非 trx 消息)
  • 是的,所以在 serviceB 从 topic1 消费之后,它会执行 processAndSend,它基本上会执行一些数据库操作,然后执行 service B 的生产者将 msg 发送到 topic2
  • Yannick 你有一个使用事务的反应式 kafka 模板的简单示例吗?

标签: apache-kafka spring-kafka project-reactor reactive-kafka


【解决方案1】:

您可能需要将 KafkaTransactionManager 同步配置为SYNCHRONIZATION_ALWAYS

请检查。

另见

Transaction Synchronization in Spring Kafka

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/transaction/KafkaTransactionManager.html

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-03-10
    • 2019-01-16
    • 2015-07-22
    • 1970-01-01
    • 2021-09-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多