【问题标题】:Data streaming using streamcontent_from_pid in Yaws/Erlang在 Yaws/Erlang 中使用 streamcontent_from_pid 进行数据流式传输
【发布时间】:2016-10-03 10:57:28
【问题描述】:

我希望通过yaws 将数据流式传输到我的彗星应用程序,我已经阅读并努力理解它,但来自 yaws 的示例对我来说似乎有点复杂(我是 Erlang 的新手)。我就是想不通……

这里是 yaws 的例子(我稍微修改了一下):

out(A) ->
    %% Create a random number
    {_A1, A2, A3} = now(),
    random:seed(erlang:phash(node(), 1),
                erlang:phash(A2, A3),
                A3),
    Sz = random:uniform(1),

    Pid = spawn(fun() ->
                        %% Read random junk
                        S="Hello World",
                        P = open_port({spawn, S}, [binary,stream, eof]),
                        rec_loop(A#arg.clisock, P)
                end),

    [{header, {content_length, Sz}},
     {streamcontent_from_pid, "text/html; charset=utf-8", Pid}].


rec_loop(Sock, P) ->
    receive
        {discard, YawsPid} ->
            yaws_api:stream_process_end(Sock, YawsPid);
        {ok, YawsPid} ->
            rec_loop(Sock, YawsPid, P)
    end,
    port_close(P),
    exit(normal).

rec_loop(Sock, YawsPid, P) ->
    receive
        {P, {data, BinData}} ->
            yaws_api:stream_process_deliver(Sock, BinData),
            rec_loop(Sock, YawsPid, P);
        {P, eof} ->
            yaws_api:stream_process_end(Sock, YawsPid)
    end.

我需要将上面的脚本转换为可以与以下组合的脚本。

mysql:start_link(p1, "127.0.0.1", "root", "azzkikr", "mydb"),
                {data, Results}  = mysql:fetch(p1, "SELECT*FROM messages WHERE id > " ++ LASTID),
                {mysql_result, FieldNames, FieldValues, NoneA, NoneB} = Results,
                parse_data(FieldValues, [], [], [], [], [])

parse_data(FieldValues, [], [], [], [], []) 返回条目的 JSON 字符串。 合并此脚本应不断检查数据库中的新条目,如果有,它应该像彗星一样获取。

谢谢你们,愿你们都去天堂!

【问题讨论】:

  • 你是想通过comet收到请求后查询数据库,还是循环查询数据库,一旦有需要的数据就通过comet响应?
  • 我想循环查询数据库,并在有所需数据时立即通过彗星响应。

标签: erlang comet yaws data-stream


【解决方案1】:

正如this answer 解释的那样,有时您需要运行一个独立于任何传入 HTTP 请求的进程。对于您的情况,您可以使用发布/订阅的形式:

  • Publisher:当你的 Erlang 节点启动时,启动某种数据库客户端进程,或这样的进程池,执行你的查询并独立于 Yaws 运行。
  • 订阅者: 当 Yaws 接收到 HTTP 请求并将其分派给您的代码时,您的代码将订阅发布者。当发布者向订阅者发送数据时,订阅者会将数据流式传输回 HTTP 客户端。

在这里详细说明完整的解决方案是不切实际的,但一般步骤是:

  • 当您的数据库客户端进程启动时,它们会将自己注册到pg2 组或类似的组中。使用poolboy 之类的东西,而不是滚动您自己的进程池,因为它们是notoriously tricky 才能正确。每个数据库客户端都可以是 gen_server 的一个实例,它运行查询、接收数据库结果以及处理订阅请求调用。
  • 当您的 Yaws 代码收到请求时,它会查找数据库客户端发布者进程并订阅它。订阅需要调用数据库客户端模块中的函数,该函数又使用gen_server:call/2,3 与实际的gen_server 发布者进程进行通信。订阅者使用Yaws streaming capabilities(或SSEWebSocket)完成与HTTP 客户端的连接并向其发送任何所需的响应标头。
  • 发布者存储订阅者的进程 ID,并在订阅者上建立monitor,以便在订阅者死亡或意外退出时清理订阅。
  • 发布者在其发送给订阅者的消息中使用monitor's reference 作为唯一ID,因此订阅函数将该引用返回给订阅者。订阅者使用引用来匹配来自发布者的传入消息。
  • 当发布者从数据库中获得新的查询结果时,它会将数据发送给它的每个订阅者。这可以通过普通的 Erlang 消息来完成。
  • 订阅者使用Yaws streaming functions(或SSEWebSocket功能)将查询结果发送到HTTP客户端。
  • 当 HTTP 客户端断开连接时,订阅者会调用另一个发布者函数来取消订阅。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-10-10
    • 2013-05-02
    • 2015-12-30
    • 2014-05-09
    • 2011-12-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多