实际上,套接字路由是基于如何使用channel API 在项目套接字模块中定义主题来完成的。对于我的 Slack 克隆,我使用三个通道。我有一个系统级通道来处理状态更新、一个用户通道和一个房间通道。
任何给定的用户都订阅了 0 或 1 个频道。但是,用户可以订阅多个频道。
对于发送到特定房间的消息,我通过房间频道进行广播。
当我检测到特定房间的未读消息、通知或徽章时,我会使用用户频道。每个用户频道也存储用户订阅的房间列表(它们列在客户端的侧栏上)。
所有这一切的诀窍是使用几个通道 API,主要是 intercept、handle_out、My.Endpoint.subscribe 和 handle_info(%Broadcast{},socket)。
- 我使用
intercept 来捕获我想在发送出去之前忽略或操纵的广播消息。
- 在用户频道中,我订阅了从房间频道广播的事件
- 订阅时,您会收到一个带有
%Broadcast{} 结构的 handle_info 调用,其中包含广播消息的主题、事件和负载。
这是我的几段代码:
defmodule UcxChat.UserSocket do
use Phoenix.Socket
alias UcxChat.{User, Repo, MessageService, SideNavService}
require UcxChat.ChatConstants, as: CC
## Channels
channel CC.chan_room <> "*", UcxChat.RoomChannel # "ucxchat:"
channel CC.chan_user <> "*", UcxChat.UserChannel # "user:"
channel CC.chan_system <> "*", UcxChat.SystemChannel # "system:"
# ...
end
# user_channel.ex
# ...
intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
#...
def handle_out("room:join", msg, socket) do
%{room: room} = msg
UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
update_rooms_list(socket)
clear_unreads(room, socket)
{:noreply, subscribe([room], socket)}
end
def handle_out("room:leave" = ev, msg, socket) do
%{room: room} = msg
debug ev, msg, "assigns: #{inspect socket.assigns}"
socket.endpoint.unsubscribe(CC.chan_room <> room)
update_rooms_list(socket)
{:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
end
# ...
defp subscribe(channels, socket) do
# debug inspect(channels), ""
Enum.reduce channels, socket, fn channel, acc ->
subscribed = acc.assigns[:subscribed]
if channel in subscribed do
acc
else
socket.endpoint.subscribe(CC.chan_room <> channel)
assign(acc, :subscribed, [channel | subscribed])
end
end
end
# ...
end
我还将 user_channel 用于与特定用户相关的所有事件,例如客户端状态、错误消息等。