【问题标题】:Start fetch, queue intermediate requests during fetch, then serve data for all开始 fetch,在 fetch 期间对中间请求进行排队,然后为所有人提供数据
【发布时间】:2017-05-21 00:03:10
【问题描述】:

我在使用 Elixir 和 Phoenix 实现以下流程时遇到问题:

  1. 来自用户 A 的请求,第 3 方 API 缓存为空
  2. 启动第 3 方通过 HTTP 获取 API
    1. 获取尚未完成,来自用户 B 的请求进来
    2. 用户 B 等待完成提取
  3. 获取完成,将获取的数据写入缓存(例如 Redis)
  4. 为所有等待的用户提供缓存数据

不同的路由或路由参数应该使用不同的队列。在第 3 方 API 数据仍在获取时传入的请求在任何情况下都不应触发具有相同参数的额外获取。等待部分(2.2.)对我来说至关重要。

从我目前阅读的内容来看,这个问题似乎可以使用标准 Elixir / Erlang / OTP 功能解决。

【问题讨论】:

    标签: elixir phoenix-framework erlang-otp


    【解决方案1】:

    是的,与大多数其他语言相比,这在 Elixir/Erlang 中可以很容易地完成。这是使用内存缓存执行此操作的一种方法。如果您以前使用过 GenServer 而不是 GenServer.reply/2,这里需要注意的是,我们存储传入的 handle_call 请求的 from 参数,当请求完成时,我们会响应每个请求。我没有在这个 POC 代码中很好地处理错误,但它正确处理了最有趣的部分,即 2.2:

    defmodule CachedParallelHTTP do
      def start_link do
        GenServer.start_link(__MODULE__, :ok)
      end
    
      def init(_) do
        {:ok, %{}}
      end
    
      def handle_call({:fetch, arg}, from, state) do
        case state[arg] do
          %{status: :fetched, response: response} ->
            # We've already made this request; just return the cached response.
            {:reply, response, state}
          %{status: :fetching} ->
            # We're currently running this request. Store the `from` and reply to the caller later.
            state = update_in(state, [arg, :froms], fn froms -> [from | froms] end)
            {:noreply, state}
          nil ->
            # This is a brand new request. Let's create the new state and start the request.
            pid = self()
            state = Map.put(state, arg, %{status: :fetching, froms: [from]})
            Task.start(fn ->
              IO.inspect {:making_request, arg}
              # Simulate a long synchronous piece of code. The actual HTTP call should be made here.
              Process.sleep(2000)
              # dummy response
              response = arg <> arg <> arg
              # Let the server know that this request is done so it can reply to all the `froms`,
              # including the ones that were added while this request was being executed.
              GenServer.call(pid, {:fetched, arg, response})
            end)
            {:noreply, state}
        end
      end
    
      def handle_call({:fetched, arg, response}, _from, state) do
        # A request was completed.
        case state[arg] do
          %{status: :fetching, froms: froms} ->
            IO.inspect "notifying #{length(froms)} clients waiting for #{arg}"
            # Reply to all the callers who've been waiting for this request.
            for from <- froms do
              GenServer.reply(from, response)
            end
            # Cache the response in the state, for future callers.
            state = Map.put(state, arg, %{status: :fetched, response: response})
            {:reply, :ok, state}
        end
      end
    end
    

    这里有一小段代码来测试这个:

    now = fn -> DateTime.utc_now |> DateTime.to_iso8601 end
    
    {:ok, s} = CachedParallelHTTP.start_link
    IO.inspect {:before_request, now.()}
    for i <- 1..3 do
      Task.start(fn ->
        response = GenServer.call(s, {:fetch, "123"})
        IO.inspect {:response, "123", i, now.(), response}
      end)
    end
    :timer.sleep(1000)
    for i <- 1..5 do
      Task.start(fn ->
        response = GenServer.call(s, {:fetch, "456"})
        IO.inspect {:response, "456", i, now.(), response}
      end)
    end
    IO.inspect {:after_request, now.()}
    :timer.sleep(10000)
    

    输出:

    {:before_request, "2017-01-06T10:30:07.852986Z"}
    {:making_request, "123"}
    {:after_request, "2017-01-06T10:30:08.862425Z"}
    {:making_request, "456"}
    "notifying 3 clients waiting for 123"
    {:response, "123", 3, "2017-01-06T10:30:07.860758Z", "123123123"}
    {:response, "123", 2, "2017-01-06T10:30:07.860747Z", "123123123"}
    {:response, "123", 1, "2017-01-06T10:30:07.860721Z", "123123123"}
    "notifying 5 clients waiting for 456"
    {:response, "456", 5, "2017-01-06T10:30:08.862556Z", "456456456"}
    {:response, "456", 4, "2017-01-06T10:30:08.862540Z", "456456456"}
    {:response, "456", 3, "2017-01-06T10:30:08.862524Z", "456456456"}
    {:response, "456", 2, "2017-01-06T10:30:08.862504Z", "456456456"}
    {:response, "456", 1, "2017-01-06T10:30:08.862472Z", "456456456"}
    

    请注意,使用 GenServer.replyTask.start,单个 GenServer 能够处理多个并行请求,同时保持面向用户的 API 完全同步。根据您要处理的负载量,您可能需要考虑使用 GenServer 池。

    【讨论】:

    • 感谢您的详细回复!这肯定需要处理很多(注意:我刚刚开始使用 Elixir)。将CachedParallelHTTP 作为主管工作人员是否有意义?
    • 啊,我建议先阅读官方的“混合和 OTP”指南,然后再尝试理解我在这种情况下的答案:elixir-lang.org/getting-started/mix-otp/…
    • “作为主管工作人员启动 CachedParallelHTTP 有意义吗?”是的,我肯定会这样做,以便在崩溃时重新启动。
    • 这个答案看起来像是要走的路。您还可以将问题分解为 2 个单独的 GenServers - 一个用于处理缓存,与此处的 CachedParallelHTTP 几乎相同,另一个用于获取 API。您将失去使用 Tasks 获得的固有并行性,但您可以轻松实现节流。
    • 既然GenServer.call(pid, {:fetched, arg, response})在一个Task里面,它不会回复同一个进程而不是原来的调用者吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-07
    • 2019-11-17
    • 1970-01-01
    • 2021-03-04
    • 2017-11-27
    • 1970-01-01
    相关资源
    最近更新 更多