【问题标题】:How to reduce frequency of continuously fired event's event handling如何减少连续触发事件的事件处理频率
【发布时间】:2020-07-25 13:39:49
【问题描述】:

我正在学习 c# 中的任务和async/await。所以请考虑我的问题的愚蠢性。

班级中有一个事件DummyEvent。一个事件处理器DummyEventHandler订阅了这个event,它处理了大量的CPU密集型任务,实际上不需要这么频繁地使用。

因此,如果DummyEvent 被连续触发,我希望DummyEventHandler 以降低的频率响应,或者在连续性结束时响应。

所以,我的想法是将大型任务提取到一个单独的任务中,并使其在继续之前延迟 500 毫秒。延迟结束后,会检查同一个Task是否再次​​被调度(连续事件触发),如果为true则避免大计算。

这是我对这个想法的幼稚实现:

int ReducedCall = 0;
int TotalCallActual = 0;

protected void DummyEventHandler(object sender, bool arg)
{
    TotalCallActual++;
    LargeCPUBoundTask(); // there is a green underline here, but I think it's ok, or.. is it?
}

async Task LargeCPUBoundTask()
{
    ReducedCall = TotalCallActual;

    await Task.Delay(500);
    // if this task is called again in this time, TotalCallActual will increase

    if (ReducedCall == TotalCallActual)
    {
        // do all the large tasks
        ……

        ReducedCall = 0;
        TotalCallActual = 0;
    }
}

但问题是,我没有得到我想要的。 Task.Delay(500) 行实际上并没有 await ,或者,如果它确实在等待,那就是有问题,因为我经历了惊人的。

有什么更好的想法,或者有什么改进/更正吗?

询问任何其他信息。

谢谢

【问题讨论】:

  • 您在谈论MyTask 下的绿线,但您没有在代码中包含任何以这种方式调用的方法。 TotalCallActualReducedCall 究竟是如何以及何时增加/减少的?以及如何创建一个我们可以实际测试的代码?
  • @PeterBons 抱歉回复晚了,我已经更新了我的问题,MyTask 实际上是LargeCPUBoundTask,这是在简化问题时出错。每次调用 DummyEventHandler 时,TotalCallActual 都会增加,行:TotalCallActual++; 并在事件处理程序中减少。
  • 没关系。我已经更新了答案
  • 自从发布我的答案后,我创建了一个GitHub repository,其中包含测试和示例控制台应用程序的解决方案。它也可以通过 NuGet 获得(搜索 CVV.EventReducer)。

标签: c# async-await task-parallel-library


【解决方案1】:

您可以利用Reactive Extensions 执行此操作:

void Main()
{
    var generator = new EventGenerator();
    var observable = Observable.FromEventPattern<EventHandler<bool>, bool>(
                h => generator.MyEvent += h,
                h => generator.MyEvent -= h);

    observable
        .Throttle(TimeSpan.FromSeconds(1))
        .Subscribe(s =>
        {
            Console.WriteLine("doing something");
        });

    // simulate rapid firing event
    for(int i = 0; i <= 100; i++)
        generator.RaiseEvent(); 

    // when no longer interested, dispose the subscription  
    subscription.Dispose(); 
}

public class EventGenerator
{
    public event EventHandler<bool> MyEvent;

    public void RaiseEvent()
    {
        if (MyEvent != null)
        {
            MyEvent(this, false);
        }
    }
}

上面编码的Throttle 运算符将允许值(事件)每秒变为真。

所以在上面的代码示例中,文本 doing something 只会打印一次(在一秒钟后),即使事件被触发了很多次。

编辑
顺便说一句,绿线的原因是您的任务没有等待。要修复它,请将代码更改为:

protected async void DummyEventHandler(object sender, bool arg)
{
    TotalCallActual++;
    await LargeCPUBoundTask(); // there is no more green underline here
}

不幸的是,这仍然无法解决您的问题,因为无法等待事件,因此如果在 LargeCPUBoundTask 仍在运行另一个对 LargeCPUBoundTask 的调用时再次引发事件,那么如果您得到我的内容,工作就会重叠意思是。换句话说,这就是你的代码不起作用的原因。

【讨论】:

    【解决方案2】:

    我会使用计时器事件处理程序而不是您的 DummyEventHandler 只需以毫秒为单位调整计时器的频率即可。您可以通过代码创建计时器,而无需将其作为控件添加到表单中。我认为它在公共控件库中。

    希望这会有所帮助。祝你好运。

    【讨论】:

      【解决方案3】:

      我花了一些时间来思考这个问题,我对第一个解决方案所做的假设是事件是连续触发的,而它可能只是在一段时间内触发部分时间,然后在 真正的问题。

      在这种情况下,CPU 绑定任务只会在第一次触发事件时发生,然后如果事件在该 CPU 绑定任务完成之前完成触发,则不会处理剩余的事件。但是您不想处理所有这些,只需处理“最后一个”(不一定是实际的最后一个,只需再处理一个来处理“清理”)。

      所以我更新了我的答案,以包含频繁但间歇性(即突发事件然后安静)的用例,正确的事情会发生并且会发生 CPU 绑定任务的最终运行(但仍然没有更多一次运行超过 1 个 CPU 密集型任务)。

      using System;
      using System.Threading;
      using System.Threading.Tasks;
      
      class Program
      {
          static void Main(string[] args)
          {
              Sender s = new Sender();
              using (Listener l = new Listener(s))
              {
                  s.BeginDemonstration();
              }
          }
      }
      
      class Sender
      {
          const int ATTEMPTED_CALLS = 1000000;
      
          internal EventHandler frequencyReducedHandler;
          internal int actualCalls = 0;
          internal int ignoredCalls = 0;
      
          Task[] tasks = new Task[ATTEMPTED_CALLS];
      
          internal void BeginDemonstration()
          {
              int attemptedCalls;
              for (attemptedCalls = 0; attemptedCalls < ATTEMPTED_CALLS; attemptedCalls++)
              {
                  tasks[attemptedCalls] = Task.Run(() => frequencyReducedHandler.Invoke(this, EventArgs.Empty));
                  //frequencyReducedHandler?.BeginInvoke(this, EventArgs.Empty, null, null);
              }
              if (tasks[0] != null)
              {
                  Task.WaitAll(tasks, Timeout.Infinite);
              }
              Console.WriteLine($"Attempted: {attemptedCalls}\tActual: {actualCalls}\tIgnored: {ignoredCalls}");
              Console.ReadKey();
          }
      }
      
      class Listener : IDisposable
      {
          enum State
          {
              Waiting,
              Running,
              Queued
          }
      
          private readonly AutoResetEvent m_SingleEntry = new AutoResetEvent(true);
          private readonly Sender m_Sender;
      
          private int m_CurrentState = (int)State.Waiting;
      
          internal Listener(Sender sender)
          {
              m_Sender = sender;
              m_Sender.frequencyReducedHandler += Handler;
          }
      
          private async void Handler(object sender, EventArgs args)
          {
              int state = Interlocked.Increment(ref m_CurrentState);
              try
              {
                  if (state <= (int)State.Queued) // Previous state was WAITING or RUNNING
                  {
                      // Ensure only one run at a time
                      m_SingleEntry.WaitOne();
                      try
                      {
                          // Only one thread at a time here so
                          // no need for Interlocked.Increment
                          m_Sender.actualCalls++;
                          // Execute CPU intensive task
                          await Task.Delay(500);
                      }
                      finally
                      {
                          // Allow a waiting thread to proceed
                          m_SingleEntry.Set();
                      }
                  }
                  else
                  {
                      Interlocked.Increment(ref m_Sender.ignoredCalls);
                  }
              }
              finally
              {
                  Interlocked.Decrement(ref m_CurrentState);
              }
          }
      
          public void Dispose()
          {
              m_SingleEntry?.Dispose();
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-01-08
        • 2017-06-18
        • 2021-04-07
        相关资源
        最近更新 更多