【发布时间】:2017-11-10 11:44:48
【问题描述】:
我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09 和 FlinkKafkaConsumer09 从同一 kafka 主题读取数据。我正在通过产品中的测试数据:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
并检查来自消费者的数据是否与:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
使用TestListResultSink。
通过打印流,我能够按预期看到来自消费者的数据。但无法获得 Junit 测试结果,因为即使在消息完成后消费者仍会继续运行。所以它没有来测试部分。
Flink 或 FlinkKafkaConsumer09 中是否有任何方法可以停止进程或运行特定时间?
【问题讨论】:
标签: junit apache-kafka apache-flink flink-streaming