【问题标题】:TPL Dataflow Broadcastblock discards last messageTPL 数据流广播块丢弃最后一条消息
【发布时间】:2017-07-31 06:27:16
【问题描述】:

我有一个非常简单的问题。我需要一种方法来轻松地对需要一些时间的消息执行一些处理。在处理过程中,可能会输入新的请求,但可以丢弃除最后一个请求之外的所有请求。

所以我认为 TPL Broadcastblock 应该这样做,例如查看 StackExchange 上的文档和帖子。我创建了以下解决方案并为其添加了一些单元测试,但在单元测试中,有时最后一项未发送。

这不是我所期望的。如果它应该丢弃任何东西,我会说它应该丢弃第一项,因为如果它不能处理消息,它应该覆盖它的缓冲区 1。谁能看出来是什么?
任何帮助将不胜感激!

这是该块的代码:

/// <summary>
/// This block will take items and perform the specified action on it. Any incoming messages while the action is being performed
/// will be discarded.
/// </summary>
public class DiscardWhileBusyActionBlock<T> : ITargetBlock<T>
{
    private readonly BroadcastBlock<T> broadcastBlock;

    private readonly ActionBlock<T> actionBlock;

    /// <summary>
    /// Initializes a new instance of the <see cref="DiscardWhileBusyActionBlock{T}"/> class.
    /// Constructs a SyncFilterTarget{TInput}.
    /// </summary>
    /// <param name="actionToPerform">Thing to do.</param>
    public DiscardWhileBusyActionBlock(Action<T> actionToPerform)
    {
        if (actionToPerform == null)
        {
            throw new ArgumentNullException(nameof(actionToPerform));
        }

        this.broadcastBlock = new BroadcastBlock<T>(item => item);
        this.actionBlock = new ActionBlock<T>(actionToPerform, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
        this.broadcastBlock.LinkTo(this.actionBlock);
        this.broadcastBlock.Completion.ContinueWith(task => this.actionBlock.Complete());
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return ((ITargetBlock<T>)this.broadcastBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    public void Complete()
    {
        this.broadcastBlock.Complete();
    }

    public void Fault(Exception exception)
    {
        ((ITargetBlock<T>)this.broadcastBlock).Fault(exception);
    }

    public Task Completion => this.actionBlock.Completion;
}

这是测试的代码:

[TestClass]
public class DiscardWhileBusyActionBlockTest
{
    [TestMethod]
    public void PostToConnectedBuffer_ActionNotBusy_MessageConsumed()
    {
        var actionPerformer = new ActionPerformer();

        var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
        var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);

        buffer.Post(1);

        DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);

        var expectedMessages = new[] { 1 };
        actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
    }

    [TestMethod]
    public void PostToConnectedBuffer_ActionBusy_MessagesConsumedWhenActionBecomesAvailable()
    {
        var actionPerformer = new ActionPerformer();

        var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
        var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);

        actionPerformer.SetBusy();

        // 1st message will set the actionperformer to busy, 2nd message should be sent when
        // it becomes available.
        buffer.Post(1);
        buffer.Post(2);

        actionPerformer.SetAvailable();

        DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);

        var expectedMessages = new[] { 1, 2 };
        actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
    }

    [TestMethod]
    public void PostToConnectedBuffer_ActionBusy_DiscardMessagesInBetweenAndProcessOnlyLastMessage()
    {
        var actionPerformer = new ActionPerformer();

        var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
        var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);

        actionPerformer.SetBusy();

        buffer.Post(1);
        buffer.Post(2);
        buffer.Post(3);
        buffer.Post(4);
        buffer.Post(5);

        actionPerformer.SetAvailable();

        DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);

        var expectedMessages = new[] { 1, 5 };
        actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
    }

    private static void WaitForCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        source.Complete();
        target.Completion.Wait(TimeSpan.FromSeconds(1));
    }

    private static BufferBlock<int> SetupBuffer(ITargetBlock<int> block)
    {
        var buffer = new BufferBlock<int>();
        buffer.LinkTo(block);
        buffer.Completion.ContinueWith(task => block.Complete());
        return buffer;
    }

    private class ActionPerformer
    {
        private readonly ManualResetEvent resetEvent = new ManualResetEvent(true);

        public List<int> LastReceivedMessage { get; } = new List<int>();

        public void Perform(int message)
        {
            this.resetEvent.WaitOne(TimeSpan.FromSeconds(3));
            this.LastReceivedMessage.Add(message);
        }

        public void SetBusy()
        {
            this.resetEvent.Reset();
        }

        public void SetAvailable()
        {
            this.resetEvent.Set();
        }
    }
}

【问题讨论】:

    标签: c# .net unit-testing tpl-dataflow


    【解决方案1】:

    当您将操作块的BoundedCapacity 分级为1 时,这意味着,如果它正在处理,并且队列中已经有项目,它将丢弃该消息,这将超出范围。所以基本上发生的事情是你的块做它的工作,当缓冲区已满时拒绝新消息。之后广播块完成,整个消息被发送给接收者,它调用Completion,完成整个管道。

    您需要检查返回的布尔值Post 是否有最后一条消息,或者更可能的是,将最后一条消息存储在某个变量中,以确保它将进入管道。看起来你最好不要使用BroadcastBlock,因为它的目的是提供消息的副本到链接块的数量,你自己写你的逻辑。也许您可以改用简单的BufferBlock

    更新OfferMessagemethod 也确实提供了有关所提供消息的信息。我认为您根本不需要缓冲区块,因为您必须处理管道的非默认逻辑。更容易拥有像_lastMessage 这样的字段,将最后一个值存储在其中,并在actionBlock 接受请求时将其擦除。您甚至可以完全删除数据流依赖,因为您所做的只是调用请求的方法。

    旁注:您可以在选项中设置link blocks with completion propagation

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    this.broadcastBlock.LinkTo(this.actionBlock, linkOptions);
    

    这可以删除您使用potentially dangerous ContinueWith 的一些代码。如果您需要异步行为,您也可以await broadcastBlock.SendAsync() 而不是Post

    【讨论】:

    • 感谢您的回复(和旁注;)所以如果我理解正确,广播块正在覆盖它的缓冲区,但动作块实际上正在删除它们?有趣的!不过,在检查 Post 的返回值时,我不明白您的其他评论。我不会将消息发布到操作块,它是通过 OfferMessage 方法完成的。 Bufferblock 将不起作用,因为它缓冲所有传入的消息,而我希望只有一个缓冲区,每次在接收器忙时接收到消息时都会覆盖该缓冲区。你能澄清一下吗?
    • 更新答案。
    • 好的,谢谢你的解释。所以我得出结论,我不能使用内置的 TPL 逻辑来做到这一点,必须找到自己的解决方案。再次感谢!
    猜你喜欢
    • 2021-10-21
    • 2011-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多