【问题标题】:Post to topic and Manual commit in spring kafka在spring kafka中发布到主题和手动提交
【发布时间】:2020-09-08 06:16:20
【问题描述】:

我们正在处理 Spring-kafka 消费者提交主题然后应该手动提交偏移量的要求。

我们正在考虑的一种情况是,当消费者在向主题提交消息到手动提交偏移之间失败时会发生什么情况。在这种情况下,应用程序将重新处理消息并再次提交一次,从而导致主题中出现重复消息。

有什么方法可以让这两个活动都成为 TransactionManager 的一部分,从而使其全部成功/失败?

配置文件

@Bean
public ProducerFactory producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
    defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
    //defaultKafkaProducerFactory.transactionCapable();
    return defaultKafkaProducerFactory;
    //return new DefaultKafkaProducerFactory<>(config);
}


@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}


@Bean
public KafkaTransactionManager<String, User> transactionManager() {
    KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory());
    transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    transactionManager.setNestedTransactionAllowed(true);
    return transactionManager;
}


/**
 * New configuration for the consumerFactory added
 *
 * @return
 */
@Bean
public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<User>(User.class));
}


@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setTransactionManager(transactionManager());
    factory.setRetryTemplate(kafkaRetry());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(getErrorHandler());
    factory.setRecoveryCallback(retryContext -> {
        //implement the logic to decide the action after all retries are over.
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        System.out.println("Recovery is called for message  " + consumerRecord.value());
        return Optional.empty();
    });

    return factory;
}



public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();

    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(60 * 1000);
    backOffPolicy.setMultiplier(3);
    backOffPolicy.setMaxInterval(4 * 60 * 1000);       // original 25 * 60 * 1000
    retryTemplate.setBackOffPolicy(backOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(4);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}


public SeekToCurrentErrorHandler getErrorHandler() {
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {

        @Override
        public void handle(Exception thrownException,
                           List<ConsumerRecord<?, ?>> records,
                           Consumer<?, ?> consumer,
                           MessageListenerContainer container) {
            //super.handle(thrownException, records, consumer, container);
            if (!records.isEmpty()) {
                ConsumerRecord<?, ?> record = records.get(0);
                String topic = record.topic();
                long offset = record.offset();
                int partition = record.partition();

                if (thrownException instanceof DeserializationException) {
                    System.out.println("------1111------deserialization exception ");
                } else {
                    System.out.println("------xxxx------Record is empty ");
                    consumer.seek(new TopicPartition(topic, partition), offset);
                }
            } else {
                System.out.println("------4444------Record is empty ");
            }

        }
    };

    return errorHandler;
}

卡夫卡监听器

   @Autowired
KafkaTemplate<String, User> kafkaTemplate;


@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
@Transactional
public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {

    /*System.out.println("get the message " +user.getFirstName());
    if (user.getFirstName().equalsIgnoreCase("Test")) {
        throw new RuntimeException("Incompatible message " + user.getFirstName());
    }
    */

    //postToSecondTopic(acknowledgment, user);

    System.out.println("NOT In transaction");
    kafkaTemplate.executeInTransaction(t -> {
        System.out.println("---------------------->");
        int number = (int) (Math.random() * 10);
        t.send("secondtopic", user);
        if (number % 5 == 0)
            throw new RuntimeException("fail");
        acknowledgment.acknowledge();
        return true;
    });


    System.out.println("*** exit ***");
}

日志中的错误

2020-05-28 15:52:53.597 错误 112469 --- [nio-8080-exec-1] oaccC[.[.[/].[dispatcherServlet]:Servlet.service() 用于 servlet [dispatcherServlet]在路径 [] 的上下文中抛出异常 [请求处理失败;嵌套异常是 java.lang.IllegalStateException: No transaction is in process;可能的解决方案:在 template.executeInTransaction() 操作的范围内运行模板操作,在调用模板方法之前使用 @Transactional 启动事务,在使用记录时在侦听器容器启动的事务中运行] 根本原因

java.lang.IllegalStateException:没有事务正在进行;可能的解决方案:在 template.executeInTransaction() 操作范围内运行模板操作,在调用模板方法之前使用 @Transactional 启动事务,在消费记录时在侦听器容器启动的事务中运行 在 org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:394) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE] 在 org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:216) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE] 在 com.barade.sandesh.springKafka.UserResource.postComments(UserResource.java:26) ~[classes/:na] 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_252] 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_252] 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_252] 在 java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_252] 在 org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:879) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE] 在 org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]

用户资源

@RestController
@RequestMapping("accounts")
public class UserResource {

    @Autowired
    KafkaTemplate <String, User> kafkaTemplate;


    @PostMapping("/users")
    public String postComments(@RequestParam ("firstName") final String firstName,
                                    @RequestParam ("lastName") final String lastName,
                                    @RequestParam ("userName") final String userName )  {

        List<String> accountTypes = new ArrayList<String>();
        kafkaTemplate.send("firstTopic", new User(firstName,lastName,userName));

        return "Message sent to the Error queue";
    }



}

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    是的;见Transactions

    容器启动事务,监听器中的任何KafkaTemplate发送操作都参与事务;容器将偏移量发送到事务并提交(如果侦听器正常退出)。

    无需“手动”提交偏移量。

    【讨论】:

    • 感谢 Gary 的快速回复。在我们的例子中,监听器从一个主题读取,然后写入另一个主题。如果在将 msg 推送到第二个主题后不是手动提交和侦听器错误,偏移量将如何提交?它不会重新启动并处理相同的消息并尝试再次发布吗?
    • 我正在查看您提供的链接,想知道是否可以为此利用 KafkaTransactionManager
    • 是的;使用KafkaTransactionManager 时,如果侦听器抛出异常,事务将回滚,DefaultAfterRollbackProcessor 重新搜索记录,以便重播相同的记录。您可以配置回滚后处理器以将一直失败的记录发送到死信主题。见docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/…docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/…
    • 从交易链接中,我尝试使用 Kafkatemplate 来运行我的交易。我添加了 defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");我的代码是 [link]github.com/sandeshbarade/springKafka 我收到此错误 2020-05-27 19:31 错误 [dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] 在路径 [] 的上下文中抛出异常 [请求处理失败;嵌套异常是 java.lang.IllegalStateException: No transaction is in process;可能的解决方案:在 template.executeInTransaction() 操作的范围内运行模板操作
    • 对于生产者发起的事务(例如来自网络控制器),您必须使用 @Transactional 注释方法(使用 KafkaTransactionManager 或使用 template.executeInTransaction()。要使用 @Transactional 你在 @Configuration 类上需要 @EnableTransactionManagement
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-20
    • 2018-05-21
    • 2019-07-01
    • 2018-11-23
    • 1970-01-01
    • 2022-01-02
    相关资源
    最近更新 更多