【Question title】: Use Actors to send data to Akka websockets
【Posted】: 2017-10-06 11:03:31
【Question】:

I am using Akka websockets to push data to a client.

This is what I have so far:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.WebSocket;
import akka.japi.Function;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class Server {

  public static HttpResponse handleRequest(HttpRequest request) {
    System.out.println("Handling request to " + request.getUri());
    if (request.getUri().path().equals("/greeter")) {
      final Flow<Message, Message, NotUsed> greeterFlow = greeterHello();
      return WebSocket.handleWebSocketRequestWith(request, greeterFlow);
    } else {
      return HttpResponse.create().withStatus(404);
    }
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create();

    try {
      final Materializer materializer = ActorMaterializer.create(system);

      final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request);
      CompletionStage<ServerBinding> serverBindingFuture = Http.get(system).bindAndHandleSync(handler,
          ConnectHttp.toHost("localhost", 8080), materializer);

      // will throw if binding fails
      serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
      System.out.println("Press ENTER to stop.");
      new BufferedReader(new InputStreamReader(System.in)).readLine();
    } finally {
      system.terminate();
    }
  }

  public static Flow<Message, Message, NotUsed> greeterHello() {
    return Flow.fromSinkAndSource(Sink.ignore(),
        Source.single(akka.http.javadsl.model.ws.TextMessage.create("Hello!")));
  }
}

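On the client side I successfully receive the "Hello!" message. However, I now want to send data dynamically (preferably from an Actor), something like this: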

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class PushActor extends UntypedActor {
  @Override
  public void onReceive(Object message) {
    if (message instanceof String) {
      String statusChangeMessage = (String) message;
      // How to push this message to a socket ??
    } else {
      System.out.println(String.format("'%s':\nReceived unknown message '%s'!", getSelf().path(), message));
    }
  }

}

I have not been able to find any examples of this online.

The following software stack is being used:

【Comments】:

    Tags: java http websocket akka akka-http


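    【Solution 1】:

    One way to do this, though not necessarily a very elegant one, is to use Source.actorRef and send the materialized ActorRef somewhere (perhaps to a router actor?), depending on your requirements.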

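    // Also needs: import akka.stream.OverflowStrategy; import akka.stream.javadsl.Keep;
    public static Flow<Message, Message, NotUsed> greeterHello() {
        return Flow.fromSinkAndSourceMat(Sink.<Message>ignore(),
            Source.<Message>actorRef(100, OverflowStrategy.fail()),
            Keep.right()).mapMaterializedValue(actorRef -> {
                // send your actorRef to a router?
                return NotUsed.getInstance();
            });
    }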
    

    Whoever receives the ActorRefs of the connected clients is then responsible for routing messages to them.
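The routing responsibility described above can be sketched without any Akka dependency. This is only a minimal model under stated assumptions, not the answer author's implementation: `ClientRegistry`, `register`, `unregister` and `broadcast` are hypothetical names, and each client is represented by a plain `Consumer<String>`. In an Akka setup that handle would presumably wrap the ActorRef materialized by Source.actorRef, and the registry state would more naturally live inside the router actor itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Library-free model of the "router" role: it stores one handle per connected
// client and forwards every pushed message to all of them.
// Assumption: with Akka, each handle would forward to the ActorRef that
// Source.actorRef materializes for that connection.
class ClientRegistry {
  private final Map<String, Consumer<String>> clients = new ConcurrentHashMap<>();

  // Called once per new connection (for example from mapMaterializedValue).
  void register(String clientId, Consumer<String> handle) {
    clients.put(clientId, handle);
  }

  // Called when a connection closes so stale handles are not kept around.
  void unregister(String clientId) {
    clients.remove(clientId);
  }

  // Forwards the message to every registered client and returns the number of deliveries.
  int broadcast(String message) {
    int delivered = 0;
    for (Consumer<String> handle : clients.values()) {
      handle.accept(message);
      delivered++;
    }
    return delivered;
  }
}

class ClientRegistryDemo {
  public static void main(String[] args) {
    ClientRegistry registry = new ClientRegistry();
    List<String> socketA = new ArrayList<>(); // stands in for one WebSocket connection
    List<String> socketB = new ArrayList<>(); // stands in for another connection
    registry.register("a", socketA::add);
    registry.register("b", socketB::add);

    registry.broadcast("status changed");
    registry.unregister("b");
    registry.broadcast("second update");

    System.out.println(socketA); // received both updates
    System.out.println(socketB); // received only the first update
  }
}
```

When wiring this to the flow above, keep in mind that the source is a Source of Message, so a String would have to be wrapped (for example with TextMessage.create) before it is sent to the materialized ActorRef.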

    【Discussion】:
