【发布时间】:2021-01-09 01:07:02
【问题描述】:
我有一个带有 ASP.Net Core 3.1 的 API,它使用 Azure.Messaging.EventHubs 和 Azure.Messaging.EventHubs.Processor 从消费者组获取事件,然后将它们发送到 SignalR 集线器。处理器仅在有用户连接到集线器时运行,并在最后一个断开连接时停止,更新其在 BlobStorage 中的检查点。
每个事件的当前处理逻辑:如果 DateTime.UtcNow 和事件时间戳之间的时间差异(以分钟为单位)小于 2,则将事件发送到 SignalR 集线器,仅此而已。
问题如下: 有时候EventProcessorClient停了很长一段时间,很多事件都保留在EventHub中,导致它等待的时间很长,慢慢赶上最近的事件直到 SignalR Hub 再次开始接收它们。处理器赶上最新事件的时间太长了,特别是考虑到每分钟接收数百个事件时。
有没有办法,例如,在启动处理器之前手动移动检查点?还是只获取最后 X 分钟的事件?也许是另一个想法/解决方案?
PS:我不关心这个消费者组超过 2 到 5 分钟的事件。
PS2:EventHub中配置的保留时间为1天。
代码:
/* properties and stuff */
// Constructor
public BusEventHub(ILogger<BusEventHub> logger, IConfiguration configuration, IHubContext<BusHub> hubContext) {
_logger = logger;
Configuration = configuration;
_busExcessHub = hubContext;
/* Connection strings and stuff */
// Create a blob container client that the event processor will use
storageClient = new BlobContainerClient(this.blobStorageConnectionString, this.blobContainerName);
// Create an event processor client to process events in the event hub
processor = new EventProcessorClient(storageClient, consumerGroup, this.ehubNamespaceConnectionString, this.eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
}
public async Task Start() {
_logger.LogInformation($"Starting event processing for EventHub {eventHubName}");
await processor.StartProcessingAsync();
}
public async Task Stop() {
if (BusHubUserHandler.ConnectedIds.Count < 2) {
_logger.LogInformation($"Stopping event processing for EventHub {eventHubName}");
await processor.StopProcessingAsync();
} else {
_logger.LogDebug("There are still other users connected");
}
}
private async Task ProcessEventHandler(ProcessEventArgs eventArgs) {
try {
string receivedEvent = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
_logger.LogDebug($"Received event: {receivedEvent}");
BusExcessMinified busExcess = BusExcessMinified.FromJson(receivedEvent);
double timeDiff = (DateTime.UtcNow - busExcess.Timestamp).TotalMinutes;
if (timeDiff < 2) {
string responseEvent = busExcess.ToJson();
_logger.LogDebug($"Sending message to BusExcess Hub: {responseEvent}");
await _busExcessHub.Clients.All.SendAsync("UpdateBuses", responseEvent);
}
_logger.LogDebug("Update checkpoint in the blob storage"); // So that the service receives only new events the next time it's run
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
} catch (TaskCanceledException) {
_logger.LogInformation("The EventHub event processing was stopped");
} catch (Exception e) {
_logger.LogError($"Exception: {e}");
}
}
/* ProcessErrorHandler */
【问题讨论】:
标签: asp.net-core azure-eventhub