【问题标题】:Apparent BufferBlock.Post/Receive/ReceiveAsync race/bug明显的 BufferBlock.Post/Receive/ReceiveAsync 竞赛/错误
【发布时间】:2012-04-21 13:05:08
【问题描述】:

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道...我并没有真正发挥 TplDataflow 的最大潜力。 ATM 我只是使用BufferBlock 作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。我看到一些奇怪的行为让我不知道如何 继续。

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

在上面的代码中(它是 2000 行分布式解决方案的一部分),Send 每 100 毫秒左右被定期调用一次。这意味着一个项目以大约每秒 10 次的速度被 Posted 到 messageQueue。这是经过验证的。但是,有时ReceiveAsync 似乎不会在超时内完成(即Post 不会导致ReceiveAsync 完成)并且TimeoutException 在30 秒后被提升。此时,messageQueue.Count 有数百个。这是出乎意料的。在较慢的发布速度(1 个帖子/秒)中也观察到此问题,并且通常发生在 1000 个项目通过BufferBlock 之前。

所以,为了解决这个问题,我使用了以下代码,它可以工作,但在接收时偶尔会导致 1s 延迟(由于出现上述错误)

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

对我来说,这看起来像是 TDF 中的竞争条件,但我无法弄清楚为什么在我以类似方式使用 BufferBlock 的其他地方不会发生这种情况。实验性地从 ReceiveAsync 更改为 Receive 并没有帮助。我没有检查过,但我想孤立地看,上面的代码可以完美运行。这是我在“TPL 数据流简介”tpldataflow.docx 中看到的一种模式。

我该怎么做才能查明真相?是否有任何指标可以帮助推断正在发生的事情?如果我无法创建可靠的测试用例,我还能提供哪些更多信息?

救命!

【问题讨论】:

  • 我认为您所做的事情或您的期望没有任何问题。我绝对认为您需要在 MSDN 论坛上保持活跃,而不是在这里。你已经引起了@StephenToub 的注意,他绝对是你想要调查的人。
  • 不。从来没有深究。我无法在一个独立的小型示例中重现该问题。因为我只使用 BufferBlock,所以我推出了自己的异步队列实现。我不需要更改任何其他代码......我只是重新实现了我正在使用的 BufferBlock 接口的部分。现在有效,这让我觉得有什么不对劲,但我无法证明这一点。 Grr.
  • @spendor 非常有趣,奇怪的是我在找到 BufferBlock 后放弃了自己的异步并发队列实现......现在我必须重新考虑。谢谢。
  • 有谁知道这是否仍然是一个问题?
  • @EyalPerry 我已经在许多其他项目中使用(并宣传过)数据流,此后再也没有遇到过这个问题。鉴于现在的产品与 6 年前相比已经成熟,如果这仍然是一个问题,我会感到非常惊讶。

标签: c# task-parallel-library async-await dataflow tpl-dataflow


【解决方案1】:

斯蒂芬似乎认为以下是解决方案

var m = await messageQueue.ReceiveAsync();

代替:

var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

你能确认还是否认这一点?

【讨论】:

  • 那行不通。我选择哪个 ReceiveAsync 重载并不重要,结果是一样的。请参阅上面的评论了解我的解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-11-02
  • 2012-07-12
  • 2013-02-17
  • 1970-01-01
  • 2014-06-06
  • 1970-01-01
相关资源
最近更新 更多