【问题标题】:How can i unit test a Kstream with an Kafka Binder?如何使用 Kafka Binder 对 Kstream 进行单元测试?
【发布时间】:2019-04-15 22:27:50
【问题描述】:

我想对 Kafka Stream Aggregate 进行单元测试,但我完全不知道该使用哪种方法。 我阅读了有关 TestSupportBinder 的信息,但我认为这不适用于我的情况,因此我使用 KafkaEmbedded 方法。这就是我初始化嵌入式 Kafka 的方式。

 @Before
public void setUp() throws Exception{

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

DefaultKafkaConsumerFactory<Object, LoggerMessage> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, OUTPUT_TOPIC);
}

我要测试的内容如下:

public interface Channels {
  String LOGGER_IN_STREAM = "logger-topic-in-stream";
  String LOGGER_IN = "logger-topic-in";
  String LOGGERDATAVALIDATED_OUT = "loggerDataValidated-topic-out";

  @Input(Channels.LOGGER_IN)
  SubscribableChannel processMessage();

  @Input(Channels.LOGGER_IN_STREAM)
  KStream<Object, LoggerMessage> loggerKstreamIn();

  @Output(Channels.LOGGERDATAVALIDATED_OUT)
  MessageChannel validateLoggerData();
}

我收到以下错误消息

org.springframework.beans.factory.BeanCreationException:创建名为“some.domain.Channels”的bean时出错:调用init方法失败;嵌套异常是 java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KStream 在注册工厂中:channelFactory,messageSourceFactory

原因:java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KStream 在注册工厂中:channelFactory,messageSourceFactory

我做错了什么?

【问题讨论】:

  • Kafka Streams binder 在类路径上吗?如果你可以在 Github 上分享一个小示例项目,我们可以看看。
  • 是的 kafka Streams binder 位于 2.0.1.Release 版本的类路径中。总的来说,我使用的是 springbootVersion 2.0.6。我不能分享一个示例项目,我将不得不改变几乎一切。我可以开始寻找更多线索吗?

标签: spring junit apache-kafka apache-kafka-streams spring-cloud-stream


【解决方案1】:

我错过了将我的 Channels 接口作为 MockBean 注入。在我这样做之后,一切都按预期工作。

【讨论】:

    猜你喜欢
    • 2019-07-30
    • 2017-08-30
    • 2018-11-10
    • 1970-01-01
    • 1970-01-01
    • 2012-01-08
    • 1970-01-01
    • 1970-01-01
    • 2018-07-06
    相关资源
    最近更新 更多