【问题标题】:how to make a genserver to run with a frequency value Elixir如何使 genserver 以频率值运行 Elixir
【发布时间】:2021-03-16 18:52:48
【问题描述】:

我见过很多 GenServer 实现,我正在尝试创建一个具有此类规范的实现,但我不确定它的 GenServer 用例。

我有这样的状态

%{url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10}

我有这样 100 个状态,具有不同的值,我的用例包含 5 个步骤。

  1. 将每个状态作为 Gen{?} 开始。
  2. 向该 URL 发送 HTTP 请求。
  3. 获取结果。
  4. 发送另一个 HTTP 请求,数据来自第一个请求。
  5. 使进程进入睡眠状态。如果频率为 10,则持续 10 秒,依此类推,10 秒后将再次从 1 步开始。

现在,当我启动 100 个这样的 worker 时,将会有 100 * 2 个频繁的 HTTP 请求。我不确定我会使用 GenServer 还是 GenStage 或 Flow 甚至是 Broadway?

我还担心 HTTP 请求不会崩溃,例如一个有状态的工作人员会发送一个请求,如果频率是在第一个请求返回之前 1 秒,另一个请求会被发送, GenServer 有足够的能力处理这些情况吗?我认为这叫做背压?

我一直在询问和研究这个用例这么久,我的用例也被引导到 RabbitMQ。

任何指导都会很有帮助,或者任何最小的例子都会非常感激。

? GenServer/ GenStage / GenStateMachine

【问题讨论】:

  • 那肯定不是Broadway 也不是Flow。我在这里也没有看到GenStage 的任何应用程序。不久前,我遇到了类似的任务,我已经发布了 Tarearbol 库,它正是这样做的(除了其他事情之外),一个实现了一种行为,而后面的 DynamicSupervisor 完成了其余的工作。
  • 是否可以启动一个gen server,然后让它在做请求的时候绕一圈移动,保存它,然后再次发送它,然后让自己进入睡眠状态,过一会儿再启动。如果一个请求没有成功,也可以处理其他请求。
  • @JunaidFarooq 可以做类似的事情,如this article 中所述。

标签: erlang elixir gen-server genstage


【解决方案1】:

您的问题归结为减少给定时间的并发网络请求数。

一个简单的方法是拥有一个跟踪传出请求计数的 GenServer。然后,对于每个客户(在您的情况下最多 200 个),它可以检查是否有打开的请求,然后采取相应的行动。下面是服务器的样子:

defmodule Throttler do
  use GenServer

  #server
  @impl true
  def init(max_concurrent: max_concurrent) do
    {:ok, %{count: 0, max_concurrent: max_concurrent}}
  end

  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}


  @impl true
  def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end

好的,现在我们有一个服务器,我们可以在其中调用handle_call(pid, :run),它会告诉我们是否超出了计数。一旦任务(获取 URL)完成,我们需要调用 handle_call(pid, :finished) 让服务器知道我们已经完成了任务。

在客户端,我们可以将其包装在一个方便的辅助函数中。 (请注意,这仍在 Throttler 模块中,因此 __MODULE__ 有效)

defmodule Throttler do
  #client
  def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
    GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
  end
  
  def execute_async(pid, func) do
    GenServer.call(pid, :run)
    |> case do
      :ok ->
        task = Task.async(fn -> 
          try do
            func.()
          after
            GenServer.call(pid, :finished)
          end
        end)
        {:ok, task}
      {:error, reason} -> {:error, reason, func}
    end
  end
end

这里我们传入一个我们希望在客户端异步执行的函数,并在执行之前在服务器端完成调用 :run 和 :finished 的工作。如果成功,我们将返回一个任务,否则我们将失败。

把它们放在一起,你会得到如下所示的代码:

{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num -> 
  Throttler.execute(pid, fn ->
    IO.puts("Running command #{num}")
    :timer.sleep(:5000)
    IO.puts("Sleep complete for #{num}")
    num * 10
  end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))

现在你有一堆任务要么成功,要么失败,你可以采取适当的行动。

失败后你会怎么做?这是背压的有趣部分 :) 最简单的事情是超时并重试,假设您最终会清除下游的压力。否则,您可能会完全失败请求并继续将问题推送到上游。

【讨论】:

    猜你喜欢
    • 2018-12-14
    • 1970-01-01
    • 2016-06-23
    • 2018-01-13
    • 2017-10-16
    • 2019-01-02
    • 2017-10-29
    • 2021-03-03
    • 2016-03-14
    相关资源
    最近更新 更多