一、环境
Erlang的事务环境主要由用户当前进程,tm事务进程,locker锁进程维护。
而这里主要想弄明白Mnesia事务的实现思路和环境管理,衡量其并行能力和潜在风险。
二、事务实现
Erlang有两种事务,即普通事务transaction和完全同步事务sync_transaction;
区别就在于日志是否存储在副本上,不是安全系数要求极高的情况下采用普通事务即可。
1、事务开始
1)会找到事务进程,并从中开辟一张存储事务id的表,为本次事务分配一个事务ID;
2)获得事务进程分配的事务ID,会将次运行环境标记为事务环境
transaction(OldTidTs, Fun, Args, Retries, Mod, Type) ->
Factor = 1,
case OldTidTs of
undefined -> % Outer
execute_outer(Mod, Fun, Args, Factor, Retries, Type);
{_, _, non_transaction} -> % Transaction inside ?sync_dirty
Res = execute_outer(Mod, Fun, Args, Factor, Retries, Type),
put(mnesia_activity_state, OldTidTs),
Res;
{OldMod, Tid, Ts} -> % Nested
execute_inner(Mod, Tid, OldMod, Ts, Fun, Args, Factor, Retries, Type);
_ -> % Bad nesting
{aborted, nested_transaction}
end.
3)后面的数据库操作会验证是否处于事务环境,如读取的第二个模式
read(Tab, Key, LockKind) ->
case get(mnesia_activity_state) of
{?DEFAULT_ACCESS, Tid, Ts} ->
read(Tid, Ts, Tab, Key, LockKind);
{Mod, Tid, Ts} ->
Mod:read(Tid, Ts, Tab, Key, LockKind);
_ ->
abort(no_transaction)
end.
4)事务进程是本地节点的,如何锁住其他节点操作
其实上面的事务环境就已经够建完成,在锁的处理上是操作函数决定的,比如read和write的时候。
try_sticky_lock(Tid, Op, Pid, {Tab, _} = Oid) ->
case ?ets_lookup(mnesia_sticky_locks, Tab) of
[] ->
try_lock(Tid, Op, Pid, Oid);
[{_,N}] when N == node() ->
try_lock(Tid, Op, Pid, Oid);
[{_,N}] ->
Req = {Pid, {Op, Tid, Oid}},
Pid ! {?MODULE, node(), {switch, N, Req}},
true
end.
先找到表这个表的粘锁进程,然后进行锁业务处理,这里就可能遇到锁排队了。
{{queue, Lucky},_} ->
?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
%% Append to queue: Nice place for trace output
?ets_insert(mnesia_lock_queue,
#queue{oid = Oid, tid = Tid, op = Op,
pid = Pid, lucky = Lucky}),
?ets_insert(mnesia_tid_locks, {{Tid, Oid, {queued, Op}}})看到这里已经得到一个结论:
事务环境由本地进程加本地事务进程维护,事务排队等待和解锁是通过表指定的粘表进程处理;
每个节点只有一个事务进程和粘表进程,消息吞吐也就是这个进程的并发量决定。
init([]) ->
ProcLib = [mnesia_monitor, proc_lib],
Flags = {one_for_all, 0, timer:hours(24)}, % Trust the top supervisor
Workers = [worker_spec(mnesia_monitor, timer:seconds(3), [gen_server]),
worker_spec(mnesia_subscr, timer:seconds(3), [gen_server]),
worker_spec(mnesia_locker, timer:seconds(3), ProcLib),
worker_spec(mnesia_recover, timer:minutes(3), [gen_server]),
worker_spec(mnesia_tm, timer:seconds(30), ProcLib),
supervisor_spec(mnesia_checkpoint_sup),
supervisor_spec(mnesia_snmp_sup),
worker_spec(mnesia_controller, timer:seconds(3), [gen_server]),
worker_spec(mnesia_late_loader, timer:seconds(3), ProcLib)
],
{ok, {Flags, Workers}}.