是的,与大多数其他语言相比,这在 Elixir/Erlang 中可以很容易地完成。这是使用内存缓存执行此操作的一种方法。如果您以前使用过 GenServer 而不是 GenServer.reply/2,这里需要注意的是,我们存储传入的 handle_call 请求的 from 参数,当请求完成时,我们会响应每个请求。我没有在这个 POC 代码中很好地处理错误,但它正确处理了最有趣的部分,即 2.2:
defmodule CachedParallelHTTP do
def start_link do
GenServer.start_link(__MODULE__, :ok)
end
def init(_) do
{:ok, %{}}
end
def handle_call({:fetch, arg}, from, state) do
case state[arg] do
%{status: :fetched, response: response} ->
# We've already made this request; just return the cached response.
{:reply, response, state}
%{status: :fetching} ->
# We're currently running this request. Store the `from` and reply to the caller later.
state = update_in(state, [arg, :froms], fn froms -> [from | froms] end)
{:noreply, state}
nil ->
# This is a brand new request. Let's create the new state and start the request.
pid = self()
state = Map.put(state, arg, %{status: :fetching, froms: [from]})
Task.start(fn ->
IO.inspect {:making_request, arg}
# Simulate a long synchronous piece of code. The actual HTTP call should be made here.
Process.sleep(2000)
# dummy response
response = arg <> arg <> arg
# Let the server know that this request is done so it can reply to all the `froms`,
# including the ones that were added while this request was being executed.
GenServer.call(pid, {:fetched, arg, response})
end)
{:noreply, state}
end
end
def handle_call({:fetched, arg, response}, _from, state) do
# A request was completed.
case state[arg] do
%{status: :fetching, froms: froms} ->
IO.inspect "notifying #{length(froms)} clients waiting for #{arg}"
# Reply to all the callers who've been waiting for this request.
for from <- froms do
GenServer.reply(from, response)
end
# Cache the response in the state, for future callers.
state = Map.put(state, arg, %{status: :fetched, response: response})
{:reply, :ok, state}
end
end
end
这里有一小段代码来测试这个:
now = fn -> DateTime.utc_now |> DateTime.to_iso8601 end
{:ok, s} = CachedParallelHTTP.start_link
IO.inspect {:before_request, now.()}
for i <- 1..3 do
Task.start(fn ->
response = GenServer.call(s, {:fetch, "123"})
IO.inspect {:response, "123", i, now.(), response}
end)
end
:timer.sleep(1000)
for i <- 1..5 do
Task.start(fn ->
response = GenServer.call(s, {:fetch, "456"})
IO.inspect {:response, "456", i, now.(), response}
end)
end
IO.inspect {:after_request, now.()}
:timer.sleep(10000)
输出:
{:before_request, "2017-01-06T10:30:07.852986Z"}
{:making_request, "123"}
{:after_request, "2017-01-06T10:30:08.862425Z"}
{:making_request, "456"}
"notifying 3 clients waiting for 123"
{:response, "123", 3, "2017-01-06T10:30:07.860758Z", "123123123"}
{:response, "123", 2, "2017-01-06T10:30:07.860747Z", "123123123"}
{:response, "123", 1, "2017-01-06T10:30:07.860721Z", "123123123"}
"notifying 5 clients waiting for 456"
{:response, "456", 5, "2017-01-06T10:30:08.862556Z", "456456456"}
{:response, "456", 4, "2017-01-06T10:30:08.862540Z", "456456456"}
{:response, "456", 3, "2017-01-06T10:30:08.862524Z", "456456456"}
{:response, "456", 2, "2017-01-06T10:30:08.862504Z", "456456456"}
{:response, "456", 1, "2017-01-06T10:30:08.862472Z", "456456456"}
请注意,使用 GenServer.reply 和 Task.start,单个 GenServer 能够处理多个并行请求,同时保持面向用户的 API 完全同步。根据您要处理的负载量,您可能需要考虑使用 GenServer 池。