【问题标题】:Phoenix Channels - Multiple channels per socketPhoenix Channels - 每个插槽有多个通道
【发布时间】:2017-03-17 19:24:54
【问题描述】:

我正在编写一个使用 Elixir Channels 处理实时事件的应用程序。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个通道。所以我的应用程序是一个聊天应用程序,其中用户是多个群聊的一部分。我有 1 个名为 MessageChannel 的 Phoenix Channel,其中 join 方法将处理动态主题。

def join("groups:" <> group_id, payload, socket) do
....

假设 John 加入了组/主题 A 和 B,而 Bob 只加入了组/主题 B。当 john 向组/主题 A 发送消息时,broadcast!/3 也会将该消息发送给 Bob 是否正确?因为handle_in 没有消息发送到哪个主题/组的上下文。

我将如何处理它,以使 Bob 不会收到发送到 A 组的事件。我的设计是否正确?

【问题讨论】:

    标签: sockets phoenix-framework elixir phoenix-channels


    【解决方案1】:

    免责声明:我没有查看通道的内部工作原理,这些信息完全来自我在应用程序中使用通道的第一次体验。

    当有人加入不同的组时(根据您的join/3 中的模式匹配),将通过单独的通道(套接字)建立连接。因此,向 A 广播不会向 B 的成员发送消息,只会向 A 发送消息。

    在我看来,Channel 模块类似于GenServer,而连接有点像start_link,其中启动了一个新服务器(进程)(但是,仅当它不存在时)。

    您真的可以忽略模块的内部运作,只需了解如果您加入一个名称与现有名称不同的频道,您将加入一个独特的频道。您也可以相信,如果您向频道广播,则只有该频道的成员才能收到消息。

    例如,在我的应用程序中,我有一个用户频道,我希望只连接一个用户。连接看起来像def join("agent:" &lt;&gt; _agent, payload, socket),其中代理只是一个电子邮件地址。当我向该频道广播消息时,只有单个代理接收消息。我还有一个所有代理都加入的办公室频道,当我希望所有代理都收到消息时,我会向它广播。

    希望这会有所帮助。

    【讨论】:

    • 但据我了解,一个套接字可以处理多个主题。当请求进入 handle_in 时,它不知道哪个主题正在接收和发送消息。广播发生在套接字级别。
    • 我打算在这个评论中回答,但它太大了。简而言之,broadcast/3 发生在套接字级别,但正如@Jason Harrelson 所说,套接字知道它当前的主题是什么。 (我很享受潜入频道内部的乐趣,但这不是必需的。)
    【解决方案2】:

    因为handle_in 没有消息发送到哪个主题/组的上下文。

    Phoenix.Channel.broadcast/3 被调用时,显然它确实有与消息相关联的主题(从签名中看不出来)。可以看到以on this line of channel.ex开头的代码:

    def broadcast(socket, event, message) do
        %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
        Server.broadcast pubsub_server, topic, event, message
    end
    

    因此,当使用套接字调用broadcast/3 时,它会匹配当前主题,然后调用底层Server.broadcast/4

    (如果你像我一样好奇,这反过来会调用底层的PubSub.broadcast/3,它会做一些分发魔法来将调用路由到你配置的 pubsub 实现服务器,很可能使用 pg2,但我离题了。 .)

    所以,我在阅读 Phoenix.Channel docs 时发现这种行为并不明显,但他们确实在 Incoming Events 的 phoenixframework 频道页面中明确说明了这一点:

    broadcast!/3 将通知所有加入的客户端关于此套接字的主题并调用他们的handle_out/3 回调。

    所以它只是在“这个套接字的主题”上广播。他们将同一页面上的主题定义为:

    topic - 字符串 topic 或 topic:subtopic 对命名空间,例如“messages”、“messages:123”

    因此,在您的示例中,“主题”实际上是主题:子主题对命名空间字符串:"groups:A""groups:B"。 John 必须在客户端分别订阅这两个主题,因此您实际上会引用两个不同的频道,即使它们使用相同的套接字。因此,假设您使用的是 javascript 客户端,频道创建看起来像这样:

    let channelA = this.socket.channel("groups:A", {});
    let channelB = this.socket.channel("groups:B", {});
    

    然后,当您从客户端在频道上发送消息时,您只使用具有主题的频道,该主题在服务器上匹配了我们上面看到的模式。

    channelA.push(msgName, msgBody);
    

    【讨论】:

      【解决方案3】:

      实际上,套接字路由是基于如何使用channel API 在项目套接字模块中定义主题来完成的。对于我的 Slack 克隆,我使用三个通道。我有一个系统级通道来处理状态更新、一个用户通道和一个房间通道。

      任何给定的用户都订阅了 0 或 1 个频道。但是,用户可以订阅多个频道。

      对于发送到特定房间的消息,我通过房间频道进行广播。

      当我检测到特定房间的未读消息、通知或徽章时,我会使用用户频道。每个用户频道也存储用户订阅的房间列表(它们列在客户端的侧栏上)。

      所有这一切的诀窍是使用几个通道 API,主要是 intercepthandle_outMy.Endpoint.subscribehandle_info(%Broadcast{},socket)

      • 我使用intercept 来捕获我想在发送出去之前忽略或操纵的广播消息。
      • 在用户频道中,我订阅了从房间频道广播的事件
      • 订阅时,您会收到一个带有 %Broadcast{} 结构的 handle_info 调用,其中包含广播消息的主题、事件和负载。

      这是我的几段代码:

      defmodule UcxChat.UserSocket do
        use Phoenix.Socket
        alias UcxChat.{User, Repo, MessageService, SideNavService}
        require UcxChat.ChatConstants, as: CC
      
        ## Channels
        channel CC.chan_room <> "*", UcxChat.RoomChannel    # "ucxchat:"
        channel CC.chan_user <> "*", UcxChat.UserChannel  # "user:"
        channel CC.chan_system <> "*", UcxChat.SystemChannel  # "system:"
        # ...
      end
      
      # user_channel.ex
       # ...
       intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
       #...
       def handle_out("room:join", msg, socket) do
          %{room: room} = msg
          UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
          update_rooms_list(socket)
          clear_unreads(room, socket)
          {:noreply, subscribe([room], socket)}
        end
        def handle_out("room:leave" = ev, msg, socket) do
          %{room: room} = msg
          debug ev, msg, "assigns: #{inspect socket.assigns}"
          socket.endpoint.unsubscribe(CC.chan_room <> room)
          update_rooms_list(socket)
          {:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
        end
      
        # ...
        defp subscribe(channels, socket) do
          # debug inspect(channels), ""
          Enum.reduce channels, socket, fn channel, acc ->
            subscribed = acc.assigns[:subscribed]
            if channel in subscribed do
              acc
            else
              socket.endpoint.subscribe(CC.chan_room <> channel)
              assign(acc, :subscribed, [channel | subscribed])
            end
          end
        end
        # ...
      end
      

      我还将 user_channel 用于与特定用户相关的所有事件,例如客户端状态、错误消息等。

      【讨论】:

      • 感谢您的帮助/反馈。这是我在 Stack Overflow 上的第一天(作为贡献者)。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-12-26
      • 2016-09-29
      • 1970-01-01
      • 1970-01-01
      • 2020-06-03
      • 2017-10-29
      • 1970-01-01
      相关资源
      最近更新 更多