【问题标题】:How to prepare tests using testcontainers and spring-kafka如何使用 testcontainers 和 spring-kafka 准备测试
【发布时间】:2020-10-19 10:14:04
【问题描述】:

我正在尝试为 kafka 消息传递设置我的集成测试,并从使用 Embedded-Kafka 转为使用 Testcontainers。给定以下 docker-compose 配置和所有集成测试的基类:

kafka-compose.yaml:

version: '3.3'

services:
  zookeeper:
    image: "wurstmeister/zookeeper"
  kafka:
    image: "wurstmeister/kafka:2.12-2.2.2"
    ports:
      - "9092:9092"
    depends_on:
      - "zookeeper"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_HOST_NAME: "${KAFKA_HOST:-localhost}"
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_CREATE_TOPICS: "recoverer-test:1:1,some-topic"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
@SpringBootTest
@Slf4j
public class IntegrationTest {
  private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();

  protected static DockerComposeContainer initializeKafkaContainer() {
    log.info(
        "Initializing kafka container. Should be called only once. Current value of the kafkaContainer: {}",
        kafkaContainer);
    try {
      var kafkaContainer =
          new DockerComposeContainer(new File("src/test/resources/kafka-compose.yml"))
              .withExposedService("kafka_1", 9092);
      kafkaContainer.start();

      var bootstrapServers =
          format(
              "PLAINTEXT://%s:%s",
              kafkaContainer.getServiceHost("kafka_1", 9092),
              kafkaContainer.getServicePort("kafka_1", 9092));

      System.setProperty("spring.embedded.kafka.brokers", bootstrapServers);

      return kafkaContainer;
    } catch (Throwable t) {
      log.error("Can't initialize the Kafka test container.", t);
      throw t;
    }
  }
  
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest {

  private static final String GROUP_ID = "test-group-id";
  private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
  private static final Instant RECEIVED_AT = now();
  private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);

  @Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;

  @Autowired private ConsumerFactory<String, String> consumerFactory;

  @Autowired private KafkaTemplate<Object, Object> kafkaTemplate;

  @MockBean private ActivateSomethingActivities activateCampaignActivities;

  private Consumer<String, String> consumer;

  private long initiallyCommittedOffset;

  @BeforeEach
  void startKafkaListener() {
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
  }

  @AfterEach
  void stopKafkaListener() {
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
  }

  @Test
  void shouldPerformSomething() {
...
  }

我遇到的问题很少:

  1. 似乎 spring-kafka 和它的 @KafkaListeners 在使用 @SpringBootTest 注释的所有可能测试期间都处于活动状态,而不仅仅是在特定于 kafka 的测试期间。这意味着发送到 kafka 主题的消息可以被任意测试使用。首先是 spring-kafka 家伙的问题:是否可以将 spring-kafka-test 与 Testcontainers 一起使用?是否有可能停止每个测试的所有 @KafkaListener 并为特定的 @SpringBootTest 测试显式启用它们?
  2. Testcontainers 自带一个 Kafka 模块。这个使用了 confluent kafka docker 镜像,它在配置方面非常顽固。例如,您不能设置一些代理属性,也不能告诉容器在启动后应该创建哪个主题。在与这个模块苦苦挣扎之后,我决定将 docker-compose 模块与wurstmeister/kafka 图像一起使用。后一种方法的问题是,当我使用命令行 maven 运行测试时,我收到一条错误消息,告诉我 kafka 已经在 9092 端口上运行。似乎 maven 在 mvn test 期间启动了很少的 JVM,因此静态字段 kafkaContainer 被初始化了几次。为什么会这样?

【问题讨论】:

    标签: java spring-kafka spring-boot-test testcontainers


    【解决方案1】:
    1. 在类上使用 @DirtiesContext 在测试完成时关闭侦听器。

    不知道#2。

    【讨论】:

    • 当我添加@DirtiesContext(classMode = ClassMode.BEFORE_CLASS) 时似乎没有任何效果。我更新了我的初始帖子,请参阅测试类。也许那里有问题。
    【解决方案2】:

    第一个可以通过对不同的测试使用不同的上下文来克服

    例子:

    @ExtendWith(SpringExtension.class)
    @DataJpaTest
    @AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
    @TestExecutionListeners({DependencyInjectionTestExecutionListener.class,
            FlywayTestExecutionListener.class})
    @FlywayTest
    @ActiveProfiles({"test"})
    public abstract class AbstractDatabaseTest {
    

    这是我为集成测试创建的测试,我会在需要数据库层的地方扩展此测试,您可以尝试为 Kafka 测试创建一个类似的测试。
    一般来说,@SpringBootTest 会加速整个应用程序,这可能需要更长时间,我个人不喜欢它。

    第二很难说。您可以尝试在initializeKafkaContainer() 中打印堆栈跟踪,以查看执行该操作的测试。或者,如果您尝试使用以前的方法,您可以在抽象类的静态块中进行初始化,然后扩展它的每个测试都将使用现有的静态容器,并且只会初始化一次。

    【讨论】:

      猜你喜欢
      • 2021-10-25
      • 2021-02-10
      • 2021-08-26
      • 2019-04-05
      • 1970-01-01
      • 2019-04-22
      • 2021-05-04
      • 1970-01-01
      • 2019-05-30
      相关资源
      最近更新 更多