【发布时间】:2019-10-31 17:22:43
【问题描述】:
阅读官方flink测试文档后(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html) 我能够使用测试工具为 ProcessFunction 开发测试,如下所示:
pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")
testHarness =
new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
)
testHarness.open()
现在,我正在尝试对 ProcessAllWindowFunction 执行相同的操作,如下所示:
class MapVersionValidationDistributor(batchSize: Int) extends
ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {
lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)
首先我意识到我不能对 ProcessAllWindowFunction 使用 TestHarness,因为它没有 processElement 方法。在这种情况下,我应该遵循什么单元测试策略?
编辑:目前我的测试代码如下所示:
val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
val mvv = new MapVersionValidationDistributor(1)
val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))
val ctx = mock[mvv.Context]
val streamContext = mock[RuntimeContext]
mvv.setRuntimeContext(streamContext)
mvv.open(mock[Configuration])
mvv.process(ctx,input3,collector)
我收到了这个错误:
Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }
【问题讨论】:
-
您只是想在某个窗口上测试您的处理方法的行为吗??
-
是的,这基本上就是我想要做的 :) 感谢您的回复顺便说一句
标签: scala apache-flink flink-streaming