【问题标题】:Embedded kafka throws exception when producing within multiple scalatest suits在多个 scalatest 套装中生产时,嵌入式 kafka 引发异常
【发布时间】:2016-07-12 18:28:17
【问题描述】:

这是我的测试套装的配置方式。

  "test payments" should {
    "Add 100 credits" in {
      runTeamTest { team =>
        withRunningKafka {
          val addCreditsRequest = AddCreditsRequest(team.id.stringify, member1Email, 100)
          TestCommon.makeRequestAndCheck(
            member1Email,
            TeamApiGenerated.addCredits().url,
            Helpers.POST,
            Json.toJson(addCreditsRequest),
            OK
          )

          val foundTeam = TestCommon.waitForFuture(TeamDao.findOneById(team.id))
          foundTeam.get.credits mustEqual initialCreditAmount + 100
        }
      }
    }

    "deduct 100 credits" in {
      runTeamTest { team =>
        withRunningKafka {
          val deductCreditsRequest = DeductCreditsRequest(team.id.stringify, member1Email, 100)
          TestCommon.makeRequestAndCheck(
            member1Email,
            TeamApiGenerated.deductCredits().url,
            Helpers.POST,
            Json.toJson(deductCreditsRequest),
            OK
          )

          val foundTeam = TestCommon.waitForFuture(TeamDao.findOneById(team.id))
          foundTeam.get.credits mustEqual initialCreditAmount - 100
        }
      }
    }

在 Scalatest 中,总体套装名称是 "test payments",其中的后续测试在运行第一个测试后会出现问题。如果我分别运行这两个测试中的每一个,它们都会成功,但如果我运行整个套装,第一个成功,第二个返回 org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 异常。上面的代码没有显示正在测试的控制器中的代码,但是在控制器中,我有一个不断轮询的 kafka 消费者,并且在测试中没有调用 close()

【问题讨论】:

    标签: apache-kafka scalatest


    【解决方案1】:

    我建议您在 beforeAllafterAll 部分中使用伴随对象方法 EmbeddedKafka.start()EmbeddedKafka.stop()。这样,您还可以避免为单个测试类再次停止/启动 Kafka。

    还要确保您没有尝试在同一端口上同时启动 2 个或更多 Kafka 实例。

    【讨论】:

    • 有没有办法检查通过停止命令的 Kafka 集群是否真的死了。 @manub
    • 您可以尝试检查是否有 TCP 服务器正在侦听 Kafka 端口。如果您检查源代码中的测试,这就是验证服务器宕机的方式
    • 嗨@manub,我知道它已经3年了,但你能不能也请评论[这里](stackoverflow.com/questions/50389776/…)?谢谢一百万。
    • 实际上,我的 spark 应用程序在本地存储检查点,因此重新运行测试不会在 EmbeddedKafka 上读取相同的传入消息,因此会出错。因此,我通过在每次运行测试套件时删除检查点目录来解决它。如果您有类似的设置,删除检查点目录可能会有所帮助。
    猜你喜欢
    • 2020-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-23
    • 1970-01-01
    • 2012-09-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多