【发布时间】:2021-01-06 04:06:11
【问题描述】:
我正在为 Kafka 开发一个测试库,Kafkaesque。该库允许您使用流畅且优雅的(?!)API 为 Kafka 开发集成测试。目前,我为 Spring Kafka 开发版本。
库需要在每次测试中初始化:
@Test
void consumeShouldConsumeMessagesProducesFromOutsideProducer() {
kafkaTemplate.sendDefault(1, "data1");
kafkaTemplate.sendDefault(2, "data2");
new SpringKafkaesque(broker)
.<Integer, String>consume()
.fromTopic(CONSUMER_TEST_TOPIC)
.waitingAtMost(1L, TimeUnit.SECONDS)
.waitingEmptyPolls(5, 100L, TimeUnit.MILLISECONDS)
.withDeserializers(new IntegerDeserializer(), new StringDeserializer())
.expecting()
.havingRecordsSize(2)
.assertingThatPayloads(Matchers.containsInAnyOrder("data1", "data2"))
.andCloseConsumer();
}
我不想手动初始化SpringKafkaesque 对象,而是想创建一个对我来说具有魔力的注释。类似于 Spring Kafka 的 @EmbeddedKafka 注解。
@SpringBootTest(classes = {TestConfiguration.class})
@Kafkaesque(
topics = {SpringKafkaesqueTest.CONSUMER_TEST_TOPIC, SpringKafkaesqueTest.PRODUCER_TEST_TOPIC})
class SpringKafkaesqueTest {
@Autowired
private Kafkaesque kafkaesque;
@Test
void consumeShouldConsumeMessagesProducesFromOutsideProducer() {
kafkaTemplate.sendDefault(1, "data1");
kafkaTemplate.sendDefault(2, "data2");
kafkaesque
.<Integer, String>consume()
.fromTopic(CONSUMER_TEST_TOPIC)
.waitingAtMost(1L, TimeUnit.SECONDS)
.waitingEmptyPolls(5, 100L, TimeUnit.MILLISECONDS)
.withDeserializers(new IntegerDeserializer(), new StringDeserializer())
.expecting()
.havingRecordsSize(2)
.assertingThatPayloads(Matchers.containsInAnyOrder("data1", "data2"))
.andCloseConsumer();
}
有可能吗?有什么建议吗?
【问题讨论】:
-
我想知道如果在测试配置中使用
@KafkaListener有一种简单的方法,为什么要这样做...... -
嗨,@ArtemBilan。感谢您的评论。您为 Spring Kafka 提供的库的问题在于它是完全异步的。每次有人想从某个主题消费某些东西时,他必须编写代码来等待该主题中的消息可用,然后将它们放入队列中,等等。或者,至少,这是我对 Spring Kafka 的体验。如果您有更好的想法,请告诉我:)
标签: java junit apache-kafka annotations spring-kafka