【问题标题】:Stream completes after adding flow添加流后流完成
【发布时间】:2019-03-20 20:48:00
【问题描述】:

我有一个简单的流声明如下:

Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(30), Files.list(rootDir).collect(Collectors.toList()))
                .mapConcat(files -> files)
                .log("scanning logs")
                .via(logsFlow.create())
                .via(kafkaFlow.create())
//                .via(archiveFlow.create())
                .runWith(Sink.ignore(), materializer)
                .whenComplete((a, b) -> {
                    log.info("done");
                });

使用 archiveFlow 注释掉一切都按预期工作。但是当我添加额外的流程时,无论是存档流程还是像这样的一些简单流程:

.via(Flow.of(Path.class).map(path -> {
                    log.info("foo");
                    return path;
                }))

流在第一次滴答后完成。 这是为什么?

2019-03-20 21:35:09.292 DEBUG 50089 --- [lt-dispatcher-2] a.kafka.internal.DefaultProducerStage    : Stage completed
2019-03-20 21:35:09.294 DEBUG 50089 --- [lt-dispatcher-4] akka.stream.Materializer                 : [scanning logs] Downstream finished.
2019-03-20 21:35:09.296  INFO 50089 --- [onPool-worker-3] com.example.MyStream  : done

【问题讨论】:

    标签: akka akka-stream reactive-streams


    【解决方案1】:

    原来是Akka吞噬了一个错误。我使用了监督策略,现在一切正常。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-10-12
    • 2012-02-20
    • 1970-01-01
    • 1970-01-01
    • 2020-06-28
    • 2019-07-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多