【问题标题】:How to store a list of Akka actors in Play framework 2 application?如何在 Play 框架 2 应用程序中存储 Akka 演员列表?
【发布时间】:2014-09-23 08:04:23
【问题描述】:

我有一个 Play framework 2 应用程序,它可以接收数据并通过 WebSockets 将其发送到多个客户端。我使用 Akka actor 来处理 WebSockets,就像在 this documentation 中一样。我还有一个 WebSocketRouter 类,它扩展了 UntypedActor 并包含路由逻辑(决定将系统接收到的数据传递给哪些客户端)。我知道我可以使用 Akka 的 Router 功能,但这对我来说目前不是问题。问题是我必须存储所有活动客户的列表。现在我将它存储在WebSocketRouter 类的静态列表中。这是编写概念验证原型的最快方法,但它不是线程安全的,而且似乎不是“Akka 方式”。 下面是一个简化的代码示例:

WebSocketController:

//This controller handles the creation of WebSockets.
public class WebSocketController extends Controller {
    public static WebSocket<String> index() {
        return WebSocket.withActor(new F.Function<ActorRef, Props>() {
            public Props apply(ActorRef out) throws Throwable {
                return MessageSender.props(out);
            }
        });
    }
}

消息发送者:

//Hold a reference to the auto-created Actor that handles WebSockets 
//and also registers and unregisters itself in the router.
public class  MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;

    public MessageSender(ActorRef out) {
        this.out = out;
    }

    @Override
    public void preStart() {
        WebSocketRouter.addSender(getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        }
        else {
            unhandled(message);
        }
    }

    public void postStop() {
        WebSocketRouter.removeSender(getSelf());
    }
}

WebSocketRouter:

public class WebSocketRouter extends UntypedActor {
    private static ArrayList<ActorRef> senders;
    static {
        senders = new ArrayList<>();
    }

    public static void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    public static void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

再一次,我知道这是一个糟糕的解决方案,我正在寻找一个更好的解决方案。我曾想过创建一个线程安全的单例类来保存当前连接。我还考虑过将当前连接列表保存在某个 Akka Actor 的实例中,并通过 Akka 消息修改列表,但是为了使这种方式工作,我必须静态存储一个 ActorRef 到该 Actor,以便它可以可以从不同的ActorSystems 访问。

解决我的问题最适合 Akka 意识形态的最佳方法是什么?

【问题讨论】:

  • 嘿,当我尝试停止演员时,websocket 连接将关闭并且它不会再次打开,如果您有任何想法,请告诉我

标签: java playframework akka


【解决方案1】:

与其拥有对 Actor 的静态引用(在您的情况下为 WebSocketRouter),不如想出一些消息来发送它?这样,参与者可以以一致的方式保持自己的内部状态。通过消息改变状态是 Actor 模型的主要优势之一。

在我开始编写代码之前,如果这不是 100% 准确,我很抱歉,我只使用了 Akka 的 Scala 版本,并且基于对 Akka Documentation 的快速扫描。

所以在你的情况下,我会定义一些对象来表达加入/离开...

public class JoinMessage { } 
public class ExitMessage { } 

请注意,ExitMessage 仅在您打算保持 WebSocket 打开并让用户停止收听路由器时才需要。否则,路由器可以检测到 Actor 何时终止。

然后您将更改您的 MessageSender 演员,以便在他们加入或离开聊天室时发送这些消息....

public class MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;
    private final ActorRef router;

    public MessageSender(ActorRef out) {
        this.out = out;
        this.router= getContext().actorSelection("/Path/To/WebSocketRouter");
    }

    @Override
    public void preStart() {
        router.tell(new JoinMessage(), getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        } else {
            unhandled(message);
        }
    }    
}

然后您的路由器可以更改为在内部管理状态,而不是在 Actor 上公开内部方法(您知道这不好)......

public class WebSocketRouter extends UntypedActor {
    private final Set<ActorRef> senders = new HashSet<>();

    private void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    private void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof JoinMessage) {
            addSender(sender);
            getContext().watch(sender); // Watch sender so we can detect when they die.
        } else if (message instanceof Terminated) {
            // One of our watched senders has died.
            removeSender(sender);
        } else if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

同样,这段代码让您了解如何利用 Actor 模型来完成这项任务。抱歉,如果 Java 不是 100% 准确,但希望您能按照我的意图。

【讨论】:

  • 感谢您的回答。这几乎是我自己做的。对我来说,一个大问题是我必须只使用WebSocketRouter 的一个实例,并且我必须能够从不同的ActorSystems 访问这个对象。我在默认的 Akka ActorSystem 中创建了它,并通过其 path 在整个代码中访问它。
  • 我有一个愚蠢的问题,我刚开始使用 websockets 和演员。在最基本的例子中,控制器中有一个方法返回一个 websocket,使用WebSocket.tryAcceptWithActorProps。我如何向演员发送消息(任意 - 不是响应)!?我有对 websocket 的引用,但没有看到任何要编写、发送或获取演员的内容...
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-28
  • 1970-01-01
  • 2012-07-12
  • 1970-01-01
相关资源
最近更新 更多