【问题标题】:Failed Kafka topic message listening repeatedly by listener in spring Kafka春季卡夫卡中的侦听器反复侦听失败的卡夫卡主题消息
【发布时间】:2019-11-06 16:15:27
【问题描述】:

我正在使用带有 spring boot 2.1.9 的 spring Kafka 和 confluentinc/cp-kafka:5.3.0(3 个 brokers clusterized) 作为 Broker。

有时在侦听器中,如果任何消息由于某些异常而失败,则消息会不断重复,侦听器将一次又一次地接收相同的消息并抛出异常。

而且由于这个问题,该主题的其他消息将不会被处理,新消息本身已暂停,侦听器不会收听

以下日志每次都会出现。 (同一条消息出现了近 30 次相同的错误)

我尝试重新启动 Kafka 和 spring boot 应用程序仍然是同样的问题。

日志:


2019-11-06 16:04:49.176  WARN [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@658b4494, txId=xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2]
2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to completeNormalFulfillmentItem 124

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.createAuditForCompleteFulfilmentFail(<generated>)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeDefaultFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1214)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1312)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.handleExistingTransaction(AbstractPlatformTransactionManager.java:430)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:354)
        at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:69)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:106)
        ... 67 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
        at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:499)
        at brave.kafka.clients.TracingProducer.beginTransaction(TracingProducer.java:50)
        at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:103)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:160)
        ... 71 common frames omitted

2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to complete fulfillment item (completeFulfillmentItem) :124

com.xxxxx.model.exception.ProcessException: failed to completeNormalFulfillmentItem 124
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1316)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at 

消费者配置

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Concurrency
    @Value("${kafka.poll.timeout:100}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:200000}")
    private Integer sessionTimoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);


    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }

    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);

        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        // NOTE: retryMaxAttempts should always +1 due to spring kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exausted). topic name:" + record.topic() + " value:"
                    + record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);

        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the Auto Commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}

听众

@KafkaListener(id = TOPIC_FULFILLMENT_CREATE, topics = TOPIC_FULFILLMENT_CREATE)
    @Transactional(readOnly = false)
    public void processCreateRequest(@Payload String message) throws IOException {
        ComponentWorkflowModel componentWorkflowModel = JsonUtil.toObject(message, ComponentWorkflowModel.class);
        componentWorkflowStarter.processCreateRequest(componentWorkflowModel);
    }
  1. 有没有什么办法可以停止收听错误信息?我用了 seekErrorHandler 有时候只能用,是不是配置有问题?

  2. 卡夫卡用spring Kafka有问题吗?

  3. 如何解决此问题?

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka


    【解决方案1】:

    看起来配置不正确。

    org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
            at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
            at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
            at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
            at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
            at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    ...
    Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
            at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)
    

    当一个已经存在的事务时,您正尝试开始一个新事务。

    您已将链式事务管理器注入到侦听器容器中,因此已经存在事务。

    看起来您有一个@Transactional 注释,它也引用了同一个事务管理器,可能带有传播REQUIRES_NEW。除非您有不同的 transactional.id,否则您不能将其与 Kafka 一起使用 - 但这可能不是您想要的。

    【讨论】:

    • 感谢您的回复。因此,根据您的建议,如果我同时使用 Transactional 和 kafkaListener 注释,它的问题对吗?
    • 我的容器或消费者配置也有什么问题,因为即使消息失败,它也应该在某些尝试后立即停止,它会继续出现,并且因为其他一些消息也相同没听过的话题?
    • 否;如果失败,事务将回滚,DefaultAfterRollbackProcessor 重新查找未处理的记录并重播。对于旧版本的框架,它将永远重试;从 spring-kafka 2.2 (boot 2.1) 开始;在可配置的(默认 10 次)交付尝试次数后,它将放弃。
    • 我使用的是 spring kafka 2.2.9 版本,我将最大重试次数设置为 3。但它也一次又一次地重播。从过去 6 小时开始,它发生了近 100 次,我怀疑我在侦听器 /Recieve 配置中遗漏了什么或配置错误?
    • 您也不应该在事务中使用SeekToCurrentErrorHandler,因为它将在事务中运行。您需要改用AfterRollbackProcessorDefaultAfterRollbackProcessor 执行与 STCEH 相同的功能,但它在事务回滚后运行。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-18
    • 2021-11-30
    • 2018-01-15
    • 2018-03-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多