【发布时间】:2020-05-22 16:58:20
【问题描述】:
我的班级有ActionBlock:
private readonly ActionBlock<QueueMessage> block;
在构造函数中,我这样初始化它:
block = new ActionBlock<QueueMessage>(async s => await Process(s),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
我的Process 方法实现为:
private async Task Process(QueueMessage message)
{
using var tx = stateManager.CreateTransaction();
var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
if (!result.HasValue)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
return;
}
try
{
await processorService.Process(message);
logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
await tx.CommitAsync();
}
catch (Exception e)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue. Error: {e}");
message.RetryCount++;
await retryQueue.EnqueueAsync(tx, message);
}
finally
{
await tx.CommitAsync();
}
}
当我收到想要处理的消息时,我会调用:
// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}
这是第一次工作,消息被完美处理。我收到的第二条消息(可能几秒钟后,可能一分钟后,取决于系统的负载),然后block.SendAsync 立即返回 false。 Process 中没有代码运行。当我在调试器下查看这个时,我注意到block.IsCompleted 是真的。但是,我从来没有打过block.Complete(); 之类的电话。
当我可能还有更多工作要做时,为什么我的块已完成?谢谢!
更新:我可能有线索。
我似乎遇到了这个断点(在 ActionBlock.cs 源代码中)
【问题讨论】:
-
管道中是否还有其他块,该块出现故障表明某处某处正在从块中抛出异常并使其出错。
-
@JSteward 是的,管道中没有其他块。但是,是的,我的行为是抛出一个异常(我的调试器还不足以告诉我),这显然使你的整个块无用。我已经修复了异常并添加了更多的 try/catch 代码,现在它可以按预期工作了。
标签: c# .net task-parallel-library tpl-dataflow