【发布时间】:2018-06-29 17:06:06
【问题描述】:
我有一个流产生数据的速度不如消耗的快。
所以我有一个这样定义的生产者:
def start_link() do
create_stream
|> GenStage.from_enumerable(name: Producer)
end
然后我的生产者-消费者订阅它
def init(:ok) do
{:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
end
我的消费者订阅了 mu producer-consumer
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
end
我遇到的问题是消费者挂起,我认为是因为生产者在某些时候没有设法获取新数据,并且如文档中所述:
当可枚举完成或停止时,阶段将退出 : 正常原因。这意味着,如果消费者订阅 可枚举阶段和:取消选项设置为:永久,即 默认情况下,消费者也会退出:正常原因
所以我阅读了更多内容,并建议添加选项cancel:: transient 以不完成该阶段。我是这样添加的,但它不起作用,我错过了什么吗?
|> GenStage.from_enumerable(name: Producer, cancel: :transient)
最初我使用的是Flow.into_stages(flow, [ProducerConsumer]),但我不能这样做,因为我无法从我的主管树中引用(或者我不知道如何)ProducerConsumer
children = [
{Producer, []},
{ProducerConsumer, []},
{Consumer, []}
]
更新
从子定义更新对 Flow.into_stages 的传递引用
children = [
{Producer, [name: ProducerConsumer]},
{ProducerConsumer, []},
{Consumer, []}
]
def start_link(producer_consumer) do
create_stream
|> Flow.into_stages(producer_consumer)
end
** (Mix) 无法启动应用程序测试:Application.start(:normal, []) 返回错误:关闭: 未能启动子项:制片人 ** (EXIT) 退出:GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID, [cancel: :transient]}, 5000) ** (EXIT) 与 Elixir.ProducerConsumer 没有连接
【问题讨论】:
-
不会
{ProducerConsumer, [[name: ProducerConsumer]]}(在指定子项时)帮助按名称引用它ProducerConsumer? -
mm 然后用 Flow.into_stages 来用?
-
例如,看起来应该可以解决问题。
-
不一定是{Producer, [[name: ProducerConsumer]]}?
-
嗯。你想参考什么?不管它是什么,给它一个名字。