【问题标题】:How to write unit tests in Spark 2.0+?如何在 Spark 2.0+ 中编写单元测试?
【发布时间】:2017-09-29 11:14:04
【问题描述】:

我一直在尝试寻找一种合理的方法来使用 JUnit 测试框架测试 SparkSession。虽然SparkContext 似乎有很好的示例,但我无法弄清楚如何获得适用于SparkSession 的相应示例,即使它在spark-testing-base 内部的多个地方使用。我很乐意尝试不使用 spark-testing-base 的解决方案,如果它不是真正正确的方法。

简单测试用例(complete MWE projectbuild.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

使用 JUnit 运行它的结果是在负载线上出现 NPE:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

请注意,正在加载的文件是否存在并不重要;在正确配置的 SparkSession 中,more sensible error will be thrown

【问题讨论】:

  • 感谢大家到目前为止的回复;我希望尽快审查。我还提出了一个问题并在这里交叉引用它:github.com/holdenk/spark-testing-base/issues/180
  • 不幸的是,我还没有开始实际使用 Spark ......总有一天,也许 3.x 以这种速度 - 否则我会努力接受答案。很高兴这对其他人有用。

标签: scala unit-testing apache-spark junit apache-spark-sql


【解决方案1】:

感谢您提出这个悬而未决的问题。出于某种原因,当谈到 Spark 时,每个人都沉迷于分析,以至于忘记了过去 15 年左右出现的伟大的软件工程实践。这就是为什么我们在课程中重点讨论测试和持续集成(以及 DevOps 等其他内容)。

术语简介

true 单元测试意味着您可以完全控制测试中的每个组件。不能与数据库、REST 调用、文件系统甚至系统时钟进行交互;正如 Gerard Mezaros 在xUnit Test Patterns 中所说的那样,一切都必须“加倍”(例如模拟、存根等)。我知道这看起来像是语义,但它确实很重要。未能理解这一点是您在持续集成中看到间歇性测试失败的主要原因之一。

我们仍然可以进行单元测试

因此,鉴于这种理解,对RDD 进行单元测试是不可能的。但是,在开发分析时仍然有单元测试的地方。

考虑一个简单的操作:

rdd.map(foo).map(bar)

这里的foobar 是简单的函数。这些可以以正常方式进行单元测试,并且它们应该包含尽可能多的极端案例。毕竟,他们为什么关心他们从哪里获得输入,无论是测试夹具还是RDD

别忘了 Spark Shell

这不是测试本身,但在这些早期阶段,您还应该在 Spark shell 中进行试验,以找出您的转换,尤其是您的方法的后果。例如,您可以使用toDebugStringexplainglomshowprintSchema 等许多不同的函数检查物理和逻辑查询计划、分区策略和保存以及数据状态在。我会让你探索这些。

您还可以在 Spark shell 和测试中将您的 master 设置为 local[2],以识别只有在您开始分发工作时才可能出现的任何问题。

使用 Spark 进行集成测试

现在是有趣的东西。

为了在您对辅助函数和RDD/DataFrame 转换逻辑的质量有信心之后进行集成测试 Spark,做一些事情是至关重要的(无论构建工具和测试框架):

  • 增加 JVM 内存。
  • 启用分叉但禁用并行执行。
  • 使用您的测试框架将您的 Spark 集成测试累积到套件中,并在所有测试之前初始化 SparkContext,并在所有测试之后停止它。

使用 ScalaTest,您可以混合使用 BeforeAndAfterAll(我通常更喜欢)或 BeforeAndAfterEach,正如 @ShankarKoirala 所做的那样来初始化和拆除 Spark 工件。我知道这是一个合理的例外情况,但我真的不喜欢你必须使用的那些可变的vars。

贷款模式

另一种方法是使用Loan Pattern

例如(使用 ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

如您所见,贷款模式利用高阶函数将SparkContext“贷款”给测试,然后在完成后将其丢弃。

面向痛苦的编程(谢谢,Nathan)

这完全是一个偏好问题,但在引入另一个框架之前,我更喜欢使用贷款模式并尽可能地自行连接。除了试图保持轻量级之外,框架有时会添加很多“魔法”,使调试测试失败难以推理。所以我采用Suffering-Oriented Programming 方法——在这种方法中,我避免添加新框架,直到没有它的痛苦无法承受。但同样,这取决于您。

正如@ShankarKoirala 所提到的,该替代框架的最佳选择当然是spark-testing-base。在这种情况下,上面的测试将如下所示:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

请注意,我不需要做任何事情来处理SparkContextSharedSparkContext 给了我所有这些——sc 作为SparkContext——免费。就个人而言,虽然我不会仅仅为了这个目的而引入这种依赖关系,因为贷款模式正是我所需要的。此外,由于分布式系统发生了如此多的不可预测性,当持续集成中出现问题时,必须追溯第三方库源代码中发生的魔法可能会非常痛苦。

现在,spark-testing-base 真正闪耀的地方在于基于 Hadoop 的帮助程序,例如 HDFSClusterLikeYARNClusterLike。将这些特征混合在一起确实可以为您节省很多设置痛苦。它的另一个亮点是 Scalacheck 类似的属性和生成器——当然假设您了解基于属性的测试是如何工作的以及它为什么有用。但同样,我个人会推迟使用它,直到我的分析和测试达到那种复杂程度。

“只有西斯在做绝对的事情。” ——欧比旺·克诺比

当然,您也不必二选一。也许您可以将贷款模式方法用于大多数测试,spark-testing-base 仅用于少数更严格的测试。选择不是二元的。两者都可以。

使用 Spark Streaming 进行集成测试

最后,我想展示一个带有内存值的 SparkStreaming 集成测试设置在没有 spark-testing-base 的情况下会是什么样子的 sn-p:

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

这比看起来简单。它实际上只是将数据序列转换为队列以提供给DStream。其中大部分实际上只是适用于 Spark API 的样板设置。无论如何,您可以将其与 StreamingSuiteBase as found in spark-testing-base 进行比较,以决定您更喜欢哪个。

这可能是我有史以来最长的帖子,所以我将把它留在这里。我希望其他人能加入其他想法,通过改进所有其他应用程序开发的相同敏捷软件工程实践来帮助提高我们的分析质量。

对于无耻的插件表示歉意,您可以查看我们的课程Analytics with Apache Spark,我们在其中讨论了很多这些想法等等。我们希望尽快有一个在线版本。

【讨论】:

  • 感谢您提供详细的答案,但使用贷款模式将使您为定义的每个测试用例启动和停止 spark 上下文,而避免这种情况的唯一方法是使用 koiralo 提供的答案?
  • 这应该被选为问题的正确答案。
【解决方案2】:

您可以使用 FunSuite 和 BeforeAndAfterEach 编写一个简单的测试,如下所示

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

你不需要在测试中创建一个函数,你可以简单地写成

test ("test name") {//implementation and assert}

Holden Karau 写了非常好的测试 spark-testing-base

你需要看看下面是一个简单的例子

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

希望这会有所帮助!

【讨论】:

  • 很好的答案。 spark-spec 使用了类似的方法,但是在项目中添加了很多测试文件时速度太慢了。请参阅我的答案以了解不会强制 SparkSession 在每个测试文件之后停止/启动的替代实现。
  • 我也喜欢这个答案的第一部分;我只希望第二个示例中包含 Spark 的东西,而不是玩具断言。除此之外,我要指出的是,在一组测试之前和/或之后执行昂贵的副作用并不是一个新想法。正如我在回答中所建议的那样,ScalaTest 有充足的机制——在这种情况下用于管理 Spark 工件——您可以像使用任何其他昂贵的装置一样使用这些机制。至少在值得引入更重的第三方框架之前是值得的。
  • 附带说明,ScalaTest 和 specs2(我认为默认情况下这样做)都可以并行运行测试以提高速度。构建工具也可以提供帮助。但同样,这些都不是新的。
  • 我已根据您的建议为 spark-testing-base 编辑了适当的测试示例。谢谢,
【解决方案3】:

Spark 1.6 开始,您可以使用 Spark 用于其自己的单元测试的 SharedSparkContextSharedSQLContext

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

由于 Spark 2.3 SharedSparkSession 可用:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

更新:

Maven 依赖:

<dependency>
  <groupId>org.scalactic</groupId>
  <artifactId>scalactic</artifactId>
  <version>SCALATEST_VERSION</version>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>SCALATEST_VERSION</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

SBT 依赖:

"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

此外,您可以查看 Spark 的 test sources,那里有大量的各种测试套件。

更新 2:

Apache Spark Unit Testing Part 1 — Core Components

Apache Spark Unit Testing Part 2 — Spark SQL

Apache Spark Unit Testing Part 3 — Streaming

Apache Spark Integration Testing

【讨论】:

  • 你知道哪个maven包包含这个类吗?
  • 当然。两者都在"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"
  • 对于 Maven &lt;dependency&gt; &lt;groupId&gt;org.apache.spark&lt;/groupId&gt; &lt;artifactId&gt;spark-sql&lt;/artifactId&gt; &lt;version&gt;SPARK_VERSION&lt;/version&gt; &lt;type&gt;test-jar&lt;/type&gt; &lt;scope&gt;test&lt;/scope&gt; &lt;/dependency&gt;
  • 对我来说,还需要使用libraryDependencies += "org.apache.spark" %% "spark-core" % SPARK_VERSION withSources()libraryDependencies += "org.apache.spark" %% "spark-catalyst" % SPARK_VERSION withSources() 添加 spark-core 和 spark-catalyst 的 sources
  • 晚安,黄! “无法解析符号测试”是什么意思?发生在哪里?
【解决方案4】:

我喜欢创建一个SparkSessionTestWrapper 特征,它可以混合到测试类中。 Shankar 的方法很有效,但对于包含多个文件的测试套件来说,速度太慢了。

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

特征可以如下使用:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

查看spark-spec 项目以获取使用SparkSessionTestWrapper 方法的真实示例。

更新

spark-testing-base library 会在某些特征混入测试类时自动添加 SparkSession(例如,当混入 DataFrameSuiteBase 时,您将可以通过 spark 变量访问 SparkSession)。

我创建了一个名为 spark-fast-tests 的单独测试库,以便用户在运行测试时完全控制 SparkSession。我不认为测试助手库应该设置 SparkSession。用户应该能够以他们认为合适的方式启动和停止他们的 SparkSession(我喜欢创建一个 SparkSession 并在整个测试套件运行过程中使用它)。

以下是 spark-fast-tests assertSmallDatasetEquality 方法的实际操作示例:

import com.github.mrpowers.spark.fast.tests.DatasetComparer

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

    it("aliases a DataFrame") {

      val sourceDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("name")

      val actualDF = sourceDF.select(col("name").alias("student"))

      val expectedDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("student")

      assertSmallDatasetEquality(actualDF, expectedDF)

    }

  }

}

【讨论】:

  • 在这种方法中,您如何建议在某处添加sparkSession.stop()
  • 您不需要sparkSession.stop()@NeilBest。测试套件完成运行后,Spark Session 将关闭。
  • 为什么不需要 sparkSession.stop()? @Shankar Koirala 的回答停止了 sparkSession,这没用吗?
  • @yuxh - Shankar 的回答会在每次测试后启动和停止 Spark 会话。这种方法有效,但速度很慢,因为启动 Spark 会话需要一段时间。
  • 但他也提到 spark-testing-base , SharedSparkContext 在所有测试用例之后停止这个上下文。即使在 SparkSessionTestWrapper 中的所有测试用例之后,我也没有看到任何代码停止
【解决方案5】:

我可以用下面的代码解决问题

在项目pom中添加spark-hive依赖

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }

【讨论】:

    【解决方案6】:

    另一种使用 JUnit 进行单元测试的方法

    import org.apache.spark.sql.SparkSession
    import org.junit.Assert._
    import org.junit.{After, Before, _}
    
    @Test
    class SessionSparkTest {
      var spark: SparkSession = _
    
      @Before
      def beforeFunction(): Unit = {
        //spark = SessionSpark.getSparkSession()
        spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
        System.out.println("Before Function")
      }
    
      @After
      def afterFunction(): Unit = {
        spark.stop()
        System.out.println("After Function")
      }
    
      @Test
      def testRddCount() = {
        val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
        val count = rdd.count()
        assertTrue(3 == count)
      }
    
      @Test
      def testDfNotEmpty() = {
        val sqlContext = spark.sqlContext
        import sqlContext.implicits._
        val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
        assertFalse(numDf.head(1).isEmpty)
      }
    
      @Test
      def testDfEmpty() = {
        val sqlContext = spark.sqlContext
        import sqlContext.implicits._
        val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
        assertTrue(emptyDf.head(1).isEmpty)
      }
    }
    
    case class Num(id: Int)
    

    【讨论】:

      猜你喜欢
      • 2012-02-03
      • 2019-01-28
      • 2013-06-12
      • 1970-01-01
      • 1970-01-01
      • 2022-01-24
      • 2012-12-15
      • 1970-01-01
      相关资源
      最近更新 更多