【发布时间】:2020-01-08 10:31:57
【问题描述】:
我在 Play Controller 中使用 Akka,并对名为 publish 的参与者执行 ask(),内部发布参与者对多个参与者执行请求并传递发送者的引用。控制器参与者需要等待来自多个参与者的响应并创建响应列表。
请在下面找到代码。但此代码仅等待 1 个响应和后者终止。请推荐
// Performs ask to publish actor
Source<Object,NotUsed> inAsk = Source.fromFuture(ask(publishActor,service.getOfferVerifyRequest(request).getPayloadData(),1000));
final Sink<String, CompletionStage<String>> sink = Sink.head();
final Flow<Object, String, NotUsed> f3 = Flow.of(Object.class).map(elem -> {
log.info("Data in Graph is " +elem.toString());
return elem.toString();
});
RunnableGraph<CompletionStage<String>> result = RunnableGraph.fromGraph(
GraphDSL.create(
sink , (builder , out) ->{
final Outlet<Object> source = builder.add(inAsk).out();
builder
.from(source)
.via(builder.add(f3))
.to(out); // to() expects a SinkShape
return ClosedShape.getInstance();
}
));
ActorMaterializer mat = ActorMaterializer.create(aSystem);
CompletionStage<String> fin = result.run(mat);
fin.toCompletableFuture().thenApply(a->{
log.info("Data is "+a);
return true;
});
log.info("COMPLETED CONTROLLER ");
【问题讨论】:
标签: java-8 playframework akka akka-stream akka-http