【问题标题】:Diffeent message types in Dataflow block数据流块中的不同消息类型
【发布时间】:2013-11-02 04:22:39
【问题描述】:

我需要设计自定义数据流块,它的作用类似于缓冲区,但会在超时后使项目可用。我会将传入的消息放入队列并启动计时器。当定时器被触发时,我会将队列中的一个项目移动到 BufferBlock 中,这将使其可供 Dataflow 消费者使用。

如果我将一个项目从内部队列移动到定时器处理程序的输出 BufferBlock 中,显然它不会是线程安全的,因为定时器处理程序可能与入队调用和损坏的队列发生冲突。 MSDN 中声称数据流是基于 Actor 思想的,它假设消息是单线程执行的,从而解决了同步问题。但是如果我引入一个计时器处理程序,这将打破这个假设。我可以在队列上使用老式锁,或者使用 CuncurrentQueue,但我很好奇是否有更多特殊的数据流方式来管理计时器,这样它就不会与数据流块的 Post() 调用冲突。

或者扩展这个问题,是否有一种优雅的方式让数据流块处理几种不同类型的消息并仍然提供线程安全模型?

【问题讨论】:

    标签: c# task-parallel-library actor tpl-dataflow


    【解决方案1】:

    我需要设计自定义数据流块,它的作用类似于缓冲区,但会在超时后使项目可用。

    那么,是这样的吗?

    public static TransformBlock<T, T> Delay<T>(Timespan delay)
    {
      return new TransformBlock<T, T>(async x =>
      {
        await Task.Delay(delay);
        return x;
      },
      new ExecutionDataflowBlockOptions
      {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
      });
    }
    

    如果您仍然认为需要自定义块,请务必阅读 Guide to Implementing Custom Dataflow Blocks,其中描述了您需要担心的所有锁。

    【讨论】:

    • 感谢指南链接,非常有用。
    【解决方案2】:

    来自the MSDN page you referenced

    由于运行时管理数据之间的依赖关系,您通常可以避免同步访问共享数据的要求。

    这意味着当您在代码中使用数据流块时,您通常不必担心同步问题,因为这些块会为您做到这一点

    但是当您编写自定义数据流块时,您确实需要自己处理同步。例如,假设您正在实现BufferBlock。在该块上调用Post() 必须以某种方式同步,因为两个源块可以同时调用Post()。没有人会为您处理该同步,因此您对 Post()* 的实现将需要使用锁或 ConcurrentQueue 或类似的东西。

    * 其实你没有实现Post(),你实现了OfferMessage()

    但是,如果我正确理解您的要求,您实际上可以利用 TDF 中已经存在的同步来实现您的块,而无需任何手动同步。您可以使用两个BufferBlocks、一个助手TaskDataflowBlock.Encapsulate() 来实现您的块:

    public static IPropagatorBlock<T, T> CreateDelayedBlock<T>(TimeSpan delay)
    {
        var source = new BufferBlock<T>();
        var target = new BufferBlock<T>();
    
        Task.Run(
            async () =>
            {
                while (await source.OutputAvailableAsync())
                {
                    T item;
                    if (source.TryReceive(out item))
                    {
                        await Task.Delay(delay);
                        await target.SendAsync(item);
                    }
                    else
                    {
                        // this shouldn't happen
                        // nobody else should be able to receive from source
                    }
                }
    
                // TODO: if source failed, fail target
                target.Complete();
            });
    
        return DataflowBlock.Encapsulate(source, target);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-03-17
      • 2012-06-03
      • 1970-01-01
      • 1970-01-01
      • 2017-12-12
      相关资源
      最近更新 更多