【问题标题】:testing kafka and spark with testcontainers使用 testcontainers 测试 kafka 和 spark
【发布时间】:2021-10-25 02:18:01
【问题描述】:

我正在尝试与 testcontainers 一起检查流式管道作为集成测试,但我不知道如何获取 bootstrapServers,至少在最后一个 testcontainers 版本中并在那里创建特定主题。如何使用 'containerDef' 提取引导服务器并添加主题?

import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
  //val kafkaContainer: KafkaContainer      = KafkaContainer("confluentinc/cp-kafka:5.4.3")
  override val containerDef: ContainerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Unit testing")
      .getOrCreate()

    // How add a topic in that container?

    // This is not posible:
    val servers=container.bootstrapServers

    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", "topic1")
      .load()


    df.show(false)

  })

}

我的 sbt 配置:

lazy val root = project
  .in(file("./pipeline"))
  .settings(
    organization := "org.example",
    name := "spark-stream",
    version := "0.1",
    scalaVersion := "2.12.10",
    libraryDependencies := Seq(
      "org.apache.spark" %% "spark-sql-kafka-0-10"       % "3.0.3"  % Compile,
      "org.apache.spark" %% "spark-sql"                  % "3.0.3"  % Compile,
      "com.dimafeng"     %% "testcontainers-scala-munit" % "0.39.5" % Test,
      "org.dimafeng"     %% "testcontainers-scala-kafka" % "0.39.5" % Test,
      "org.scalameta"    %% "munit"                      % "0.7.28" % Test
    ),
    testFrameworks += new TestFramework("munit.Framework"),
    Test / fork := true
  )

文档未显示完整示例:https://www.testcontainers.org/modules/kafka/

【问题讨论】:

  • 这个问题是否与 MuleSoft 的应用程序测试框架 MUnit (docs.mulesoft.com/munit/latest) 相关,用于为您的 Mule 应用程序构建自动化测试?
  • container.getBootstrapServers() 不起作用?
  • 不是,不是
  • 如果没有这个测试用例,理想情况下你会如何“在那个容器中添加一个主题?”就像流程的工作原理一样,contianer 对象上有哪些可用的方法?

标签: scala testcontainers


【解决方案1】:

这里唯一的问题是您将 KafkaContainer.Def 显式转换为 ContainerDef

withContianersContainter提供的容器类型由path dependent type在提供的ContainerDef决定,

trait TestContainerForAll extends TestContainersForAll { self: Suite =>

  val containerDef: ContainerDef

  final override type Containers = containerDef.Container

  override def startContainers(): containerDef.Container = {
    containerDef.start()
  }

  // inherited from TestContainersSuite
  def withContainers[A](runTest: Containers => A): A = {
    val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
    runTest(c)
  }

}
trait ContainerDef {

  type Container <: Startable with Stoppable

  protected def createContainer(): Container

  def start(): Container = {
    val container = createContainer()
    container.start()
    container
  }
}

当您在 override val containerDef: ContainerDef = KafkaContainer.Def() 中明确指定类型 ContainerDef 时,这会破坏整个“类型诡计”,因此 Scala 编译器会留下 type Container &lt;: Startable with Stoppable 而不是 KafkaContainer

因此,只需删除转换为 ContainerDef 的显式类型,val servers = container.bootstrapServers 就会按预期工作。

import com.dimafeng.testcontainers.KafkaContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import munit.FunSuite

class Mykafkatest extends FunSuite with TestContainerForAll {
  override val containerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    //...

    val servers = container.bootstrapServers

    println(servers)

    //...
  })
}

【讨论】:

  • 你能提供更多细节吗?此代码无法编译
  • 您的意思是答案中的Mykafkatest 代码吗?你得到什么编译错误?
  • 添加了正确的导入。请注意,我们需要导入 com.dimafeng.testcontainers.KafkaContainer 而不是 org.testcontainers.containers.KafkaContainer
  • 现在找不到了:repo1.maven.org/maven2/org/dimafeng/… 真的在这个 url repo1.maven.org/maven2/org 不存在 'dimafeng' 文件夹
  • 你为什么要在那个 url 上寻找那个文件夹? "com.dimafeng" %% "testcontainers-scala-munit" % "0.39.5" 可在 maven central - mvnrepository.com/artifact/com.dimafeng/… 上找到。 sbt 将自动下载并将其添加到您的项目中。你的 sbt 项目有什么问题吗?
猜你喜欢
  • 2020-10-19
  • 2021-02-10
  • 2019-09-19
  • 1970-01-01
  • 2021-05-16
  • 2021-07-14
  • 2017-12-19
  • 2019-09-06
  • 1970-01-01
相关资源
最近更新 更多