【问题标题】:GenStage.from_enumerable hangs with an intermitent streamGenStage.from_enumerable 与间歇性流挂起
【发布时间】:2018-06-29 17:06:06
【问题描述】:

我有一个流产生数据的速度不如消耗的快。

所以我有一个这样定义的生产者:

def start_link() do
  create_stream
  |> GenStage.from_enumerable(name: Producer)
end

然后我的生产者-消费者订阅它

  def init(:ok) do
    {:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
  end

我的消费者订阅了 mu producer-consumer

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
  end

我遇到的问题是消费者挂起,我认为是因为生产者在某些时候没有设法获取新数据,并且如文档中所述:

当可枚举完成或停止时,阶段将退出 : 正常原因。这意味着,如果消费者订阅 可枚举阶段和:取消选项设置为:永久,即 默认情况下,消费者也会退出:正常原因

所以我阅读了更多内容,并建议添加选项cancel:: transient 以不完成该阶段。我是这样添加的,但它不起作用,我错过了什么吗?

|> GenStage.from_enumerable(name: Producer, cancel: :transient)

最初我使用的是Flow.into_stages(flow, [ProducerConsumer]),但我不能这样做,因为我无法从我的主管树中引用(或者我不知道如何)ProducerConsumer

children = [
  {Producer, []},
  {ProducerConsumer, []},
  {Consumer, []}
]

更新

从子定义更新对 Flow.into_stages 的传递引用

children = [
  {Producer, [name: ProducerConsumer]},
  {ProducerConsumer, []},
  {Consumer, []}
]

def start_link(producer_consumer) do
  create_stream
  |> Flow.into_stages(producer_consumer)
end

** (Mix) 无法启动应用程序测试:Application.start(:normal, []) 返回错误:关闭: 未能启动子项:制片人 ** (EXIT) 退出:GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID, [cancel: :transient]}, 5000) ** (EXIT) 与 Elixir.ProducerConsumer 没有连接

【问题讨论】:

  • 不会{ProducerConsumer, [[name: ProducerConsumer]]}(在指定子项时)帮助按名称引用它ProducerConsumer
  • mm 然后用 Flow.into_stages 来用?
  • 例如,看起来应该可以解决问题。
  • 不一定是{Producer, [[name: ProducerConsumer]]}?
  • 嗯。你想参考什么?不管它是什么,给它一个名字。

标签: elixir genstage


【解决方案1】:

错误:

** (Mix) 无法启动应用程序测试:Application.start(:normal, []) 返回错误:shutdown: failed to start child: Producer ** (EXIT) 退出:GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID, [cancel: :transient]}, 5000) ** (EXIT) 与 Elixir.ProducerConsumer 没有连接

只是意味着当 Flow.into_stages 尝试同步到提供的消费者时,消费者必须已经在运行。

所以,监督的时候顺序很重要,像这样:

children = [
  Consumer,
  FlowProducerWorker # worker which implements Flow.into_stages(flow, [Consumer])
]

【讨论】:

    猜你喜欢
    • 2012-03-23
    • 1970-01-01
    • 1970-01-01
    • 2015-05-21
    • 2016-09-14
    • 2014-05-25
    • 1970-01-01
    • 2016-10-22
    相关资源
    最近更新 更多