【问题标题】:EventProcessorClient delays between eventsEventProcessorClient 事件之间的延迟
【发布时间】:2022-01-02 12:17:26
【问题描述】:

我正在尝试创建一种方法来处理 eventthub 中的事件峰值。我当前的 poc 解决方案只是在消耗事件时触发并忘记任务,而不是等待它们,然后使用信号量限制并行任务量以避免资源匮乏。

节流的实用程序:

    public class ThrottledParallelTaskFactory
    {
        ...

        public Task StartNew(Func<Task> func)
        {
            _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}",  _semaphore.CurrentCount, _limit);
            _semaphoreSlim.Wait(_timeout);  
            
            _ = Task.Run(func)
                .ContinueWith(t =>
                {
                    if (t.Status is TaskStatus.Faulted or TaskStatus.Canceled or TaskStatus.RanToCompletion)
                    {
                        _semaphoreSlim.Release();
                        _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
                    }
                    if (t.Status is TaskStatus.Canceled or TaskStatus.Faulted)
                    {
                        _logger?.LogError(t.Exception, "Parallel task failed");
                    }
                });
            return Task.CompletedTask;
        }
    }

我的EventProcessorClient.ProcessEventAsync 代表:

 private Task ProcessEvent(ProcessEventArgs arg)
        {
            var sw = Stopwatch.StartNew();
            try
            {
                _throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Failed to process event");
            }
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
            return Task.CompletedTask;
        }

在运行此设置一段时间后,我注意到当我配置的限制为 15 时,我的节流器的信号量最大可以并行运行 2-3 个任务。这种建议我的处理程序需要 333-500 毫秒才能完成,但处理程序内部的秒表表示整个处理程序需要 0 毫秒才能执行。我后来添加了处理程序开始/结束时间的时间戳记录以确认它,它确实需要 0-1ms,但它们之间有一个神秘的 300-600ms 间隔。 注意:对于当前的测试,此客户端正在处理数百万个事件的积压,它不处理实时数据,这可能会导致事件之间出现类似的延迟。

是否有机会EventProcessorClient 在每个事件之后内部检查点? 300-500ms 在我脑海中似乎很大。 我都使用了默认的缓存事件/预取计数和增加的计数,没有太大区别。

编辑:

它最终不是与实施相关的网络问题

【问题讨论】:

  • 处理器本身具有高度的并发性,因为每个拥有的分区都被封装在一个专门的后台任务中,处理器的管理也是如此。其中每一个都是高度异步的工作流。您看到延迟的最可能原因是线程池中的争用,导致继续和新任务的调度速度很慢。作为一般规则,我们建议您在进行并发工作以处理事件时,在拥有的分区和系统中的处理器数量之间不超过 1:1 的关系进行测试。
  • 从 EventHub 的角度来看 - 由于一些内部决策,目前只有一个分区,所以从技术上讲只有 1 个分区和 1 个处理器。另外,我忘记提到的关键细节只有在部署到 k8s 集群而不是本地时才会发生。但正如在我的编辑中 - 发现恰好与实施完全无关的问题

标签: c# azure azure-eventhub


【解决方案1】:

你没有测量正确的东西,基本上你使用的是异步/等待和任务错误。

        private Task ProcessEvent(ProcessEventArgs arg)
        {
            var sw = Stopwatch.StartNew();
            try
            {
                _throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Failed to process event");
            }
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
            return Task.CompletedTask;
        }

在上面的代码中,没有等待对_throttledParallelTaskFactory.StartNew 的调用。所以秒表没有什么可测量的。此外,由于不等待调用,因此不会捕获任何异常。

您应该将异常处理和时间测量移至StartNew 方法,如下所示:

        private Task ProcessEvent(ProcessEventArgs arg)
        {
            _throttledParallelTaskFactory.StartNew(() => Task.Delay(1000));
            
            return Task.CompletedTask;
        }
public class ThrottledParallelTaskFactory
{
    public async Task StartNew(Func<Task> func)
    {
        var sw = Stopwatch.StartNew();

        _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
        _semaphoreSlim.Wait(_timeout);
        
        try
        {
            await func.Invoke();
        }
        catch
        {
            _logger.LogError(e, "Failed to process event");
            _logger?.LogError(t.Exception, "Parallel task failed");
        }
        finally
        {
            _semaphoreSlim.Release();
            _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
        }
    }
}

看看我们是如何摆脱对ContinueWith 的调用的?此外,由于 func 已经表示 Task,因此无需将代码包装在对 Task.Run 的调用中。

EventProcessorClient 是否会在每个事件后内部检查点?

不,它没有。您必须手动进行检查点。

【讨论】:

  • 关于任务的东西你说得对,我养成了只将来自不同来源的小块代码放入这个科学怪人的怪物的坏习惯。 (秒表只是为了快速检查是否没有发生占用那些神秘的 300 毫秒的“魔法”)。但是结果仍然是相同的 - 事件处理程序调用之间存在 300-600 毫秒的间隔。
  • 我想我的选择可能是切换到EventHubConsumerClient,因为我没有遇到并行化问题,而是自己做检查点
  • @Edgar.A:您能否阐明您正在考虑为EventHubConsumerClient 使用的方法?
  • 对不起,这更像是一个淋浴的想法,不应该当真
猜你喜欢
  • 2013-09-20
  • 1970-01-01
  • 1970-01-01
  • 2012-11-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-06
相关资源
最近更新 更多