【发布时间】:2022-01-02 12:17:26
【问题描述】:
我正在尝试创建一种方法来处理 eventthub 中的事件峰值。我当前的 poc 解决方案只是在消耗事件时触发并忘记任务,而不是等待它们,然后使用信号量限制并行任务量以避免资源匮乏。
节流的实用程序:
public class ThrottledParallelTaskFactory
{
...
public Task StartNew(Func<Task> func)
{
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_semaphoreSlim.Wait(_timeout);
_ = Task.Run(func)
.ContinueWith(t =>
{
if (t.Status is TaskStatus.Faulted or TaskStatus.Canceled or TaskStatus.RanToCompletion)
{
_semaphoreSlim.Release();
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
}
if (t.Status is TaskStatus.Canceled or TaskStatus.Faulted)
{
_logger?.LogError(t.Exception, "Parallel task failed");
}
});
return Task.CompletedTask;
}
}
我的EventProcessorClient.ProcessEventAsync 代表:
private Task ProcessEvent(ProcessEventArgs arg)
{
var sw = Stopwatch.StartNew();
try
{
_throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
}
catch (Exception e)
{
_logger.LogError(e, "Failed to process event");
}
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
return Task.CompletedTask;
}
在运行此设置一段时间后,我注意到当我配置的限制为 15 时,我的节流器的信号量最大可以并行运行 2-3 个任务。这种建议我的处理程序需要 333-500 毫秒才能完成,但处理程序内部的秒表表示整个处理程序需要 0 毫秒才能执行。我后来添加了处理程序开始/结束时间的时间戳记录以确认它,它确实需要 0-1ms,但它们之间有一个神秘的 300-600ms 间隔。 注意:对于当前的测试,此客户端正在处理数百万个事件的积压,它不处理实时数据,这可能会导致事件之间出现类似的延迟。
是否有机会EventProcessorClient 在每个事件之后内部检查点? 300-500ms 在我脑海中似乎很大。
我都使用了默认的缓存事件/预取计数和增加的计数,没有太大区别。
编辑:
它最终不是与实施相关的网络问题
【问题讨论】:
-
处理器本身具有高度的并发性,因为每个拥有的分区都被封装在一个专门的后台任务中,处理器的管理也是如此。其中每一个都是高度异步的工作流。您看到延迟的最可能原因是线程池中的争用,导致继续和新任务的调度速度很慢。作为一般规则,我们建议您在进行并发工作以处理事件时,在拥有的分区和系统中的处理器数量之间不超过 1:1 的关系进行测试。
-
从 EventHub 的角度来看 - 由于一些内部决策,目前只有一个分区,所以从技术上讲只有 1 个分区和 1 个处理器。另外,我忘记提到的关键细节只有在部署到 k8s 集群而不是本地时才会发生。但正如在我的编辑中 - 发现恰好与实施完全无关的问题
标签: c# azure azure-eventhub