【问题标题】:RunnableGraph to wait for multiple response from sourceRunnableGraph 等待来自源的多个响应
【发布时间】:2020-01-08 10:31:57
【问题描述】:

我在 Play Controller 中使用 Akka,并对名为 publish 的参与者执行 ask(),内部发布参与者对多个参与者执行请求并传递发送者的引用。控制器参与者需要等待来自多个参与者的响应并创建响应列表。

请在下面找到代码。但此代码仅等待 1 个响应和后者终止。请推荐


// Performs ask to publish actor
Source<Object,NotUsed> inAsk = Source.fromFuture(ask(publishActor,service.getOfferVerifyRequest(request).getPayloadData(),1000));

final Sink<String, CompletionStage<String>> sink = Sink.head();

        final Flow<Object, String, NotUsed> f3 = Flow.of(Object.class).map(elem -> {
            log.info("Data in Graph is " +elem.toString());
            return elem.toString();
        });
        RunnableGraph<CompletionStage<String>> result = RunnableGraph.fromGraph(
                GraphDSL.create(
                        sink , (builder , out) ->{
                            final Outlet<Object> source = builder.add(inAsk).out();
                            builder
                                    .from(source)
                                    .via(builder.add(f3))
                                    .to(out); // to() expects a SinkShape
                            return ClosedShape.getInstance();
                        }
                ));

        ActorMaterializer mat = ActorMaterializer.create(aSystem);

        CompletionStage<String> fin = result.run(mat);


        fin.toCompletableFuture().thenApply(a->{
                log.info("Data is "+a);
                return true;
        });

        log.info("COMPLETED CONTROLLER ");

【问题讨论】:

    标签: java-8 playframework akka akka-stream akka-http


    【解决方案1】:

    如果您有多个响应 ask 不会削减它,那仅适用于单个请求-响应,其中响应以 Future/CompletionStage 结尾。

    有几种不同的策略可以等待所有答案:

    一个是创建一个中间参与者,其唯一的工作是收集所有答案,然后在所有部分响应到达时响应原始请求者,这样您就可以使用 ask 来获取单个聚合响应。

    另一种选择是使用Source.actorRef 来获得一个ActorRef,您可以将其用作sendertell(并跳过使用ask)。然后在流中获取元素,直到满足某些条件(时间已经过去或元素已被看到)。您可能必须添加一个操作符来模拟请求响应超时,以确保如果参与者从不响应,则流会失败。

    共享代码还有一些其他问题,一个是在每个请求上创建一个物化器,它们有一个生命周期并且会随着时间的推移填满你的堆,你应该从游戏中注入一个物化器。

    有了给定的逻辑,就不需要使用 GraphDSL,只有具有多个输入和输出或循环的复杂流才需要。您应该能够单独使用 Flow API 来编写运算符(参见例如 https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#defining-and-running-streams

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-02-25
      • 2016-04-30
      • 2021-08-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-02-14
      相关资源
      最近更新 更多