【问题标题】:Debounce events with Elixir使用 Elixir 去抖动事件
【发布时间】:2016-05-11 14:53:44
【问题描述】:

我正在从 MQ 获取事件流到我的 Elixir 消费者。

在消费者中我需要:

  1. 按 ID 聚合事件和
  2. 如果 3 分钟内没有该 ID 的新数据,则向下游发送该 ID 的聚合数据。

就我而言,数据集并不大。每天可能有几百个 ID 和几千个更新。

有什么方法可以使用 GenServer 魔法解决这个问题?

谢谢!

【问题讨论】:

    标签: elixir gen-server


    【解决方案1】:

    我会这样做:

    每当有新事件发生时:

    • 如果它是第一个具有该 id 的事件,则使用 Process.send_after/3 创建一个超时时间为 3 分钟的计时器引用,并将事件和计时器存储在状态中。

    • 1234563旧事。

    并且在计时器触发的handle_info 中,将该id 的事件推送到下游并从状态中删除该条目。

    这是上面的一个简单实现:

    defmodule DebouncedEcho do
      @timeout 1000
    
      use GenServer
    
      def start_link do
        GenServer.start_link __MODULE__, []
      end
    
      def init(_) do
        {:ok, %{}}
      end
    
      def handle_cast({:store, id, event}, state) do
        case state[id] do
          nil ->
            timer = Process.send_after(self, {:timer, id}, @timeout)
            state = Map.put(state, id, %{events: [event], timer: timer})
            {:noreply, state}
          %{events: events, timer: timer} ->
            Process.cancel_timer(timer)
            timer = Process.send_after(self, {:timer, id}, @timeout)
            state = Map.put(state, id, %{events: [event | events], timer: timer})
            {:noreply, state}
        end
      end
    
      def handle_info({:timer, id}, state) do
        %{events: events} = state[id]
        IO.inspect {:flush, id, events}
        state = Map.delete(state, id)
        {:noreply, state}
      end
    end
    

    测试:

    {:ok, server} = DebouncedEcho.start_link
    GenServer.cast server, {:store, 1, :foo}
    GenServer.cast server, {:store, 1, :bar}
    GenServer.cast server, {:store, 2, :foo}
    :timer.sleep(500)
    GenServer.cast server, {:store, 2, :bar}
    :timer.sleep(500)
    GenServer.cast server, {:store, 2, :baz}
    :timer.sleep(500)
    GenServer.cast server, {:store, 1, :baz}
    :timer.sleep(2000)
    

    输出:

    {:flush, 1, [:bar, :foo]}
    {:flush, 2, [:baz, :bar, :foo]}
    {:flush, 1, [:baz]}
    

    【讨论】:

    • 优秀的答案!我还将超时保持在状态,以便可以将其传递给 init 函数。这可能会使测试更容易,因为您可以在单元测试中设置零超时。
    • 非常优雅的解决方案。谢谢!
    猜你喜欢
    • 2015-04-12
    • 1970-01-01
    • 2016-07-24
    • 1970-01-01
    • 2019-12-21
    • 1970-01-01
    • 2021-11-21
    • 1970-01-01
    相关资源
    最近更新 更多