【问题标题】:Customizing ActionBlock<T>自定义 ActionBlock<T>
【发布时间】:2012-12-12 12:19:34
【问题描述】:

我想实现一个优先的ActionBlock&lt;T&gt;。这样我就可以通过使用Predicate&lt;T&gt; 有条件地优先考虑某些TInput 项目。
我读过 Parallel Extensions Extras SamplesGuide to Implementing Custom TPL Dataflow Blocks
但仍然不知道我该如何实现这个场景。
- - - - - - - - - - - - - - 编辑 - - - - - - - - - - - ------
有一些任务,其中 5 个可以同时运行。当用户按下按钮时,一些(取决于谓词函数)任务应该以最高优先级运行。
事实上我写了这段代码

TaskScheduler taskSchedulerHighPriority;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...     
if (predicate(customObject))
    actionBlockHigh.Post(customObject);
else
    actionBlockLow.Post(customObject);

但似乎优先级根本没有生效。
---------------------------------------- 编辑 ------------------
我发现当我使用这行代码时:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow });

导致应用程序正确观察Tasks的优先级但一次只能执行一个任务,同时使用流程中显示的第一个代码块,导致应用程序同时运行5个任务但优先级顺序不正确。

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow });

更新:
要 svick 的坦克,我应该为 taskSchedulerLow 指定 MaxMessagesPerTask

【问题讨论】:

  • 什么决定了优先级?它与T 完全无关吗?还是优先级是T 的固有/派生属性?
  • 您可以创建使用 ConcurrentPriorityQueue 的自定义缓冲区块,也可以创建自定义异步转换块。这两种选择都不是微不足道的。也同意@casperOne,在你的情况下优先级是什么意思?

标签: c# multithreading task-parallel-library .net-4.5 tpl-dataflow


【解决方案1】:

您的问题没有包含很多细节,因此以下只是您可能需要的猜测。

我认为最简单的方法是让两个ActionBlocks 在QueuedTaskScheduler from ParallelExtensionsExtras 上以不同的优先级运行。您将使用谓词链接到高优先级的链接,然后链接到低优先级的链接。此外,为了确保高优先级的Tasks 不会等待,请设置低优先级块的MaxMessagesPerTask

在代码中,它看起来像:

static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate)
{
    var buffer = new BufferBlock<T>();

    var scheduler = new QueuedTaskScheduler(1);

    var highPriorityScheduler = scheduler.ActivateNewQueue(0);
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1);

    var highPriorityBlock = new ActionBlock<T>(
        action, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = highPriorityScheduler
        });
    var lowPriorityBlock = new ActionBlock<T>(
        action, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = lowPriorityScheduler,
            MaxMessagesPerTask = 1
        });

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate);
    buffer.LinkTo(lowPriorityBlock);

    return buffer;
}

这只是您可以做什么的草图,例如,返回的块的Completion 行为不正确。

【讨论】:

  • 在您的代码中,您没有为低优先级块指定MaxMessagesPerTask。就像我说的那样,这样做非常重要。
猜你喜欢
  • 2016-05-25
  • 1970-01-01
  • 2010-10-03
  • 2018-10-30
  • 2011-07-13
  • 2021-07-03
  • 2019-01-30
  • 2011-01-09
  • 2012-04-20
相关资源
最近更新 更多