分布式系统编程很难。很难理解。很难正确实现。
riak_core 的源代码一开始可能很难理解。以下是一些帮助我更好地理解 riak_core 的资源:
关于“很难正确实现”,您可以阅读Jepsen posts by aphyr,了解主要数据库和分布式存储系统在自己的实现中存在或以前存在问题的示例和案例。
也就是说,这是一个在 Erlang 中非常简单的环实现,但是它仍然存在许多漏洞,下面将解决:
-module(node_ring).
-behaviour(gen_server).
% Public API
-export([start_link/0]).
-export([erase/1]).
-export([find/1]).
-export([store/2]).
% Ring API
-export([join/1]).
-export([nodes/0]).
-export([read/1]).
-export([write/1]).
-export([write/2]).
% gen_server
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
-record(state, {
node = node() :: node(),
ring = ordsets:new() :: ordsets:ordset(node()),
data = dict:new() :: dict:dict(term(), term())
}).
% Public API
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
erase(Key) ->
write({erase, Key}).
find(Key) ->
read({find, Key}).
store(Key, Value) ->
write({store, Key, Value}).
% Ring API
join(Node) ->
gen_server:call(?MODULE, {join, Node}).
nodes() ->
gen_server:call(?MODULE, nodes).
read(Request) ->
gen_server:call(?MODULE, {read, Request}).
write(Request) ->
gen_server:call(?MODULE, {write, Request}).
write(Node, Request) ->
gen_server:call(?MODULE, {write, Node, Request}).
% gen_server
init([]) ->
State = #state{},
{ok, State}.
handle_call({join, Node}, _From, State=#state{node=Node}) ->
{reply, ok, State};
handle_call({join, Peer}, From, State=#state{node=Node, ring=Ring}) ->
case net_adm:ping(Peer) of
pong ->
case ordsets:is_element(Peer, Ring) of
true ->
{reply, ok, State};
false ->
monitor_node(Peer, true),
NewRing = ordsets:add_element(Peer, Ring),
spawn(fun() ->
rpc:multicall(Ring, ?MODULE, join, [Peer])
end),
spawn(fun() ->
Reply = rpc:call(Peer, ?MODULE, join, [Node]),
gen_server:reply(From, Reply)
end),
{noreply, State#state{ring=NewRing}}
end;
pang ->
{reply, {error, connection_failed}, State}
end;
handle_call(nodes, _From, State=#state{node=Node, ring=Ring}) ->
{reply, ordsets:add_element(Node, Ring), State};
handle_call({read, Request}, From, State) ->
handle_read(Request, From, State);
handle_call({write, Request}, From, State=#state{node=Node, ring=Ring}) ->
spawn(fun() ->
rpc:multicall(Ring, ?MODULE, write, [Node, Request])
end),
handle_write(Request, From, State);
handle_call({write, Node, _Request}, _From, State=#state{node=Node}) ->
{reply, ok, State};
handle_call({write, _Peer, Request}, From, State) ->
handle_write(Request, From, State);
handle_call(_Request, _From, State) ->
{reply, ignore, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({nodedown, Peer}, State=#state{ring=Ring}) ->
NewRing = ordsets:del_element(Peer, Ring),
{noreply, State#state{ring=NewRing}};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% @private
handle_read({find, Key}, _From, State=#state{data=Data}) ->
{reply, dict:find(Key, Data), State}.
%% @private
handle_write({erase, Key}, _From, State=#state{data=Data}) ->
{reply, ok, State#state{data=dict:erase(Key, Data)}};
handle_write({store, Key, Value}, _From, State=#state{data=Data}) ->
{reply, ok, State#state{data=dict:store(Key, Value, Data)}}.
如果我们启动 3 个不同的节点,并将 -sname 设置为 node0、node1 和 node2:
erl -sname node0 -setcookie cook -run node_ring start_link
erl -sname node1 -setcookie cook -run node_ring start_link
erl -sname node2 -setcookie cook -run node_ring start_link
以下是我们将节点加入环的方法:
(node0@localhost)1> node_ring:nodes().
['node0@localhost']
(node0@localhost)2> node_ring:join('node1@localhost').
ok
(node0@localhost)3> node_ring:nodes().
['node0@localhost', 'node1@localhost']
如果我们在 node1 上运行 node_ring:nodes(),我们会得到:
(node1@localhost)1> node_ring:nodes().
['node0@localhost', 'node1@localhost']
现在让我们转到node2 并加入另外两个节点之一:
(node2@localhost)1> node_ring:nodes().
['node2@localhost']
(node2@localhost)2> node_ring:join('node0localhost').
ok
(node2@localhost)3> node_ring:nodes().
['node0@localhost', 'node1@localhost',
'node2@localhost']
请注意node0 和node1 是如何添加到node2 的,尽管我们只在连接中指定了node0。这意味着如果我们有数百个节点,我们只需要加入其中一个即可加入整个环。
现在我们可以在任何一个节点上使用store(Key, Value),它将被复制到另外两个:
(node0@localhost)4> node_ring:store(mykey, myvalue).
ok
让我们尝试从其他两个读取mykey,首先是node1:
(node1@localhost)2> node_ring:find(mykey).
{ok,myvalue}
然后node2:
(node2@localhost)4> node_ring:find(mykey).
{ok,myvalue}
让我们在node2 上使用erase(Key) 并尝试在其他节点上再次读取密钥:
(node2@localhost)5> node_ring:erase(mykey).
ok
在node0:
(node0@localhost)5> node_ring:find(mykey).
error
开启node1:
(node1@localhost)3> node_ring:find(mykey).
error
太棒了!我们有一个分布式去中心化环,可以作为一个简单的键/值存储!这很容易,一点也不难!只要我们没有任何节点宕机、丢包、网络分区、节点添加到环,或其他形式的混乱,我们在这里有一个近乎完美的解决方案。然而,实际上,您必须考虑所有这些因素,才能拥有一个从长远来看不会让您发疯的系统。
以下是我们的小node_ring 无法处理的简单示例:
-
node1 挂了
-
node0 存储键 a 和值 1
-
node1 回来加入戒指
-
node1 试图找到密钥 a
首先,让我们杀死node1。如果我们检查node0上的节点:
(node0@localhost)6> node_ring:nodes().
['node0@localhost','node2@localhost']
在node2:
(node2@localhost)6> node_ring:nodes().
['node0@localhost','node2@localhost']
我们看到node1 已自动从环中删除。让我们在node0 上存储一些东西:
(node0@localhost)7> node_ring:store(a, 1).
ok
从node2阅读:
(node2@localhost)7> node_ring:find(a).
{ok,1}
让我们再次启动node1,加入这个圈子:
(node1@localhost)1> node_ring:join('node0@localhost').
ok
(node1@localhost)2> node_ring:nodes().
['node0@localhost','node1@localhost',
'node2@localhost']
(node1@localhost)3> node_ring:find(a).
error
哎呀,我们在环上的数据不一致。有必要进一步研究其他分布式系统和 CAP theorem,然后才能决定我们希望我们的小 node_ring 在这些不同情况下如何表现(比如我们希望它表现得像 AP 还是 CP 系统)。