【问题标题】:Elixir process limit?Elixir 进程限制?
【发布时间】:2017-12-07 23:02:59
【问题描述】:

我想创建一个类似这样的 Elixir 代码:

def infinite_loop(created_workers \\ []) do
  case next_from_queue do
    {:ok, queue_msg} ->
      new_worker = Task.async(fn -> crawling(queue_msg) end)
      infinite_loop([new_worker | created_workers])
    {:error, :empty} ->
      created_workers.map(&Task.await/1)
  end
end

假设:

  1. crawling 函数将创建另一个 3 Task
  2. 每个crawling worker 可以花 3 秒时间跑步
  3. queue 可能有数百万条消息

我如何知道 Elixir 上并行进程的限制是多少?我怎样才能让它不坏?

【问题讨论】:

  • 您为什么不利用Genstage 做这个?它提供了一种背压机制,消费者只会在生产者可用时要求生产者提供更多任务。您可以对其进行配置,以便在需要完成更多工作时动态生成消费者。通过这种方式,您可以最大限度地利用您的资源而不会爆炸。
  • @KevinJohnson GenStage 看起来棒极了!!!但是,如果我没有弄错,为了“消费”我的生产者(队列)得到的东西,我需要“初始化”一个生产者。所以,我希望你能帮助我解决以下问题: - 我是否需要初始化多个生产者以达到更多并行性? - 我可以动态地做到这一点吗?我的意思是,当我在消费者身上有更多“等待”的工作时,我会创造更多的生产者......提前致谢!

标签: elixir phoenix-framework


【解决方案1】:

我建议为此使用Task.async_streamTask.async_stream 允许您并行处理流,同时限制并行运行的任务数量。虽然在 Erlang 20 中进程数的默认限制是 262144,但如果您正在抓取一个网站,您可能需要一个更低的限制。

您可以使用 Stream.iterate 从不断返回新项目的函数创建流:

stream =
  Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
  |> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)

由于您想在{:error, :empty} 停止,我们使用Stream.take_while 停止流。

然后像这样使用Task.async_stream

stream
|> Task.async_stream(fn {:ok, queue_msg} ->
  crawling(queue_msg)
end, max_concurrency: 16)

这将运行最多 16 个并行任务的流。最终结果将是crawling(queue_msg) 的所有返回值的列表。

【讨论】:

  • 感谢@Dogbert 的精彩回复!!!我真的很喜欢你的方法。但是,我想要一个长生不老药架构,其中代码可以爆炸 100% 的 CPU 和内存。你认为这是一个很好的方法 spawns 有很多进程然后捕获“限制达到异常”?
  • 不幸的是@AugustoPedraza,当达到 BEAM 虚拟机可以处理的限制时,不会出现“引发异常”的情况。它只会崩溃。
猜你喜欢
  • 2019-06-28
  • 2011-05-31
  • 2017-10-16
  • 2019-06-15
  • 2014-07-25
  • 1970-01-01
  • 2018-07-17
  • 2016-03-14
  • 1970-01-01
相关资源
最近更新 更多