【问题标题】:Scala Unit testing for ProcessAllWindowFunctionProcessAllWindowFunction 的 Scala 单元测试
【发布时间】:2019-10-31 17:22:43
【问题描述】:

阅读官方flink测试文档后(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html) 我能够使用测试工具为 ProcessFunction 开发测试,如下所示:

pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")

testHarness =
  new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
    new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
  )

testHarness.open()

现在,我正在尝试对 ProcessAllWindowFunction 执行相同的操作,如下所示:

class MapVersionValidationDistributor(batchSize: Int) extends
  ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {

lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)

首先我意识到我不能对 ProcessAllWindowFunction 使用 TestHarness,因为它没有 processElement 方法。在这种情况下,我应该遵循什么单元测试策略?

编辑:目前我的测试代码如下所示:

val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
    val mvv = new MapVersionValidationDistributor(1)
    val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))

    val ctx = mock[mvv.Context]
    val streamContext =  mock[RuntimeContext]
    mvv.setRuntimeContext(streamContext)

    mvv.open(mock[Configuration])
    mvv.process(ctx,input3,collector)

我收到了这个错误:

 Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }

【问题讨论】:

  • 您只是想在某个窗口上测试您的处理方法的行为吗??
  • 是的,这基本上就是我想要做的 :) 感谢您的回复顺便说一句

标签: scala apache-flink flink-streaming


【解决方案1】:

您真的不需要测试工具来对ProcessAllWindowFunctionprocess 方法进行单元测试。 process 函数接受 3 个参数:ContextIterable[IN]Collector[OUT]。您可以使用一些库,具体取决于用于模拟 Context 的语言。您还可以根据您在此处的偏好轻松实现或模拟Collector。而 Iterable[IN] 只是一个 Iterable 包含您的窗口的元素,它会在窗口被触发后传递给函数。

【讨论】:

  • 我做了你说的事情,我这样调用函数:mvv.process(ctx, Seq(input,input2,input3), collector ) 使用 ctx 和收集器是模拟的,问题是当我尝试运行代码时出现此错误:运行时上下文尚未初始化。 java.lang.IllegalStateException: 运行时上下文尚未初始化。
  • RuntimeContextnull 时会引发此异常。我想你也应该嘲笑这个。您是否在您的函数中调用 getRuntimeContext() ??。创建函数的新实例后,您可以使用setRuntimeContext 设置运行时上下文。
猜你喜欢
  • 2010-10-18
  • 2010-10-03
  • 2012-02-06
  • 1970-01-01
  • 2021-09-18
  • 1970-01-01
  • 2020-05-03
  • 2018-05-08
  • 1970-01-01
相关资源
最近更新 更多