【问题标题】:GenStage: How to handle situations where producer cannot provide events?GenStage:如何处理生产者无法提供事件的情况?
【发布时间】:2017-02-13 14:54:40
【问题描述】:

以下场景:GenStage 生产者处理 Twitter 流(使用 Stream APIExTwitter)并向 GenStage 消费者提供一组推文(消费者要求的最大需求)。然后消费者只需打印它们。

以下问题:我正在寻找特定的推文,因此并不总是有新的推文可用。如果 GenStage 生产者返回一个空的事件列表,消费者将停止询问。请参阅this issue and José Valims reply 了解更多信息。

我不确定如何解决这个问题。任何帮助是极大的赞赏。这是我目前所拥有的:

defmodule MyApp.TwitterProducer do
  use GenStage
  alias MyApp.TwitterStream

  def start_link(:ok) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # This creates a regular Elixir Stream
    # I use this as the state so that not every
    # time the consumer asks for new data
    # a new stream is initiated
    stream = TwitterStream.get_stream
    {:producer, stream}
  end

  def handle_demand(demand, stream) do
    # Take tweets from the stream and 
    # turn them into a list. Then return 
    # them to the consumer
    chunk = Stream.take(stream, demand)
    events = Enum.to_list(chunk)
    {:noreply, events, stream}
  end


  def handle_info(_msg, state) do
    # I as getting an "wrong message" error 
    # before I implemented this function myself
    # It does nothing special to my case
    {:noreply, [], state}
  end

end

defmodule MyApp.TwitterConsumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do

    Process.sleep(3000)
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end

end

# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)

会发生什么:这会运行一段时间,然后停止。据我了解,一旦生产者返回一个空的事件列表。

编辑: 有趣的是:如果我将需求设置为 1,它会继续运行。但它比直接查询 Twitter Stream API 慢得多。这意味着我收到的推文少了十倍。我的理论是,这是由于重复的Stream.take 调用,而不是仅仅为整个流调用Enum.to_list。但我发现它仍然非常混乱。有什么我想念的想法吗?

【问题讨论】:

  • 创建一个特殊的返回列表来表明没有找到推文怎么样?
  • 这里的问题是:流是懒惰的。我看不到检查是否存在元素然后返回特殊列表的方法。例如,如果我调用 Enum.count/1 ,代码会一直等到实际存在元素。这将导致超时。
  • 好的,如果它停止,只需重新启动它。
  • @OnorioCatenacci 我有一种强烈的感觉,这不是正确的方法。我不想让它在不了解发生了什么的情况下以某种方式运行。我想了解这种行为并以正确和预期的方式解决它。
  • 我认为你没有抓住重点。据我所知,重新启动它是“正确和预期的方式”。谷歌“Erlang Way”和“Let It Crash”哲学,你会明白我的意思。

标签: elixir genstage


【解决方案1】:

GenStage.handle_demand/2 上的文档中有一个重要的(但不幸的是没有用粗体表示)句子:

生产者必须存储需求或返回请求的事件

也就是说,不是在Stream.take 上阻塞,而是可能会明确意识到任务可能正在阻塞并处理该案例,在这种情况下使用Task.await/2 以合理的超时时间收集需求(也许Task.yield/2 可以可用于更复杂的检查,但在这里似乎有点过头了。)

来自文档:

如果您不希望任务失败,那么您必须更改heavy_fun/0 代码,就像您没有异步调用时实现它一样。例如,要么返回{:ok, val} | :error 结果,或者在更极端的情况下,使用try/rescue

但是,文档缺少示例。 OTOH,在这里返回空列表并忘记收集需求可能会更容易:

def handle_demand(demand, stream) do
  try do
    task = Task.async(fn ->
      stream
      |> Stream.take(demand)
      |> Enum.to_list()
    end)
    Task.await(task, 1000) # one sec
  catch
    :exit, {:timeout, {Task, :await, [_, 1000]}} ->
      {:noreply, [], stream}
  else
    events when is_list(events) ->
      {:noreply, events, stream}
  end
end

【讨论】:

  • 问题,如果正在消费这个生产者的生产者-消费者收到一个空列表作为事件会发生什么?它也会挂起吗?
  • @lapinkoira 我不知道;我想是的,但我从来没有遇到过这种情况。
猜你喜欢
  • 2023-03-15
  • 1970-01-01
  • 1970-01-01
  • 2017-09-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-07-14
  • 1970-01-01
相关资源
最近更新 更多