【问题标题】:How to send a message in a reactive stream from a subscriber to a publisher in a web socket connection如何在 Web 套接字连接中将反应流中的消息从订阅者发送到发布者
【发布时间】:2016-05-20 01:30:48
【问题描述】:

我的应用程序有一个 Akka-Websocket 接口。 Web 套接字由一个参与者订阅者和一个参与者发布者组成。订阅者通过将命令发送给相应的参与者来处理命令。发布者监听事件流并将更新信息发布回流(最终发布到客户端)。这很好用。

我的问题:订阅者如何将事件发送回流?例如确认接收到的命令的执行。

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

一个不错的解决方案可能是订阅参与者(上面代码中的 WebSocketCommandSubscriber 参与者)可以将消息发送回流,例如 sender().tell(...)...

【问题讨论】:

    标签: akka akka-stream akka-http


    【解决方案1】:

    不,这是不可能的,至少不是直接的。流始终是单向的——所有消息都沿一个方向流动,而对它们的需求则沿相反方向流动。您需要将确认消息从接收器传递到源,以便后者将其发送回客户端,例如,通过在接收器 Actor 中注册源 Actor。它可能看起来像这样:

    Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
        sinkActor.tell(new RegisterSource(sourceActor), null);
    })
    

    然后,当您的 sink actor 收到 RegisterSource 消息后,它可以将确认消息发送到提供的 ActorRef,然后将它们转发到输出流。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-12-14
      • 1970-01-01
      • 2015-06-30
      • 2011-12-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多