【问题标题】:Ordering data from multiple processes in Erlang在 Erlang 中从多个进程中排序数据
【发布时间】:2014-04-20 19:13:53
【问题描述】:

我正在制作一个程序,将 ID 号分类为房间和组。我正在尝试从多个并行进程中订购一些输出,但输出是这样的

{'C003','Group A',1}
{'C002','Group B',3}
{'C015','Group C',5}
{'C016','Group D',7}
{'C003','Group A',2}
{'C002','Group B',4}
{'C015','Group C',6}
{'C016','Group D',8}

但我想要这样:

{'C003','Group A',1}
{'C003','Group A',2}
{'C002','Group B',3}
{'C002','Group B',4}
{'C015','Group C',5}
{'C015','Group C',6}
{'C016','Group D',7}   
{'C016','Group D',8}

我想也许我可以将这些数字发送到一个进程并让它以某种方式按顺序打印它们和它们的组,但我不确定如何在进程并行发生的同时执行此操作。我认为一个解决方案可能是选择性接收,但我似乎也无法弄清楚。任何人都可以帮助我吗? 这是我的主程序中的一个 sn-p 代码 注意:我是 Erlang 的新手

-module(ppp).

-compile([export_all]).

categorise(L) ->

     Size = len(L) div 4,
     Rem = len(L) rem 4,    
     spawn(ppp, cat, [self(), 'Group A', L, 0, Size + Rem]),
     spawn(ppp, cat, [self(), 'Group B', L, (Size + Rem), Size]),
     spawn(ppp, cat, [self(), 'Group C', L, (2*Size + Rem), Size]),
     spawn(ppp, cat, [self(), 'Group D', L, (3*Size  + Rem), Size]), 
     wait(4). 

wait(0) -> {done};
wait(N) ->
receive
    done -> wait(N-1)
end.

cat(P, Name, L, Start, Elements) ->     
     Extract = lists:split(Start, L),   
     Group = element(2, Extract),   
     AGroup = lists:sublist(Group, Elements),
     spawn(ppp, putInRoom, [P, Name, AGroup]).

putInRoom(P, _, []) -> P ! done;

putInRoom(P, GroupName, [H|T]) -> 

if GroupName == 'Group A' ->        
    io:format("~w~n", [{'C003', GroupName, H}]),        
    putInRoom(P, GroupName, T);
    GroupName == 'Group B' -> 
    io:format("~w~n", [{'C002', GroupName, H}]),
    putInRoom(P, GroupName, T);
    GroupName == 'Group C' ->
    io:format("~w~n", [{'C015', GroupName, H}]),
    putInRoom(P, GroupName, T);
    GroupName == 'Group D' ->
    io:format("~w~n", [{'C016', GroupName, H}]),
    putInRoom(P, GroupName, T)
end.

len(L) -> 
    count(L, 0).

count([], Acc) -> Acc;

count([_|T], Acc) -> count(T, Acc + 1).

【问题讨论】:

  • 这与并行无关,与并发有关。问题是同步,因为进程之间没有同步。您需要以某种方式同步输出。当你负载很高时,使用睡眠将不起作用。
  • 所以Kadaj给出的答案是正确的,因为他建议同步产生这些进程

标签: concurrency parallel-processing erlang multicore


【解决方案1】:

不是为每个组生成putInRoom 进程,而是为组中的每个元素生成。

-module(ppp).

-compile([export_all]).

categorise(L) ->

     Size = len(L) div 4,
     Rem = len(L) rem 4,    
     spawn(ppp, cat, [self(), 'Group A', L, 0, Size + Rem]),
     spawn(ppp, cat, [self(), 'Group B', L, (Size + Rem), Size]),
     spawn(ppp, cat, [self(), 'Group C', L, (2*Size + Rem), Size]),
     spawn(ppp, cat, [self(), 'Group D', L, (3*Size  + Rem), Size]), 
     wait(4). 

wait(0) -> {done};
wait(N) ->
receive
    done -> wait(N-1)
after 2000 ->
     ok
end.

cat(P, Name, L, Start, Elements) ->
     % io:format("cat: ~p L: ~p~n", [P,L]),     
     Extract = lists:split(Start, L),   
     Group = element(2, Extract),   
     AGroup = lists:sublist(Group, Elements),
     % io:format("AGroup: ~p~n", [AGroup]),
     lists:map(fun(G) ->
                 spawn(ppp, putInRoom, [P, Name, G])
               end, AGroup).

putInRoom(P, GroupName, H) -> 
    % io:format("putInRoom: ~p, List: ~p~n", [GroupName, H]),
    if GroupName == 'Group A' ->        
        io:format("~w~n", [{'C003', GroupName, H}]);
        GroupName == 'Group B' -> 
        io:format("~w~n", [{'C002', GroupName, H}]);
        GroupName == 'Group C' ->
        io:format("~w~n", [{'C015', GroupName, H}]);
        GroupName == 'Group D' ->
        io:format("~w~n", [{'C016', GroupName, H}])
    end,
    P ! done.

len(L) -> 
    count(L, 0).

count([], Acc) -> Acc;

count([_|T], Acc) -> count(T, Acc + 1).

它按顺序打印组。

2> ppp:categorise(lists:seq(1,8)).
{'C003','Group A',1}
{'C003','Group A',2}
{'C002','Group B',3}
{'C002','Group B',4}
{'C015','Group C',5}
{'C015','Group C',6}
{'C016','Group D',7}
{'C016','Group D',8}
{done}

在产生cat 进程之间添加timer:sleep(100) 使用延迟。但是你有点放松并发的性质。所以如果你想按顺序打印消息,你必须想出一个不同的算法。

-module(ppp).

-compile([export_all]).

categorise(L) ->

     Size = len(L) div 4,
     Rem = len(L) rem 4,    
     spawn(ppp, cat, [self(), 'Group A', L, 0, Size + Rem]),
     timer:sleep(100),
     spawn(ppp, cat, [self(), 'Group B', L, (Size + Rem), Size]),
     timer:sleep(100),
     spawn(ppp, cat, [self(), 'Group C', L, (2*Size + Rem), Size]),
     timer:sleep(100),
     spawn(ppp, cat, [self(), 'Group D', L, (3*Size  + Rem), Size]), 
     wait(4). 

wait(0) -> {done};
wait(N) ->
receive
    done -> wait(N-1)
after 2000 ->
     ok
end.

cat(P, Name, L, Start, Elements) ->
     % io:format("cat: ~p L: ~p~n", [P,L]),     
     Extract = lists:split(Start, L),   
     Group = element(2, Extract),   
     AGroup = lists:sublist(Group, Elements),
     % io:format("AGroup: ~p~n", [AGroup]),
     Pid = spawn(fun putInRoom2/0),
     Pid ! {P, cat, {Name, AGroup}}.

putInRoom2() -> 
    receive
        {P, cat, {_GroupName, []}} -> P ! done;
        {P, cat, {GroupName, L}} ->
            F = fun(G, [H|T]) ->
                    if
                        GroupName == 'Group A' ->        
                            io:format("~w~n", [{'C003', GroupName, H}]);
                        GroupName == 'Group B' -> 
                            io:format("~w~n", [{'C002', GroupName, H}]);
                        GroupName == 'Group C' ->
                            io:format("~w~n", [{'C015', GroupName, H}]);
                        GroupName == 'Group D' ->
                            io:format("~w~n", [{'C016', GroupName, H}])
                    end,
                    self() ! {P, cat, {G, T}}
                end,
            F(GroupName, L),
            putInRoom2()
    end.

len(L) -> 
    count(L, 0).

count([], Acc) -> Acc;

count([_|T], Acc) -> count(T, Acc + 1).

【讨论】:

  • 谢谢,但为什么这个解决方案会按顺序打印它们?并且不会为每个元素创建一个进程会导致更大的列表产生性能开销吗?
  • 这是按顺序打印的,因为它按顺序生成进程。在您的代码中,它实际上按列打印[1,2,3],[4,5,6] 意味着您得到1,4,2,5,3,6。不知道为什么。也许一些调度程序的事情?关于性能,在当前状态下,putInRoom 函数的寿命很短。所以我想这不会那么重。不过,您可以衡量性能。
  • 顺序不是您所期望的,因为您的产卵是异步的。意思是,你生成 cat[1,2][3,4] 作为数据,putInRoom 显示 1,但是到了递归并且由于代码是异步的,第二个生成执行打印 3 和同样地。您可以看到添加延迟后打印顺序发生了变化。基本上你想把它想象成生成进程和同步消息传递。
  • 谁能提供更好的答案。我不介意投票,但我也在努力学习。 :)
猜你喜欢
  • 2022-01-15
  • 2012-02-11
  • 2013-11-18
  • 1970-01-01
  • 2017-05-11
  • 1970-01-01
  • 2017-10-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多