【问题标题】:Sending parallel requests Erlang发送并行请求 Erlang
【发布时间】:2018-09-23 05:27:19
【问题描述】:

我正在用 Erlang 实现一个类似 Twitter 的应用程序。我有它的分布式和非分布式实现。我正在做一个基准测试,但似乎我找不到向每个用户进程发送并行请求以进行分布式实现的方法。我正在使用 lists:foreach 函数将“获取推文”发送到客户端进程列表。我的理解是 lists:foreach 函数一次进入列表的每个元素,实现了最终使我的分布式的顺序行为实现导致与非分布式实现相同的执行时间。是否可以一次将“获取推文”请求发送到不同的客户端进程?这对我来说似乎是一个相当具体的案例,在 StackOverflow 内外都很难找到解决方案。

test_get_tweets_Bench() ->
{ServerPid, UserInfos} = initializeForBench_server(),
run_benchmark("timeline",
    fun () ->
        lists:foreach(fun (_) ->
            UserChoice = pick_random(UserInfos),
            server:get_tweets(element(2, UserChoice), element(1, UserChoice), 1)
        end,
        lists:seq(1, 10000))
    end,
    30).

pick_random(List) ->
lists:nth(rand:uniform(length(List)), List).

userinfos 是以下形式的列表:[{userId,client_process},...]

在尝试 rpc:pmap 而不是 lists:foreach 之后,我的基准测试已经慢了大约 3 倍。变化如下:

test_get_tweets_Bench2() ->
{ServerPid, UserInfos} = initializeForBench_server(),
run_benchmark("get_tweets 2",
    fun () ->
        rpc:pmap({?MODULE,do_apply},
                 [fun (_) ->
            UserChoice = pick_random(UserInfos),
            server:get_tweets(element(2, UserChoice), element(1, UserChoice), 1)
        end],
                    lists:seq(1, 10000))
    end,
    30).


pick_random(List) ->
    lists:nth(rand:uniform(length(List)), List).

do_apply(X,F)->
    F(X).

我认为 rpc:pmap 会使我的基准测试更快,因为它会并行发送 get_tweet 请求。

下面是我的服务器模块,它是我的基准测试和类似 Twitter 的应用程序之间的 API。 API 将来自我的基准测试的请求发送到我的类似 Twitter 的应用程序。

    %% This module provides the protocol that is used to interact with an
%% implementation of a microblogging service.
%%
%% The interface is design to be synchrounous: it waits for the reply of the
%% system.
%%
%% This module defines the public API that is supposed to be used for
%% experiments. The semantics of the API here should remain unchanged.
-module(server).

-export([register_user/1,
         subscribe/3,
         get_timeline/3,
         get_tweets/3,
         tweet/3]).

%%
%% Server API
%%

% Register a new user. Returns its id and a pid that should be used for
% subsequent requests by this client.
-spec register_user(pid()) -> {integer(), pid()}.
register_user(ServerPid) ->
    ServerPid ! {self(), register_user},
    receive
        {ResponsePid, registered_user, UserId} -> {UserId, ResponsePid}
    end.

% Subscribe/follow another user.
-spec subscribe(pid(), integer(), integer()) -> ok.
subscribe(ServerPid, UserId, UserIdToSubscribeTo) ->
    ServerPid ! {self(), subscribe, UserId, UserIdToSubscribeTo},
    receive
        {_ResponsePid, subscribed, UserId, UserIdToSubscribeTo} -> ok
    end.

% Request a page of the timeline of a particular user.
% Request results can be 'paginated' to reduce the amount of data to be sent in
% a single response. This is up to the server.
-spec get_timeline(pid(), integer(), integer()) -> [{tweet, integer(), erlang:timestamp(), string()}].
get_timeline(ServerPid, UserId, Page) ->
    ServerPid ! {self(), get_timeline, UserId, Page},
    receive
        {_ResponsePid, timeline, UserId, Page, Timeline} ->
            Timeline
    end.

% Request a page of tweets of a particular user.
% Request results can be 'paginated' to reduce the amount of data to be sent in
% a single response. This is up to the server.
-spec get_tweets(pid(), integer(), integer()) -> [{tweet, integer(), erlang:timestamp(), string()}].
get_tweets(ServerPid, UserId, Page) ->
    ServerPid ! {self(), get_tweets, UserId, Page},
    receive
        {_ResponsePid, tweets, UserId, Page, Tweets} ->
            Tweets
    end.

% Submit a tweet for a user.
% (Authorization/security are not regarded in any way.)
-spec tweet(pid(), integer(), string()) -> erlang:timestamp(). 
tweet(ServerPid, UserId, Tweet) ->
    ServerPid ! {self(), tweet, UserId, Tweet},
    receive
        {_ResponsePid, tweet_accepted, UserId, Timestamp} ->
            Timestamp
    end.

【问题讨论】:

  • 我认为对于 Erlang 中的并行请求,您应该使用并行映射。您可以在这里查看 pmap:stackoverflow.com/questions/7595128/….
  • 在将我的 lists:foreach 更改为 rpc:pmap 之后,我的基准测试现在运行得比以前慢(大约慢 2 -3 倍)。我认为这种更改会加快速度,因为它将请求并行发送到分布式进程。

标签: concurrency parallel-processing erlang


【解决方案1】:

在 Erlang 中,消息从进程 A 交换到进程 B。没有可用的功能,如广播或选择性广播。在您的应用程序中,我看到 3 个步骤:

  1. 发送请求以获取用户的推文,
  2. 用户进程准备答案并将其发送回请求者
  3. 初始过程收集答案

将请求发送到用户进程并收集推文(步骤 1 和 3)不能使用并行性。当然,您可以使用多个进程来发送请求并收集答案,每个用户最多 1 个,但我想这不是您的问题的主题。

可行的是,保证这3个步骤不是针对每个用户进程按顺序进行,而是并行进行。我猜函数server:get_tweets 负责发送请求并收集答案。如果我是正确的(我不知道,因为您没有提供代码,并且您忽略了返回的值),您可以通过将此函数拆分为 2 来使用并行性,第一个发送请求,第二个收集答案。 (这里是一个代码示例,我没有尝试甚至编译,所以请谨慎考虑:o)

test_get_tweets_Bench() ->
{ServerPid, UserInfos} = initializeForBench_server(),
run_benchmark("timeline",
    fun () ->
        % send the requests
        List = lists:map(fun (_) ->
            {UserId,Pid} = pick_random(UserInfos),
            Ref = server:request_tweets(Pid,UserId),
            {Ref,UserId}
            end,
            lists:seq(1, 10000)),
        % collects the answers
        collect(L,[])
    end,
    30).

collect([],Result) -> {ok,Result};
collect(List,ResultSoFar) ->
    receive
        {Ref,UserId,Tweets} ->
            {ok,NewList} = remove_pending_request(Ref,UserId,List),
            collect(Newlist,[{UserId,Tweets}|ResultSoFar])
    after ?TIMEOUT
        {error,timeout,List,ResultSoFar}
    end.

remove_pending_request(Ref,UserId,List) ->
    {value,{Ref,UserId},NewList} = lists:keytake(Ref,1,List),
    {ok,NewList}. 

pick_random(List) ->
lists:nth(rand:uniform(length(List)), List).

这是我实施并行基准测试的另一次尝试,但并未实现任何加速。

get_tweets(Sender, UserId, Node) ->
server:get_tweets(Node, UserId, 0),
Sender ! done_get_tweets.

test_get_tweets3() ->
    {_ServerId, UserInfos} = initializeForBench_server(),
    run_benchmark("parallel get_tweet", 
        fun () ->
            lists:foreach(
                fun (_) ->
                    {UserId,Pid} = pick_random(UserInfos),
                    spawn(?MODULE, get_tweets, [self(), UserId, Pid])
                end,
                lists:seq(1, ?NUMBER_OF_REQUESTS)),
            lists:foreach(fun (_) -> receive done_get_tweets -> ok end end, lists:seq(1, ?NUMBER_OF_REQUESTS))
        end,
        ?RUNS).

【讨论】:

  • 我尝试了这个代码实现,它使基准测试比 rpc:pmap 运行得更快,但它仍然比 lists:foreach 实现慢(x2 慢)。我在当前的查询编辑中添加了我的服务器模块。有什么我可以更改的地方,以完全并行化请求并使其运行速度甚至比 list:foreach 实现更快吗?
  • 您的测试中有多少台服务器(对应于 Server Pide 变量)并行运行?
  • 我为每个服务器请求生成一个新进程。有趣的是,即使我这样做了,我也没有观察到加速。似乎我遗漏了有关 Erlang 工作原理的一些重要细节。看最后的代码sn -p。
  • 我会的,这需要一些时间,我目前没有互联网连接,除了我的手机 :o)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-05
  • 2020-11-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-05
相关资源
最近更新 更多