【发布时间】:2020-10-19 10:14:04
【问题描述】:
我正在尝试为 kafka 消息传递设置我的集成测试,并从使用 Embedded-Kafka 转为使用 Testcontainers。给定以下 docker-compose 配置和所有集成测试的基类:
kafka-compose.yaml:
version: '3.3'
services:
zookeeper:
image: "wurstmeister/zookeeper"
kafka:
image: "wurstmeister/kafka:2.12-2.2.2"
ports:
- "9092:9092"
depends_on:
- "zookeeper"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_HOST_NAME: "${KAFKA_HOST:-localhost}"
KAFKA_ADVERTISED_PORT: "9092"
KAFKA_CREATE_TOPICS: "recoverer-test:1:1,some-topic"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
@SpringBootTest
@Slf4j
public class IntegrationTest {
private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();
protected static DockerComposeContainer initializeKafkaContainer() {
log.info(
"Initializing kafka container. Should be called only once. Current value of the kafkaContainer: {}",
kafkaContainer);
try {
var kafkaContainer =
new DockerComposeContainer(new File("src/test/resources/kafka-compose.yml"))
.withExposedService("kafka_1", 9092);
kafkaContainer.start();
var bootstrapServers =
format(
"PLAINTEXT://%s:%s",
kafkaContainer.getServiceHost("kafka_1", 9092),
kafkaContainer.getServicePort("kafka_1", 9092));
System.setProperty("spring.embedded.kafka.brokers", bootstrapServers);
return kafkaContainer;
} catch (Throwable t) {
log.error("Can't initialize the Kafka test container.", t);
throw t;
}
}
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest {
private static final String GROUP_ID = "test-group-id";
private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
private static final Instant RECEIVED_AT = now();
private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);
@Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;
@Autowired private ConsumerFactory<String, String> consumerFactory;
@Autowired private KafkaTemplate<Object, Object> kafkaTemplate;
@MockBean private ActivateSomethingActivities activateCampaignActivities;
private Consumer<String, String> consumer;
private long initiallyCommittedOffset;
@BeforeEach
void startKafkaListener() {
kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
}
@AfterEach
void stopKafkaListener() {
kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
}
@Test
void shouldPerformSomething() {
...
}
我遇到的问题很少:
- 似乎 spring-kafka 和它的
@KafkaListeners 在使用 @SpringBootTest 注释的所有可能测试期间都处于活动状态,而不仅仅是在特定于 kafka 的测试期间。这意味着发送到 kafka 主题的消息可以被任意测试使用。首先是 spring-kafka 家伙的问题:是否可以将 spring-kafka-test 与 Testcontainers 一起使用?是否有可能停止每个测试的所有@KafkaListener并为特定的@SpringBootTest测试显式启用它们? - Testcontainers 自带一个 Kafka 模块。这个使用了 confluent kafka docker 镜像,它在配置方面非常顽固。例如,您不能设置一些代理属性,也不能告诉容器在启动后应该创建哪个主题。在与这个模块苦苦挣扎之后,我决定将 docker-compose 模块与
wurstmeister/kafka图像一起使用。后一种方法的问题是,当我使用命令行 maven 运行测试时,我收到一条错误消息,告诉我 kafka 已经在 9092 端口上运行。似乎 maven 在mvn test期间启动了很少的 JVM,因此静态字段kafkaContainer被初始化了几次。为什么会这样?
【问题讨论】:
标签: java spring-kafka spring-boot-test testcontainers