【问题标题】:Limited concurrency level task scheduler (with task priority) handling wrapped tasks处理包装任务的有限并发级别任务调度程序(具有任务优先级)
【发布时间】:2012-11-02 20:42:49
【问题描述】:

我很难找到一个可以安排优先任务但也可以处理“打包”任务的任务调度程序。这类似于Task.Run 试图解决的问题,但您不能为Task.Run 指定任务调度程序。 我一直在使用Parallel Extensions Extras Samples 中的QueuedTaskScheduler 来解决任务优先级要求(这也是post 建议的)。

这是我的例子:

class Program
{
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1);
    private static TaskScheduler ts_priority1;
    private static TaskScheduler ts_priority2;
    static void Main(string[] args)
    {
        ts_priority1 = queueScheduler.ActivateNewQueue(1);
        ts_priority2 = queueScheduler.ActivateNewQueue(2);

        QueueValue(1, ts_priority2);
        QueueValue(2, ts_priority2);
        QueueValue(3, ts_priority2);
        QueueValue(4, ts_priority1);
        QueueValue(5, ts_priority1);
        QueueValue(6, ts_priority1);

        Console.ReadLine();           
    }

    private static Task QueueTask(Func<Task> f, TaskScheduler ts)
    {
        return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts);
    }

    private static Task QueueValue(int i, TaskScheduler ts)
    {
        return QueueTask(async () =>
        {
            Console.WriteLine("Start {0}", i);
            await Task.Delay(1000);
            Console.WriteLine("End {0}", i);
        }, ts);
    }
}

上面例子的典型输出是:

Start 4
Start 5
Start 6
Start 1
Start 2
Start 3
End 4
End 3
End 5
End 2
End 1
End 6

我想要的是:

Start 4
End 4
Start 5
End 5
Start 6
End 6
Start 1
End 1
Start 2
End 2
Start 3
End 3

编辑:

我想我正在寻找一个类似于QueuedTaskScheduler 的任务调度程序来解决这个问题。但欢迎提出任何其他建议。

【问题讨论】:

  • 嗯,你想要的是处理任务的优先级,而不是在并行模式下运行它们?你能不能只限制调度程序中的并发线程数?
  • @Kek new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 就是这样做的(将同时线程数限制为 1)

标签: c# .net task-parallel-library priority-queue async-await


【解决方案1】:

不幸的是,这不能用TaskScheduler 解决,因为它们总是在Task 级别工作,而async 方法几乎总是包含多个Tasks。

您应该将SemaphoreSlim 与优先调度程序结合使用。或者,您可以使用AsyncLock(我的AsyncEx library 中也包含它)。

class Program
{
  private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1);
  private static TaskScheduler ts_priority1;
  private static TaskScheduler ts_priority2;
  private static SemaphoreSlim semaphore = new SemaphoreSlim(1);
  static void Main(string[] args)
  {
    ts_priority1 = queueScheduler.ActivateNewQueue(1);
    ts_priority2 = queueScheduler.ActivateNewQueue(2);

    QueueValue(1, ts_priority2);
    QueueValue(2, ts_priority2);
    QueueValue(3, ts_priority2);
    QueueValue(4, ts_priority1);
    QueueValue(5, ts_priority1);
    QueueValue(6, ts_priority1);

    Console.ReadLine();           
  }

  private static Task QueueTask(Func<Task> f, TaskScheduler ts)
  {
    return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts).Unwrap();
  }

  private static Task QueueValue(int i, TaskScheduler ts)
  {
    return QueueTask(async () =>
    {
      await semaphore.WaitAsync();
      try
      {
        Console.WriteLine("Start {0}", i);
        await Task.Delay(1000);
        Console.WriteLine("End {0}", i);
      }
      finally
      {
        semaphore.Release();
      }
    }, ts);
  }
}

【讨论】:

  • 这看起来是一个有趣的解决方案。但是,我看到了一个问题。尽管该解决方案(起初)会产生正确的输出(如本题所示),但它会破坏已执行任务的优先级。调度程序将执行所有任务(以正确的优先级)直到await semaphore.WaitAsync(),但具有较高优先级的任务不会在较低优先级的任务之前从锁中释放。如果优先级较高的任务被安排在优先级较低的任务(仍在等待从锁中释放)之后,则尤其如此。
  • 在这种情况下,您将需要一个实际的基于优先级的锁,它不存在,因为 AFAIK 没有其他人需要它。您必须自己构建。
  • 我添加了自己的answer。请看看你的想法。
【解决方案2】:

我能找到的最佳解决方案是制作我自己的QueuedTaskScheduler 版本(原始版本可在Parallel Extensions Extras Samples 源代码中找到)。

我在QueuedTaskScheduler 的构造函数中添加了一个bool awaitWrappedTasks 参数。

public QueuedTaskScheduler(
        TaskScheduler targetScheduler,
        int maxConcurrencyLevel,
        bool awaitWrappedTasks = false)
{
    ...
    _awaitWrappedTasks = awaitWrappedTasks;
    ...
}

public QueuedTaskScheduler(
        int threadCount,
        string threadName = "",
        bool useForegroundThreads = false,
        ThreadPriority threadPriority = ThreadPriority.Normal,
        ApartmentState threadApartmentState = ApartmentState.MTA,
        int threadMaxStackSize = 0,
        Action threadInit = null,
        Action threadFinally = null,
        bool awaitWrappedTasks = false)
{
    ...
    _awaitWrappedTasks = awaitWrappedTasks;

    // code starting threads (removed here in example)
    ...
}

然后我将ProcessPrioritizedAndBatchedTasks()方法修改为async

private async void ProcessPrioritizedAndBatchedTasks()

然后我在执行计划任务的部分之后修改了代码:

private async void ProcessPrioritizedAndBatchedTasks()
{
    bool continueProcessing = true;
    while (!_disposeCancellation.IsCancellationRequested && continueProcessing)
    {
        try
        {
            // Note that we're processing tasks on this thread
            _taskProcessingThread.Value = true;

            // Until there are no more tasks to process
            while (!_disposeCancellation.IsCancellationRequested)
            {
                // Try to get the next task.  If there aren't any more, we're done.
                Task targetTask;
                lock (_nonthreadsafeTaskQueue)
                {
                    if (_nonthreadsafeTaskQueue.Count == 0) break;
                    targetTask = _nonthreadsafeTaskQueue.Dequeue();
                }

                // If the task is null, it's a placeholder for a task in the round-robin queues.
                // Find the next one that should be processed.
                QueuedTaskSchedulerQueue queueForTargetTask = null;
                if (targetTask == null)
                {
                    lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask);
                }

                // Now if we finally have a task, run it.  If the task
                // was associated with one of the round-robin schedulers, we need to use it
                // as a thunk to execute its task.
                if (targetTask != null)
                {
                    if (queueForTargetTask != null) queueForTargetTask.ExecuteTask(targetTask);
                    else TryExecuteTask(targetTask);

                    // ***** MODIFIED CODE START ****
                    if (_awaitWrappedTasks)
                    {
                        var targetTaskType = targetTask.GetType();
                        if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0]))
                        {
                            dynamic targetTaskDynamic = targetTask;
                            // Here we await the completion of the proxy task.
                            // We do not await the proxy task directly, because that would result in that await will throw the exception of the wrapped task (if one existed)
                            // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash)
                            await TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously);
                        }
                    }
                    // ***** MODIFIED CODE END ****
                }
            }
        }
        finally
        {
            // Now that we think we're done, verify that there really is
            // no more work to do.  If there's not, highlight
            // that we're now less parallel than we were a moment ago.
            lock (_nonthreadsafeTaskQueue)
            {
                if (_nonthreadsafeTaskQueue.Count == 0)
                {
                    _delegatesQueuedOrRunning--;
                    continueProcessing = false;
                    _taskProcessingThread.Value = false;
                }
            }
        }
    }
}

方法ThreadBasedDispatchLoop 的变化有点不同,因为我们不能使用async 关键字,否则我们将破坏在专用线程中执行计划任务的功能。所以这里是修改版ThreadBasedDispatchLoop

private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally)
{
    _taskProcessingThread.Value = true;
    if (threadInit != null) threadInit();
    try
    {
        // If the scheduler is disposed, the cancellation token will be set and
        // we'll receive an OperationCanceledException.  That OCE should not crash the process.
        try
        {
            // If a thread abort occurs, we'll try to reset it and continue running.
            while (true)
            {
                try
                {
                    // For each task queued to the scheduler, try to execute it.
                    foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
                    {
                        Task targetTask = task;
                        // If the task is not null, that means it was queued to this scheduler directly.
                        // Run it.
                        if (targetTask != null)
                        {
                            TryExecuteTask(targetTask);
                        }
                        // If the task is null, that means it's just a placeholder for a task
                        // queued to one of the subschedulers.  Find the next task based on
                        // priority and fairness and run it.
                        else
                        {
                            // Find the next task based on our ordering rules...                                    
                            QueuedTaskSchedulerQueue queueForTargetTask;
                            lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask);

                            // ... and if we found one, run it
                            if (targetTask != null) queueForTargetTask.ExecuteTask(targetTask);
                        }

                        if (_awaitWrappedTasks)
                        {
                            var targetTaskType = targetTask.GetType();
                            if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0]))
                            {
                                dynamic targetTaskDynamic = targetTask;
                                // Here we wait for the completion of the proxy task.
                                // We do not wait for the proxy task directly, because that would result in that Wait() will throw the exception of the wrapped task (if one existed)
                                // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash)
                                TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously).Wait();
                            }
                        }
                    }
                }
                catch (ThreadAbortException)
                {
                    // If we received a thread abort, and that thread abort was due to shutting down
                    // or unloading, let it pass through.  Otherwise, reset the abort so we can
                    // continue processing work items.
                    if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
                    {
                        Thread.ResetAbort();
                    }
                }
            }
        }
        catch (OperationCanceledException) { }
    }
    finally
    {
        // Run a cleanup routine if there was one
        if (threadFinally != null) threadFinally();
        _taskProcessingThread.Value = false;
    }
}

我已经对此进行了测试,它提供了所需的输出。这种技术也可以用于任何其他调度程序。例如。 LimitedConcurrencyLevelTaskSchedulerOrderedTaskScheduler

【讨论】:

  • 在调度器中等待任务会破坏异步 IO 的值。如果您无论如何都不需要异步 IO,则可以切换到同步任务主体。
  • +1 然后。我在这个问题中学到了很多东西。不完全相信这个解决方案比AsyncSemaphore 更可取,但我会考虑的。
  • 您正在从TaskScheduler 实现中执行async-void 方法?可怕,我想知道@StephenCleary 对此无话可说。
  • @springy76 这实际上工作得很好,但是当我发现 ActionBlock in the TPL DataFlow framework 的正确用法时,我已经放弃了任何明确的 TaskScheduler 实现。
【解决方案3】:

我认为实现这个目标是不可能的。一个核心问题似乎是TaskScheduler 只能用于运行代码。但是也有不运行代码的任务,比如IO任务或者定时器任务。我不认为TaskScheduler 基础设施可以用来安排这些。

从 TaskScheduler 的角度来看,它看起来像这样:

1. Select a registered task for execution
2. Execute its code on the CPU
3. Repeat

步骤 (2) 是同步的,这意味着要执行的 Task 必须作为步骤 (2) 的一部分开始和结束。这意味着这个 Task 不能做异步 IO,因为那将是非阻塞的。从这个意义上说,TaskScheduler 只支持阻塞代码。

我认为最好为你自己实现一个AsyncSemaphore 的版本,它会按优先顺序释放服务员并进行节流。您的异步方法可以以非阻塞方式等待该信号量。所有 CPU 工作都可以在默认线程池上运行,因此无需在自定义 TaskScheduler 内启动自定义线程。 IO 任务可以继续使用非阻塞 IO。

【讨论】:

  • 你在这里解释的内容我已经尝试过了,它的输出基本相同(与原始问题一样)。在您的建议中,firstPartTask 被安排在排队的任务调度程序上,但一旦到达第一个await 就完成了,调度程序只需执行队列中的下一个“第一部分”,即使之前的“内部任务”(第一个await) 之后的任务重置尚未完成。我只能认为这将由处理我正在寻找的这种情况的 调度程序 解决,并且无法通过调度程序之外的一些魔法来解决。
  • 我开始相信你是对的。我添加了一些想法和建议。请让我知道您的想法。
  • 感谢您的更新。您使用信号量锁的建议正是用户在以下answer 中建议的内容(请参阅我的 cmets)。您关于调度程序仅同步执行其任务的建议在某种程度上是正确的,但是如果调度程序在执行队列中的任何其他任务之前等待每个任务的“包装”任务怎么办。我认为这给了我一个想法......谢谢(如果我想出什么东西会告诉你的)。
  • 您可以创建一个自定义调度程序,该调度程序仅在包装任务和包装任务完成后才知道任务执行已完成。它需要进行运行时强制转换以查看排队的 Task 是否实际上是 Task&lt;Task&gt; 并在这种情况下添加延续等等。
  • 我添加了我自己的answer。请看看你的想法。
猜你喜欢
  • 1970-01-01
  • 2010-09-09
  • 2012-02-01
  • 1970-01-01
  • 1970-01-01
  • 2011-05-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多