【发布时间】:2017-04-20 14:07:34
【问题描述】:
我已经设置了一个系统来无限期地从队列中读取消息,然后使用 Rx 和 TPL DataFlow 将它们关闭。
由于某种原因,在收到数百条消息后,ActionBlock 停止运行挂起,我不知道为什么。 this.GetMessages() 继续发射,但 this.ProcessMessages 不再发射。
var source = Observable
.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
.SelectMany(x => this.GetMessages());
var actionBlock = new ActionBlock<List<QueueStream>>(
this.ProcessMessages,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
});
using (source.Subscribe(actionBlock.AsObserver()))
{
while (this.Run)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
actionBlock.Complete();
await actionBlock.Completion;
读者 - 请注意,这实际上会继续运行
private async Task<List<QueueStream>> GetMessages()
{
var messageList = new List<QueueStream>();
var taskList = new List<Task>();
// Add up to N items in our queue
for (var i = 0; i < 25; i++)
{
var task = this
.ReadAndParseQueue()
.ContinueWith(async queueStreamTask =>
{
var queueStream = await queueStreamTask;
if (queueStream != null)
{
messageList.Add(queueStream);
}
});
taskList.Add(task);
}
await Task.WhenAll(taskList);
return messageList;
}
作者 - 在几百条消息之后,它不再受到打击
private async Task ProcessMessages(List<QueueStream> streams)
{
var tasks = new List<Task>();
foreach (var queueStream in streams)
{
tasks.Add(this.ProcessMessage(queueStream));
}
await Task.WhenAll(tasks);
}
【问题讨论】:
-
您确实需要提供minimal reproducible example。我们可以复制粘贴并运行重现此问题的代码。
-
@Chris
ActionBlock的状态是什么,一旦它挂起,它很可能出现故障。在await完成之前,不会观察到从ProcessMessages中抛出的任何异常,如果始终设置this.run,就不会发生这种情况。
标签: .net async-await system.reactive tpl-dataflow