我对问题的解决方案。
我自己实现了使用gen_server:call 的多重调用
基本思想是在单独的进程中使用 gen_server:call() 调用所有节点。并收集这些调用的结果。通过从调用进程的邮箱接收消息进行收集。
为了控制超时,我计算超时到期时的截止日期,然后将其用作参考点来计算 receive 中 after 的超时。
实施
主要功能是:
multicall(Nodes, Name, Req, Timeout) ->
Refs = lists:map(fun(Node) -> call_node(Node, Name, Req, Timeout) end, Nodes),
Results = read_all(Timeout, Refs),
PosResults = [ { Node, Result } || { ok, { ok, { Node, Result } } } <- Results ],
{ PosResults, calc_bad_nodes(Nodes, PosResults) }.
这里的想法是调用所有节点并在一个 Timeout 内等待所有结果。
调用一个节点是从派生的进程中执行的。它会捕获 gen_server:call 使用的出口以防出错。
call_node(Node, Name, Req, Timeout) ->
Ref = make_ref(),
Self = self(),
spawn_link(fun() ->
try
Result = gen_server:call({Name,Node},Req,Timeout),
Self ! { Ref, { ok, { Node, Result } } }
catch
exit:Exit ->
Self ! { Ref, { error, { 'EXIT', Exit } } }
end
end),
Ref.
错误节点被计算为那些在超时时间内没有响应的节点
calc_bad_nodes(Nodes, PosResults) ->
{ GoodNodes, _ } = lists:unzip(PosResults),
[ BadNode || BadNode <- Nodes, not lists:member(BadNode, GoodNodes) ].
通过超时读取邮箱来收集结果
read_all(ReadList, Timeout) ->
Now = erlang:monotonic_time(millisecond),
Deadline = Now + Timeout,
read_all_impl(ReadList, Deadline, []).
在截止日期不发生之前执行读取
read_all_impl([], _, Results) ->
lists:reverse(Results);
read_all_impl([ W | Rest ], expired, Results) ->
R = read(0, W),
read_all_impl(Rest, expired, [R | Results ]);
read_all_impl([ W | Rest ] = L, Deadline, Results) ->
Now = erlang:monotonic_time(millisecond),
case Deadline - Now of
Timeout when Timeout > 0 ->
R = read(Timeout, W),
case R of
{ ok, _ } ->
read_all_impl(Rest, Deadline, [ R | Results ]);
{ error, { read_timeout, _ } } ->
read_all_impl(Rest, expired, [ R | Results ])
end;
Timeout when Timeout =< 0 ->
read_all_impl(L, expired, Results)
end.
一次读取只是从带有超时的邮箱接收。
read(Timeout, Ref) ->
receive
{ Ref, Result } ->
{ ok, Result }
after Timeout ->
{ error, { read_timeout, Timeout } }
end.
进一步改进:
- rpc 模块生成单独的进程以避免迟到答案的垃圾。所以在这个多重调用函数中做同样的事情会很有用
-
infinity 超时可以用明显的方式处理