【发布时间】:2017-02-13 14:54:40
【问题描述】:
以下场景:GenStage 生产者处理 Twitter 流(使用 Stream API 和 ExTwitter)并向 GenStage 消费者提供一组推文(消费者要求的最大需求)。然后消费者只需打印它们。
以下问题:我正在寻找特定的推文,因此并不总是有新的推文可用。如果 GenStage 生产者返回一个空的事件列表,消费者将停止询问。请参阅this issue and José Valims reply 了解更多信息。
我不确定如何解决这个问题。任何帮助是极大的赞赏。这是我目前所拥有的:
defmodule MyApp.TwitterProducer do
use GenStage
alias MyApp.TwitterStream
def start_link(:ok) do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# This creates a regular Elixir Stream
# I use this as the state so that not every
# time the consumer asks for new data
# a new stream is initiated
stream = TwitterStream.get_stream
{:producer, stream}
end
def handle_demand(demand, stream) do
# Take tweets from the stream and
# turn them into a list. Then return
# them to the consumer
chunk = Stream.take(stream, demand)
events = Enum.to_list(chunk)
{:noreply, events, stream}
end
def handle_info(_msg, state) do
# I as getting an "wrong message" error
# before I implemented this function myself
# It does nothing special to my case
{:noreply, [], state}
end
end
defmodule MyApp.TwitterConsumer do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
Process.sleep(3000)
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)
会发生什么:这会运行一段时间,然后停止。据我了解,一旦生产者返回一个空的事件列表。
编辑: 有趣的是:如果我将需求设置为 1,它会继续运行。但它比直接查询 Twitter Stream API 慢得多。这意味着我收到的推文少了十倍。我的理论是,这是由于重复的Stream.take 调用,而不是仅仅为整个流调用Enum.to_list。但我发现它仍然非常混乱。有什么我想念的想法吗?
【问题讨论】:
-
创建一个特殊的返回列表来表明没有找到推文怎么样?
-
这里的问题是:流是懒惰的。我看不到检查是否存在元素然后返回特殊列表的方法。例如,如果我调用 Enum.count/1 ,代码会一直等到实际存在元素。这将导致超时。
-
好的,如果它停止,只需重新启动它。
-
@OnorioCatenacci 我有一种强烈的感觉,这不是正确的方法。我不想让它在不了解发生了什么的情况下以某种方式运行。我想了解这种行为并以正确和预期的方式解决它。
-
我认为你没有抓住重点。据我所知,重新启动它是“正确和预期的方式”。谷歌“Erlang Way”和“Let It Crash”哲学,你会明白我的意思。