【问题标题】:Keep chunked connection open before Elixir processes finished在 Elixir 进程完成之前保持分块连接打开
【发布时间】:2016-09-01 16:56:22
【问题描述】:

应用程序工作简单。当它接收到请求时,它会生成 thread_listener 并将其循环 10 次并传递索引 (i)。 ndc_thread 获取此数据 (i) 并返回核心。 Core 正在循环等待来自线程的消息,当它接收到时,它会发送包含线程返回的消息的块。

问题是在执行 1..10 循环函数后,它会发送“End”块并关闭连接。 所以 curl http://localhost:4000 的输出是:“StartEnd” 期望的结果是:“Start12345678910End”

有没有办法保持连接打开并等待自定义超时或等待进程执行?

defmodule AlivePlug do
  import Plug.Conn

  def init(opts) do
    opts
  end

  def call(conn, _opts) do
    conn = send_chunked(conn, 200)
    chunk(conn, "Start")

    core_pid = spawn_link(fn -> core_listener(conn) end)
    thread = spawn_link(fn -> thread_listener end)
    1..10 |> Enum.each(fn i -> send thread, {core_pid, i} end)

    chunk(conn, "End")
    conn
  end

  defp core_listener(conn) do
    receive do
      {status, i} ->
        _conn = chunk(conn, Integer.to_string(i))
        core_listener(conn)
    end
  end

  defp thread_listener do
    receive do
      {core_pid, i} ->
        send core_pid, {:ok, i}
        thread_listener
      _ ->
        thread_listener
    end
  end
end

这是有效的应用程序 只需运行并使用邮递员或 curl http://localhost:4000 https://github.com/programisti/keep-alive-elixir

【问题讨论】:

  • @Atomic_alarm 是否有关于 Elixir/Erlang 标签未配对的元决策?我只知道 Erlang,但这个问题所需的代码/知识在这里可以轻松互换,因此标记两者似乎是合理的。
  • @NathanielWaisbrot,我不确定。我不知道 Elixir,也无法理解问题的本质。由于问题的措辞只是 Elixir 而不是 Elixir/Erlang 我认为这个问题与 Erlang 无关。如果您愿意,可以在聊天中继续对话 - chat.stackoverflow.com/rooms/75358/erlang-otp 或原样返回 ;-)

标签: multithreading elixir


【解决方案1】:

好吧,问题在于您生成了thread 进程,向它发送了一些数据,但永远不要等待它有机会工作,然后保持连接的函数结束。

让我们从一个非常幼稚的解决方案开始:

    1..10 |> Enum.each(fn i -> send thread, {core_pid, i} end)
    Process.sleep 5000
    chunk(conn, "End")

如果我们睡 5 秒,那应该足够 thread 完成所有工作了。这是一个糟糕的解决方案,因为如果您循环到 10000,那么 5 秒可能太短,而 1..10 5 秒可能太长了。此外,你不应该相信盲目的时机。 理论上系统可能会一次等待 30 秒,另一次等待 10 毫秒以在 CPU 上调度 thread

现在让我们做一个真正的解决方案,基于进程邮箱是一个先进先出的事实。

首先,我们将添加thread_listener 可以理解的另一种消息类型:

  defp thread_listener do
    receive do
      {core_pid, i} ->
        send core_pid, {:ok, i}
        thread_listener
      {:please_ack, pid} ->
        send pid :ack
        thread_listener
      _ ->
        thread_listener
    end
  end

接下来,让我们用更智能的东西替换 Process.sleep

    1..10 |> Enum.each(fn i -> send thread, {core_pid, i} end)
    send thread, {:please_ack, self}
    receive do
      :ack ->
        ok
    end
    chunk(conn, "End")

现在我们将第 11 条消息放入thread 的邮箱,然后等待确认。处理完所有 11 条消息后,我们将收到一条消息并能够继续。 thread 是否在发送消息的那一刻处理每条消息,或者在消息开始之前是否有 10 秒的暂停,这并不重要。 (这些数字是双曲线的,但我想强调的是,在多线程编程中不能依赖时序。)

您现在可能会注意到,我们实现的内容实际上并没有以有用的方式使用流程。您从主线程生成两个线程,它们都处于空闲状态,直到您的主线程向它们发送数据,然后您的 main 线程处于空闲状态,直到生成的线程完成。只在主线程中完成工作会更清楚。这个例子太琐碎了,实际上不需要线程,我不知道你打算如何扩展它,但我希望这能给你一些关于线程编程的一般帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-25
    • 1970-01-01
    • 2019-06-06
    • 2014-07-05
    相关资源
    最近更新 更多