这一篇,主要分析下,client 是怎么 connect server的,以及成功connect server 之后,会做哪些事情,session是怎么 start的。

由protocol 开始

之前分析过message 流向,接受到TCP 数据之后,经过parser 的解析,会交由protocol 进行packet 级别的数据处理,并根据不同的packet type 进行不同的后续处理。

connect 类型的packet 的处理是这样开始的:

 1 process(Packet = ?CONNECT_PACKET(Var), State0) ->
 2     %% 参数提取
 3     ...
 4     %% check proto 以及clientid
 5     case validate_connect(Var, State1) of
 6         ?CONNACK_ACCEPT ->
 7             %% auth
 8             ...
 9             case mqttd_sm:start_session(CleanSess, clientid(State2)) of
10                 ok ->
11                     %% 把client 相关信息,写到一张ets table 里
12                     ...
13                     %% 开始keepalive,后续再分析
14                     start_keepalive(KeepAlive)
15                 _ ->
16                     error
17             end;
18         _ ->
19             error
20     end,
21     %% connack

1、提取必要的参数,并进行参数验证

参数验证会检查proto 的version 以及name,并且进行必要的auth check,都通过验证之后,才会继续处理。否则,只好error了。

2、创建session

这个是connect 流程处理的重头戏。

3、记录额外信息,并开始keepalive

记录额外信息,能够进行额外的stat look,dashboard 会用到的。keepalive 就是一个心跳检查。

emqtt 2 (我要连服务器)

P1

创建session

0、准备一下

从上一个小节,可以看到,其实connect 流程的处理,主要就是『创建session』。

client 的session 是由session manager 进程创建的,session manager 进程是一个pool,pool size 和scheduler 的数量相关。而这些session managers 是在application start 的时候start的,挂在emqttd_sm_sup supervisor 进程下面(该进程挂在主sup 进程下面)。

而,session 的存储,主要用到的是 『session』mnesia table,其table 字段如下:

-record(mqtt_session,
        {client_id   :: binary(),
         sess_pid    :: pid(),
         persistent  :: boolean()
        }).

表为set 类型的表,client_id的key,sess_pid 是client_id 对应的session process id .

1、start_seesion

%% @doc Start a session
-spec(start_session(boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}).
start_session(CleanSess, ClientId) ->
    SM = gproc_pool:pick_worker(?POOL, ClientId),
    call(SM, {start_session, {CleanSess, ClientId, self()}}).

start_session/2 是一个接口函数,其实现部分,是在某一个session manager process 的handle_call callback 实现的。start_session/2 的第一个参数表示是否要清理掉旧的session,第二个参数就是clientid,从pool 里面选取一个session manager process,就是根据clientid(类似于hash)。

1.1 callback 的整体逻辑

 1 handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->
 2     %% 查 session
 3     case lookup_session(ClientId) of
 4         undefined ->
 5             %% Create session locally
 6             %% 增 session
 7             create_session(Client, State);
 8         Session ->
 9             %% 恢复 session
10             case resume_session(Session, ClientPid) of
11                 {ok, SessPid} ->
12                     {reply, {ok, SessPid, true}, State};
13                 {error, Erorr} ->
14                     {reply, {error, Erorr}, State}
15              end
16     end;
17 %% Transient Session
18 handle_call({start_session, Client = {true, ClientId, _ClientPid}}, _From, State) ->
19     case lookup_session(ClientId) of
20         undefined ->
21             create_session(Client, State);
22         Session ->
23             %% 删 session
24             case destroy_session(Session) of
25                 ok ->
26                     create_session(Client, State);
27                 {error, Error} ->
28                     {reply, {error, Error}, State}
29             end
30     end;

在handle_call的callback 中,基本的逻辑是这样的

1、首先根据clientid 查找是否存在session

  2、如果不存在,那么就create 一个全新的session

  3、如果存在,则

    4、要么恢复session

    5、要么先销毁session 之后,再重新create 一个全新的session

emqtt 2 (我要连服务器)

P2

1.2 查 session

查找session,是根据『session mnesia table』的key(clientid)进行 dirty_read

1.3 create session

create session 的思路是:

  1. spawn 一个新的进程,并且挂在emqttd_session_sup supervisor进程下面,返回sessionpid
  2. 利用transaction 将{clientid, sessionpid, true|false} 写入到『session mnesia table』
  3. session manager process monitor 一下 sessionpid(崩了之后,清理session 信息)

1.4 恢复session

恢复session 会先根据旧的session记录,判断 old session pid 是否存活,如果存活,则进行恢复操作,否则,error。

1.5 销毁session

首先shutdown sessionpid,然后清理session table即可。

总结

好像也不需要?(resume session 的处理流程 后补)

相关文章: