【问题标题】:How to wrap a @KafkaListener which is handled by seekToCurrentErrorHandler with a latch for testing如何包装由 seekToCurrentErrorHandler 处理的@KafkaListener 并带有用于测试的闩锁
【发布时间】:2020-11-12 23:49:21
【问题描述】:

我有一个 kafka 监听器,它抛出一个异常并转到 seektocurrenterrorhandler 进行处理。我正在尝试包装侦听器以添加闩锁并对其进行断言以进行测试。但它似乎不起作用。有人可以建议吗。

听众:

@KafkaListener(id="testRetry",topics = "sr1", groupId = "retry-grp", containerFactory = "kafkaRetryListenerContainerFactory")
    public void listen1(ConsumerRecord<String, Anky> record,
            @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery, Acknowledgment ack) throws UserException {
            throw new UserException(code.getDbError());

    }

配置:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory(SeekToCurrentErrorHandler seekToCurrentErrorHandler) {

        ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        // to get the no retry count from the header
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }


    @Bean
    public SeekToCurrentErrorHandler seekToCurrentErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            //do nothing
        });
        errorHandler.setCommitRecovered(true);
        errorHandler.setBackOffFunction((record, exception) -> {
            return new FixedBackOff(0L,5L);

        });
        return errorHandler;
    }

测试:

@Test
public void testRetry() throws Exception,UserException {
    Anky msg = new Anky();
    this.template.send("sr1", "1234", msg);
    CountDownLatch latch = new CountDownLatch(1);
            ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry
            .getListenerContainer("testRetry");
    container.stop();
    @SuppressWarnings("unchecked")
    AcknowledgingConsumerAwareMessageListener<String, Anky> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Anky>) container
            .getContainerProperties().getMessageListener();

    container.getContainerProperties()
            .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Anky() {

                @Override
                public void onMessage(ConsumerRecord<String, Anky> data, Acknowledgment acknowledgment,
                        Consumer<?, ?> consumer) {
                    messageListener.onMessage(data, acknowledgment, consumer);
                    latch.countDown();
                }

            });
    container.start();
    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    
}

【问题讨论】:

  • 如果您从侦听器抛出异常,闩锁如何在那里工作? countDown() 只是无法访问...

标签: java spring-boot apache-kafka spring-kafka spring-kafka-test


【解决方案1】:

添加一个try...finally 块。

try {
    messageListener.onMessage(data, acknowledgment, consumer);
}
finally {
    latch.countDown();
}

【讨论】:

  • 我还在对注入到侦听器的服务方法进行验证,以确保重试尝试 5 次,如果我最后执行了 latch.countdown(),那么它只重试一次而不是 5 次...
  • 这没有意义 - 您是否将 CountDownLatch 增加到 5?
  • 嗨,加里,我看到有时测试不会调用消费者并且随机失败,闩锁在 jenkins 构建上返回 false .. 为什么会发生这种情况?你能建议...
  • 这可能是一种竞态条件,很可能是由于未将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为earliest 造成的。它默认为latest,这意味着如果在模板发送后启动,消费者将看不到记录。
  • 我检查了,消费者配置设置为最早...但有时文本仍然失败...有什么办法可以避免这种不一致吗?
猜你喜欢
  • 2021-12-13
  • 1970-01-01
  • 2020-06-06
  • 1970-01-01
  • 1970-01-01
  • 2020-06-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多