【问题标题】:Akka Streams onFailuresWithBackoff not restarting flowAkka Streams onFailuresWithBackoff 未重新启动流程
【发布时间】:2019-06-12 05:39:02
【问题描述】:

我正在尝试在 Akka Streams javadsl 中使用RestartFlow 来重新启动我的流程阶段,如果在该阶段发生任何故障,但它似乎并没有重新启动流程,而是只是丢弃了消息。

我已经看过这个:RestartFlow in Akka Streams not working as expected,但我使用的是 2.5.19 版本,所以应该修复它吗?

RestartFlow.onFailuresWithBackoffRestartFlow.withBackoff 我都试过了,但都没有奏效。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截了异常,因此它不会从流程中抛出,而且似乎没有提供我想要的退避和最大重试策略。

流:

public Consumer.DrainingControl<Done> stream() {
    return Consumer.committableSource(consumerSettings,
        Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
            ConfigKeys.CONSUMER_TOPIC)))
        .via(RestartFlow.onFailuresWithBackoff(
                Duration.ofSeconds(1), // min backoff
                Duration.ofSeconds(2), // max backoff,
                0.2, // adds 20% "noise" to vary the intervals slightly
                10, // limits the amount of restarts to 10
                this::dispatchMessageFlow))
        .via(Committer.flow(CommitterSettings.create(system)))
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(mat);
}

然后是流程:

private Flow<ConsumerMessage.CommittableMessage<String, String>,
    ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
    return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
            .mapAsyncUnordered(
                config.getInt(ConfigKeys.PARALLELISM),
                msg ->
                    streamProcessor.process(msg.record().value())
                        .whenComplete((done, e) -> {
                            if (e != null) {
                                throw new RuntimeException(e);
                            } else {
                                if (done.status().isSuccess()){
                                    streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
                                        done.toString());
                                } else {
                                    throw new RuntimeException("HTTP Error!");
                                }
                            }
                        })
                        .thenApply(done -> msg.committableOffset()));
}

我看到过一次异常,akka 表示它将由于失败而重新启动图形,但在那之后就没有别的了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。

java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
    at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    ... 6 more

如果有人能帮我指出正确的方向,我将不胜感激。

【问题讨论】:

    标签: java akka akka-stream akka-http


    【解决方案1】:

    它的工作方式有点不同。长话短说 - 如果发生错误,消息将被丢弃,但源/流将重新启动,而不会杀死整个流。它在RestartFlow.onFailuresWithBackoff documentation 中有描述:

    重新启动过程本质上是有损的,因为取消和发送消息之间没有协调。来自包装流任一端的终止信号将导致另一端终止,并且任何传输中的消息都将丢失。在退避期间,此 Flow 将背压。

    【讨论】:

    • 明白了,谢谢!我有点想到这一点,而是实现了我自己的 RunnableGraph 将失败发送回源重试。
    猜你喜欢
    • 2018-01-14
    • 2019-05-14
    • 1970-01-01
    • 1970-01-01
    • 2021-09-08
    • 2015-07-31
    • 2016-10-20
    • 2020-02-20
    • 1970-01-01
    相关资源
    最近更新 更多