【问题标题】:How to broadcast a message to a list of processes in Erlang? Console hanging如何将消息广播到 Erlang 中的进程列表?控制台挂
【发布时间】:2014-11-29 21:45:45
【问题描述】:

我是 Erlang 的新手,我正在尝试了解如何将消息从一个进程发送到进程列表。

假设我们有一个数据结构,其中包含一个包含字符串和 Pid 元素的列表。如何让 Pid 向作为前面描述的两个元素之一的 Pid 发送消息“M”? 我想出的是:

broadcast(P, M, R) ->
  P ! {self(), friends},
  receive
    {P, Friends} ->
  P ! {self(), {send_message, {M, R, P, Friends}}}
  end.

looper({Name, Friends, Messages}) ->
receive
  {From, friends} ->
    From ! {self(), Friends},
    looper({Name, Friends, Messages});
  {From, {send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}} ->
    if R =< 0 ->
          From ! {From, {self(), {ID, M}}},
          looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
       R > 0  andalso FriendTale =/= []->
         FriendPid ! {From, {send_message, {M, R-1, ID, FriendTale}}},
         looper({Name, FriendTale, [{ID, M} | Messages]})
    end;
   terminate ->
    ok
end.

但据我了解,我没有正确匹配 Pid 列表的模式,因此我可以从 Pid 列表的元素中“提取” Pid,或者我没有正确使用列表发送给它留言。

基本上,我有一个名为“looper”的函数,它一直在等待新消息的到达。当它收到类型为

的消息时
{send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}

其中“M”是我要广播到名为“Friends”的 Pid 列表的消息,而 R 只是一个整数。

R 基本上是一个整数,表示消息应该走多远。

e.g. 0 = broadcast the message to self,
     1 = broadcast the message to the friends of the pid,
     2 = broadcast the message to the friends of the friends of the pid and so on...

设置 Pid,设置 Pid 之间的“友谊”并广播消息后,我从终端得到的是:

1> f().
ok
2> c(facein).
facein.erl:72: Warning: variable 'From' is unused
{ok,facein}
3> {Message, Pid} = facein:start({"dummy", [], []}).
{ok,<0.186.0>}
4> {Message, Pid2} = facein:start({"dummy2", [], []}).
{ok,<0.188.0>}
5> facein:add_friend(Pid,Pid2).
ok
6> facein:broadcast(Pid,"hello",1).    
=ERROR REPORT==== 5-Oct-2014::12:12:58 ===
Error in process <0.186.0> with exit value: {if_clause,[{facein,looper,1,[{file,"facein.erl"},{line,74}]}]}

{<0.177.0>,{send_message,{"hello",1,#Ref<0.0.0.914>}}}

当我查看广播消息的 Pid 的消息时,控制台只是挂起,其他 Pid 没有收到任何消息。

任何帮助将不胜感激。 谢谢

【问题讨论】:

  • 这里不够详细,looper 函数甚至不是有效的 erlang(缺少接收)。
  • 我在 looper 中添加了接收(我的错)。您还需要什么其他信息?也许我可以给你

标签: erlang erlang-shell


【解决方案1】:

错误信息

这次你得到的是if_clause 错误。在 Erlang 中,每个表达式都必须返回一些值,包括 if。这意味着您可以编写这样的代码

SomeVar = if 
     R =< 0 ->
       [...]
     R > 0 andalso FriendTale =/= []->
       [...]
  end

正如您所看到的,是否需要“返回”某些东西,并且要做到这一点,它的一个分支需要运行。或者换句话说,它的一个条款需要马赫。但在你的情况下,当R &gt; 0FriendsTale =:= [] 没有它们时。因此出现运行时错误。

作为一般实践的最后一个子句保留为

  _ ->  
     [...]

这将始终匹配,并让您避免此类错误。

在您的示例中,您根本不必使用if。你可以做的是用一些守卫来扩展你的接收子句

looper({Name, Friends, Messages}) ->
  receive
    {From, {send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}} 
       when R =< 0 ->
          From ! {From, {self(), {ID, M}}},
          looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
    {From, {send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}}  
       when R > 0  andalso FriendTale =/= [] ->
          FriendPid ! {From, {send_message, {M, R-1, ID, FriendTale}}},
          looper({Name, FriendTale, [{ID, M} | Messages]});
    terminate ->
          ok
  end.

如果收到receive 消息,则不必匹配一个子句。如果没有,它只是留在消息框中(在此接收中被忽略,但可能被另一个捕获)。

或者放弃你的逻辑,你可以在 R 本身上进行模式匹配

looper({Name, Friends, Messages}) ->
  receive
    {From, {send_message, {M, 0, ID, [{FriendPid, FriendName} | FriendTale]}}} ->
          From ! {From, {self(), {ID, M}}},
          looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
    {From, {send_message, {M, 1, ID, [{FriendPid, FriendName} | FriendTale]}}}  
       when FriendTale =/= [] ->
          FriendPid ! {From, {send_message, {M, R-1, ID, FriendTale}}},
          looper({Name, FriendTale, [{ID, M} | Messages]});
    terminate ->
          ok
  end.

为了提高就绪性,您可以将 R 从 opque integer 更改为 Miningfull atom

looper({Name, Friends, Messages}) ->
  receive
    {From, {send_message, {M, back_to_me, ID, [{FriendPid, FriendName} | FriendTale]}}} ->
          From ! {From, {self(), {ID, M}}},
          looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});

    {From, {send_message, {M, to_friends, ID, [{FriendPid, FriendName} | FriendTale]}}}  
       when FriendTale =/= [] ->
          FriendPid ! {From, {send_message, {M, R-1, ID, FriendTale}}},
          looper({Name, FriendTale, [{ID, M} | Messages]});

    terminate ->
          ok
  end.

向朋友广播

如果我理解正确looper 是代表一个“人”的函数。每个朋友都是存储朋友列表的进程,可以添加和删除,还可以向其他朋友发送消息。

让我们从为每个函数创建子句开始(创建过程接口)

looper(Name, Friends, Messages) ->
  receive 
     {add_friend, Friend} ->
        [...];
     {remove_friend, Friend} ->
        [...];
     {receive_message, Message} ->
        [...];frineds
     {broadcast_to_self, Message} ->
        [...];
     {broadcast_to_friends, Message} ->
        [...];
     terminate ->
        ok
  end

其中大部分都很容易实现,例如

{add_frined, Friend} ->
    looper(Name, [Friend, Friends], Messages);

所以我就不详细说了。

进行广播的那些不会改变状态,所以现在让我们写这样的东西(主要是为了可读性

     {broadcast_to_friends, Message} ->
        handle_broadcast_to_friends(Friends, Message),
        looper(Name, Friends, Messages);

并在下面实现新功能

handle_broadcast_to_friends(Friends, Message) ->
   [ F ! {receive_message, Message} || F <- Friends ].

现在,由于确切知道要发送哪个原子的元组并不方便,我们可以将“消息接口”包装到“函数接口”中。例如

add_friend(Person,  Friend) ->
   Person ! {add_friend, Friends}.

receive_message(Person, Message) ->
    Person ! {receive_message, Message}.

我们也可以在您的逻辑实现中使用它们

handle_broadcast_to_friends(Friends, Message) ->
   [ receive_message(F, Message)  || F <- Friends ].

这应该会让你走上正轨。如果您需要MessageID 或类似的东西,只需扩展您的界面即可。如果你真的需要创建broadcast_to_all,你需要想一想你会如何处理循环的消息,这不是一个简单的问题。

【讨论】:

  • 但是我的语法对于尝试将消息传递给进程列表是否正确?
  • 我在接收块中添加了另一个模式,因为再次看到错误报告和它提到的第 74 行是这一行:From ! {self(), Friends},
【解决方案2】:

我建议您首先将您正在做的事情的复杂性降低到基本要素。例如,您接收中的这种条件处理业务不是您的基本消息传递问题的一部分。这是一个常见广播习语的基本示例,使用列表推导发送到函数 bcast/2 中的 pid 列表:

-module(bcast).
-export([start/0]).

start() ->
    Pids = [spawn(fun() -> listener() end) || _ <- lists:seq(1,3)],
    Message = "This is my message.",
    ok = bcast(Pids, Message),
    timer:sleep(100), % Give the subordinates time to act.
    [exit(P, kill) || P <- Pids],
    ok.

listener() ->
    receive
        {bcast, Message} ->
            Now = now(),
            io:format(user, "~p ~p: Received: ~p~n", [self(), now(), Message]),
            listener();
        Any ->
            io:format(user, "~p HURR! Unexpected message: ~p~n", [self(), Any]),
            listener()
    end.

bcast(Pids, Message) ->
    BCast = fun(Pid) -> send(Pid, {bcast, Message}) end,
    lists:foreach(BCast, Pids).

您在代码中遇到的其他问题并不是真正的广播问题,而是用不熟悉的语言超越自己的问题。

这个例子是异步的(我们只是以一种方式发送消息)并且需要我们杀死我们的从属进程,因为我只写了一个无限循环。这些方面是首先要解决的问题:如何处理您的消息队列(如整个邮箱),而不仅仅是发送消息,循环接收消息;并想一想当事情进展顺利时您希望您的下属进程如何死亡(我只是在上面的示例中将它们全部杀死)。

编辑 很高兴知道还有另一种比较常见的方法来编写上面的bcast/2 函数,使用分配给“我不在乎”变量_ 的列表推导:

bcast(Pids, Message) ->
    _ = [P ! {bcast, Message} || P <- Pids],
    ok.

我以前更喜欢这种列表理解风格,因为这是我第一次接触到的风格——但在过去的几年里,我已经成为 lists:foreach/2 语义正确性的忠实粉丝。关键是当您看到列表推导时不要感到困惑——它是一种能够阅读的重要表达式类型!

【讨论】:

  • 这段代码似乎发送了 n 次消息而不是广播。
  • @user7860670 bcast/2 函数是找到实际广播发送的位置。 _ = [P ! {bcast, Message} || P &lt;- Pids], 行可以改写为BCast = fun(P) -&gt; send(P, {bcast, Message}) end, lists:foreach(BCast, Pids). 我会添加一个注释作为替代。
  • 无论哪种方式,消息都使用通常的发送表达式! 发送,因此不会发生广播。当我在寻找广播时,我希望找到一些方法,让我可以发送到多个进程,而不会增加每个额外收件人的演员表。
  • @user7860670 计算机中没有什么是即时的。只要你在一个单一的上下文中,一件事总是在另一件事之后发生。你期望会发生什么?正在执行正在运行的进程代码的调度程序进程将能够神奇地同时写入任意数量的内存地址?我们能做的最好的事情就是对你隐藏这个,但这很愚蠢。
  • 我希望发送广播消息将涉及创建单个消息,而不管收件人的数量如何(至少当他们驻留在同一操作系统进程中时)。这就是在适当的单作者多读者消息队列中发生的情况。然而,erlang 的实现似乎只涉及单写单读队列,所以广播是不可能的。即使是看似适合提供广播功能的pg 模块也明确声明“没有特殊功能可以向进程组发送消息。”
猜你喜欢
  • 2014-11-29
  • 1970-01-01
  • 2014-02-21
  • 1970-01-01
  • 2012-03-03
  • 2016-06-24
  • 2012-04-27
  • 2016-03-01
  • 1970-01-01
相关资源
最近更新 更多