【问题标题】:Continuous data stream using DataFlow and RX stops processing使用 DataFlow 和 RX 的连续数据流停止处理
【发布时间】:2017-04-20 14:07:34
【问题描述】:

我已经设置了一个系统来无限期地从队列中读取消息,然后使用 Rx 和 TPL DataFlow 将它们关闭。

由于某种原因,在收到数百条消息后,ActionBlock 停止运行挂起,我不知道为什么。 this.GetMessages() 继续发射,但 this.ProcessMessages 不再发射。

var source = Observable
    .Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
    .SelectMany(x => this.GetMessages());

var actionBlock = new ActionBlock<List<QueueStream>>(
    this.ProcessMessages,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
    });

using (source.Subscribe(actionBlock.AsObserver()))
{
    while (this.Run)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

actionBlock.Complete();
await actionBlock.Completion;

读者 - 请注意,这实际上会继续运行

private async Task<List<QueueStream>> GetMessages()
{
    var messageList = new List<QueueStream>();
    var taskList = new List<Task>();

    // Add up to N items in our queue
    for (var i = 0; i < 25; i++)
    {
        var task = this
            .ReadAndParseQueue()
            .ContinueWith(async queueStreamTask =>
                {
                    var queueStream = await queueStreamTask;
                    if (queueStream != null)
                    {
                        messageList.Add(queueStream);
                    }
                });

        taskList.Add(task);
    }

    await Task.WhenAll(taskList);

    return messageList;
}

作者 - 在几百条消息之后,它不再受到打击

private async Task ProcessMessages(List<QueueStream> streams)
{
    var tasks = new List<Task>();
    foreach (var queueStream in streams)
    {
        tasks.Add(this.ProcessMessage(queueStream));
    }

    await Task.WhenAll(tasks);
}

【问题讨论】:

  • 您确实需要提供minimal reproducible example。我们可以复制粘贴并运行重现此问题的代码。
  • @Chris ActionBlock 的状态是什么,一旦它挂起,它很可能出现故障。在await 完成之前,不会观察到从ProcessMessages 中抛出的任何异常,如果始终设置this.run,就不会发生这种情况。

标签: .net async-await system.reactive tpl-dataflow


【解决方案1】:

您确定您的source 在这种情况下会继续运行吗?您的代码中有一个无限循环,但是,如果发生错误或未设置 this.Run,它将停止,然后您会看到这些行:

actionBlock.Complete();
await actionBlock.Completion;

这实际上阻止了actionBlock 接受新消息,因此永远不会调用ProcessMessages,因为消息只是被忽略了。

【讨论】:

    猜你喜欢
    • 2021-01-14
    • 1970-01-01
    • 2011-09-27
    • 2012-11-19
    • 2015-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多