【问题标题】:How to stop a flink streaming job from program如何从程序中停止 flink 流作业
【发布时间】:2017-11-10 11:44:48
【问题描述】:

我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09FlinkKafkaConsumer09 从同一 kafka 主题读取数据。我正在通过产品中的测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

并检查来自消费者的数据是否与:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

使用TestListResultSink

通过打印流,我能够按预期看到来自消费者的数据。但无法获得 Junit 测试结果,因为即使在消息完成后消费者仍会继续运行。所以它没有来测试部分。

FlinkFlinkKafkaConsumer09 中是否有任何方法可以停止进程或运行特定时间?

【问题讨论】:

    标签: junit apache-kafka apache-flink flink-streaming


    【解决方案1】:

    潜在的问题是流媒体程序通常不是有限的并且可以无限期地运行。

    至少目前最好的方法是在您的流中插入一个特殊的控制消息,让源正确终止(只需通过离开读取循环来停止读取更多数据)。这样 Flink 会告诉所有下游操作者,他们可以在消耗完所有数据后停止。

    或者,您可以在源中抛出一个特殊异常(例如,在一段时间后),这样您就可以区分“正确”终止与失败情况(通过检查错误原因)。在源中抛出异常将使程序失败。

    【讨论】:

    • 您好@TillRohrmann,感谢您的回复。在处理完所有 3 个元素后,我尝试在 map 函数中抛出一些异常。但是在我不想要的那种情况下,JUnit 测试显示为失败。如果你能给我举个例子,那就太好了。非常感谢!
    • 我设法让我的 Flink 作业在单元测试中停止,通过以下方法:1)在代码中添加一个仅测试标志,2)在图表的某个阶段,@987654321 @然后抛出一个已知消息的异常,3)try { env.execute(); } catch ...并检查异常消息,如果是已知消息,吞下异常;否则再次抛出异常。
    【解决方案2】:

    关注@TillRohrman

    如果您使用 EmbeddedKafka 实例,您可以结合特殊异常方法并在单元测试中处理它,然后读取 EmbeddedKafka 主题并断言消费者值。

    我发现https://github.com/asmaier/mini-kafka/blob/master/src/test/java/de/am/KafkaProducerIT.java 在这方面非常有用。

    唯一的问题是您将丢失触发异常的元素,但您始终可以调整测试数据以解决此问题。

    【讨论】:

      【解决方案3】:

      你不能在反序列化器中使用 isEndOfStream 覆盖来停止从 Kafka 获取吗?如果我没看错,flink/Kafka09Fetcher 在它的 run 方法中有以下代码,它会破坏事件循环

          if (deserializer.isEndOfStream(value)) {
                              // end of stream signaled
                              running = false;
                              break;
                          }
      

      我的想法是使用 Till Rohrmann 的控制消息的想法结合这个 isEndOfStream 方法来告诉 KafkaConsumer 停止阅读。

      有什么不可行的原因吗?或者也许我忽略了一些极端情况?

      https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L146

      【讨论】:

        【解决方案4】:

        在您的测试中,您可以在单独的线程中启动作业执行,等待一段时间允许它进行数据处理,取消线程(它将中断作业)并进行断言。

        CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
            try {
                environment.execute(jobName);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        try {
            handle.get(seconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job
        }
        
        // Make assertions here
        

        【讨论】:

          猜你喜欢
          • 2015-12-11
          • 1970-01-01
          • 2017-12-28
          • 1970-01-01
          • 1970-01-01
          • 2017-02-05
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多