【问题标题】:Runtime-dynamic compute graph using Elixir Genstage使用 Elixir Genstage 的运行时动态计算图
【发布时间】:2019-03-28 20:45:46
【问题描述】:

我希望能够在运行时动态更改计算管道,但似乎 GenStage 需要在编译时通过 subscribe_to: [...] 机制定义计算图。有没有办法创建动态计算图?例如在下面,我想在运行时在我的管道图中的“减 7”和“减 4”顶点之间切换。

这可以使用 GenStage 吗?我可能会有非常复杂的管道,所以我需要一个能够以复杂方式扩展以更改图形的解决方案,而不是临时解决方案,例如在这种情况下,参数化要减去的整数。我希望能够添加或删除整个子树、在子树之间切换以及将节点添加到图中,包括将它们拼接到包括主树在内的任何子树的中间。

请参阅下面的编辑

这是最初的生产者:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

这里是 producer_consumers 之一:

defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))
    {:noreply, numbers, state}
  end
end

这是最终消费者:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

我 都是仿照Elixir School Genstage tutorial.

所有模块和 mix.exs 都可以是found on github

在@AquarHEAD L 部分回答后 3 天后编辑。

我已经设法让运行时订阅正常工作。这里分别修改了一些producer、producer_consumers和consumers:

制作人:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end

end

生产者_消费者:

defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))
    {:noreply, numbers, state}
  end
end

消费者:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt", 
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

现在一旦这些都在 lib 目录中可用(记得将 {:gen_stage, "~&gt; 0.11"} 添加到您的 mix.exs deps 中),或者复制并粘贴到 IEX 中,那么以下将完美运行:

{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)

现在的问题是,我仍然不知道如何取消订阅。有一个cancel function,也有一个stop function。例如GenStage.stop(c) 似乎什么也没做,而我在GenStage.cancel/3 的各种尝试只会出错。

回顾一下,我现在需要的是能够停止某些阶段并用其他阶段替换它们。取消订阅的语法是什么,从哪里调用它?文档中没有很好地解释,因为没有具体的例子。

【问题讨论】:

  • 似乎他们在此示例中使用 dynamicSupervisor 对 pr consumer 进行了动态切换:medium.com/@andreichernykh/…。也许你可以得到灵感,在你的生产者消费者问题上做同样的事情?我会尽快尝试,但不会是今天。
  • 问题是在什么情况下你想切换一个阶段?取决于管道中的数据还是外部处理?
  • @Hauleth 的想法是用户将能够使用不同的管道动态地“假设”,因此,管道更改将是外生事件,即:外部事件。事实上,它们很可能来自类似于 lunalang luna-lang.org 的界面

标签: erlang elixir genstage


【解决方案1】:

为什么不实现自己的GenStage.Dispatcher?这里是behaviour

【讨论】:

    【解决方案2】:

    您绝对可以在运行时更改管道checkout the first example in GenStage documentationyou can also use the :manual mode to fine control the demand。还有API to cancel subscription。我认为这些足以动态管理 GenStage 管道。

    【讨论】:

    • 我已经很好地完成了这项工作。很遗憾 Genstate 上的 Elixir School 的东西没有提到这种模式。不过我有一个问题,取消是如何工作的?它是必须在生产者/生产者消费者内部发生的事情吗? IE 必须在 producter_consumer 中定义和调用吗?或者可以从外部调用 GenStage.cancel 吗?我在所有排列上都遇到错误。你能举例说明如何使用取消吗?我已经能够成功使用的所有其他东西。
    • 我无法取消工作。你知道它是如何工作的吗?
    • @ThomasBrowne 抱歉忙于其他事情。 Elixir 不是面向对象的,所以如果我正确地理解了你的意思,就没有“从内部定义和调用......”......
    • @ThomasBrowne(在我完成之前意外发送了评论......)您可以从文档中看到 GenStage.cancel 接受“from”类型作为第一个参数,单击它由 pid 组成和一个标签,后者可以从 GenStage.sync_subscribe 获得。从外观上看,pid 应该是生产者的。希望这会有所帮助。
    • 是的,我得到了这个工作。但是,我手动修改缓冲区长度以使它们立即停止。不过,我还有一个问题。当我使用cancel 时,它会停止订阅,但也会关闭与之关联的消费者。为什么这样做?为什么不停止订阅,然后让我从同一个正在运行的消费者那里重新订阅其他内容。
    猜你喜欢
    • 2019-01-11
    • 2023-03-15
    • 1970-01-01
    • 2017-10-07
    • 2023-03-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多