【问题标题】:how to update the progress bar from tasks running concurrently如何从同时运行的任务更新进度条
【发布时间】:2014-05-05 00:17:25
【问题描述】:

我正在尝试将并行任务绑定到包含 pprogressBars 的 listView。 我正在使用一个有限的调度程序,它只允许指定的最大并行度。 到目前为止,它大部分时间都可以正常工作,但偶尔会有两个任务更新 listView 中的相同进度条。 下面是我的代码。

知道如何防止两个任务更新 listView 中的同一个进度条吗? 或者如何从并发运行的任务中更新进度条?

public class MyClass
{
  public ObservableCollection<StatusInfo> StatusItems { get; set; }
  private Object thisLock = new Object();

  public int Process() //handled
  {
    StatusItems = new ObservableCollection<StatusInfo>();
    for (int i = 0; i < 4; i++)  // initialize progress bar collection
    {
      StatusInfo sInfo = new StatusInfo();
      sInfo.ThreadID = i;
      sInfo.Table = "No Table";
      sInfo.Percentage = 0;
      sInfo.Status = AppInfo.AVAILABLE;
      sInfo.Minimum = 0;
      sInfo.Maximum = 100;
      sInfo.Visibility = Visibility.Visible;
      StatusItems.Add(sInfo);
    }
    Parent.StatusItems = StatusItems; // assign to viewmodel

    int numberOfBackGroundThread = 4;
    LimitedTaskScheduler scheduler = new LimitedTaskScheduler(numberOfBackGroundThread);
    TaskFactory factory = new TaskFactory(scheduler);
    var ui = LimitedTaskScheduler.FromCurrentSynchronizationContext();

    Task[] tasks = new Task[rows.Count];
    for (int i = 0; i < rows.Count; i++)
    {
      ......
      tasks[i] = factory.StartNew<string>(() =>
      {
        int barIndex = -1;
        int threadID = Thread.CurrentThread.ManagedThreadId;
        cnt++;
        if (cnt > numberOfBackGroundThread - 1)
        {
          while (true)
          {                 
            for (int j = 0; j < numberOfBackGroundThread; j++)
            {
              lock (thisLock)
              {
                if (StatusItems[j].Status == "AVAILABLE" || StatusItems[j].Status == "DONE") 
                {
                  //Console.WriteLine(String.Format("Current table = {0}, Cuurent table type = {1}, BarIndex = {2}, ThreadID = {3}, Prev TableName = {4}, Prev TableType = {5}, Prev Status = {6} PrevThreadID = {7}", tabName, tabType, j.ToString(), Thread.CurrentThread.ManagedThreadId.ToString(), StatusItems[j].Table, StatusItems[j].TabType, StatusItems[j].Status, StatusItems[j].ThreadID));
                  // 0. Assign the current task to a display slot StatusItems[j] that is currently free.
                  // 1. We lock this piece of code to prevent another concurrent task from picking the same slot.
                  // 2. And we update the slot as "Occupied" immediately to avoid to prevent another concurrent task from picking the same slot.
                  // 3. We also add one extra slot to avoid concurrent tasks competing on the same sidplay slot and reduce wait on the lock.
                  // All of the above cannot completely avoid two thread using the same display slot. We may need a new way to 
                  // 4. Since multiple tasks may run on the same thread we may see the same thread ID appear on different display slots.
                  StatusItems[j].Status = "Occupied";
                  barIndex = j;
                  break; // break for loop
                }
              }
            }
            if (barIndex >= 0) { break; }  // break while loop
          }
        }
        else { barIndex = cnt; }

        StatusItems[barIndex].TabType = tabType;
        StatusItems[barIndex].ThreadID = threadID;

        int nStatus = IndividualProcess(barIndex);
        if (nStatus < 0) { AppInfo.JobStatus = "01"; }
        return result;
      });
    }
    var done = factory.ContinueWhenAll(tasks, completedTasks => { AppInfo.Finished = true; });
    done.ContinueWith(completedTasks => { int nStatus = PostProcess(); }, ui);
    return returnStatus;
  }

  private int IndividualProcess(int barIndex)
  {
     for (int i=0; i< 100; i++)  
     {
       perform work...
       SetProgressbar (i, StatusItems, barIndex, "in progress")
     }
     SetProgressbar (100, StatusItems, barIndex, "DONE")
  } 

  public void SetProgressbar(int pPercent, ObservableCollection<StatusInfo> pInfo, int pInfoIndex, string pStatus)
  {
    try // Increment percentage for COPY or nested PURGE 
    {
      if (Application.Current.Dispatcher.Thread != System.Threading.Thread.CurrentThread)
      {
        Application.Current.Dispatcher.BeginInvoke(new Action(() =>
        {
          ((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
          ((StatusInfo)pInfo[pInfoIndex]).Status = pStatus; 
          ((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
        }));
      }
      else // When the current thread is main UI thread. The label won't be updated until the EntityCopy() finishes.
      {
        ((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
        ((StatusInfo)pInfo[pInfoIndex]).Status = pStatus; 
        ((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
      }
    }
    catch { throw; }
  }      
}

public class LimitedTaskScheduler : TaskScheduler
{
  // Fields
  // Whether the current thread is processing work items.
  [ThreadStatic]
  private static bool _currentThreadIsProcessingItems;
  // The list of tasks to be executed. 
  private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) 
  /// <summary>The maximum concurrency level allowed by this scheduler.</summary> 
  private readonly int _maxDegreeOfParallelism;
  /// <summary>Whether the scheduler is currently processing work items.</summary> 
  private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) 

  /// <summary> 
  /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the 
  /// specified degree of parallelism. 
  /// </summary> 
  /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
  public LimitedTaskScheduler(int maxDegreeOfParallelism)
  {
      if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
      _maxDegreeOfParallelism = maxDegreeOfParallelism;
  }

  /// <summary>Queues a task to the scheduler.</summary> 
  /// <param name="task">The task to be queued.</param>
  protected sealed override void QueueTask(Task task)
  {
      // Add the task to the list of tasks to be processed.  If there aren't enough 
      // delegates currently queued or running to process tasks, schedule another. 
    lock (_tasks)
      {
          _tasks.AddLast(task);
          if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
          {
              ++_delegatesQueuedOrRunning;
              NotifyThreadPoolOfPendingWork();
          }
      }
  }

  /// <summary> 
  /// Informs the ThreadPool that there's work to be executed for this scheduler. 
  /// </summary> 
  private void NotifyThreadPoolOfPendingWork()
  {
      ThreadPool.UnsafeQueueUserWorkItem(_ =>
      {
          // Note that the current thread is now processing work items. 
          // This is necessary to enable inlining of tasks into this thread.
          _currentThreadIsProcessingItems = true;
          try
          {
              // Process all available items in the queue. 
              while (true)
              {
                  Task item;
                  lock (_tasks)
                  {
                      // When there are no more items to be processed, 
                      // note that we're done processing, and get out. 
                      if (_tasks.Count == 0)
                      {
                          --_delegatesQueuedOrRunning;
                          break;
                      }

                      // Get the next item from the queue
                      item = _tasks.First.Value;
                      _tasks.RemoveFirst();
                  }

                  // Execute the task we pulled out of the queue 
                  base.TryExecuteTask(item);
              }
          }
          // We're done processing items on the current thread 
          finally { _currentThreadIsProcessingItems = false; }
      }, null);
  }

  /// <summary>Attempts to execute the specified task on the current thread.</summary> 
  /// <param name="task">The task to be executed.</param>
  /// <param name="taskWasPreviouslyQueued"></param>
  /// <returns>Whether the task could be executed on the current thread.</returns> 
  protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  {
      // If this thread isn't already processing a task, we don't support inlining 
      if (!_currentThreadIsProcessingItems) return false;

      // If the task was previously queued, remove it from the queue 
      if (taskWasPreviouslyQueued) TryDequeue(task);

      // Try to run the task. 
      return base.TryExecuteTask(task);
  }

  /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> 
  /// <param name="task">The task to be removed.</param>
  /// <returns>Whether the task could be found and removed.</returns> 
  protected sealed override bool TryDequeue(Task task)
  {
      lock (_tasks) return _tasks.Remove(task);
  }

  /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 
  public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

  /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> 
  /// <returns>An enumerable of the tasks currently scheduled.</returns> 
  protected sealed override IEnumerable<Task> GetScheduledTasks()
  {
      bool lockTaken = false;
      try
      {
          Monitor.TryEnter(_tasks, ref lockTaken);
          if (lockTaken) return _tasks.ToArray();
          else throw new NotSupportedException();
      }
      finally
      {
          if (lockTaken) Monitor.Exit(_tasks);
      }
  }
}

更新: 任务调度程序可能不相关。我把它放在这里是为了以防有人能找到我从未想过做或错过的新事物。

【问题讨论】:

  • 我不明白为什么任务调度程序与这个问题有关。无论任务任务调度程序如何,是否可以将其改写为“如何从同时运行的任务中更新进度条”?
  • 这看起来不像 MVVM 代码,事实上如果你采用 MVVM 你没有这个问题。
  • 您是否使用SemaphoreSlim得到最终解决方案?很想看到完整的例子。为什么只有 4 个进度条,而不是更多? 500 个任务总是 ? ——
  • 由于调度程序的限制,在任何给定时间只有 4 个任务同时运行?为什么 4 用于有限的调度程序'

标签: c# progress-bar task-parallel-library


【解决方案1】:

知道如何防止两个任务更新相同的进度条 列表视图?或者如何从正在运行的任务更新进度条 同时?

如果您有多个任务并行运行,并且您确实想要向用户显示每个单个任务的进度,则需要显示每个任务的独立进度条。

你会怎么做取决于 UI 结构。例如,如果您有一个列表视图,其中每个项目都是一个任务,您可以添加一个进度条作为每个列表视图项目的子窗口:

不过,这可能有点矫枉过正,通常有一个进度条来跟踪整体进度就足够了。即,如果您有 100 个任务,当所有 100 个任务都完成时,您的进度条将显示 100%。

请注意,任务 #50(例如)可能在任务 #45 之前完成,因此您不能使用任务编号来更新进度。要正确显示进度,您需要对已完成的任务进行计数,并将该计数器用作进度指示器。

更新以解决评论:

我在列表视图中有 4 个进度条和 500 个任务。在任何给定时间 由于有限,只有 4 个任务同时运行 调度器。我尝试在列表视图中免费分配一个进度条 status 到一个新的传入任务,然后设置的状态 任务完成时释放进度条,以便进度条可以 被另一个新的传入任务重用。那有意义吗?或者我是 走向死胡同?

我不确定这是一个明智的 UI 设计决定,但如果您希望这样,您可以使用 SemaphoreSlim 将进度条分配为有限资源。示例:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace TaskProgress
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private async void Form1_Load(object sender, EventArgs e)
        {
            await DoWorkAsync();
        }

        const int MAX_PARALLEL = 4;

        readonly object _lock = new Object();

        readonly SemaphoreSlim _semaphore = 
            new SemaphoreSlim(initialCount: MAX_PARALLEL);

        HashSet<Task> _pendingTasks;

        Queue<ProgressBar> _availableProgressBars;

        // do all Task
        async Task DoWorkAsync()
        {
            _availableProgressBars = new Queue<ProgressBar>();
            _pendingTasks = new HashSet<Task>();

            var progressBars = new ProgressBar[] { 
                this.progressBar1, 
                this.progressBar2, 
                this.progressBar3, 
                this.progressBar4 };

            foreach (var item in progressBars)
                _availableProgressBars.Enqueue(item);

            for (int i = 0; i < 50; i++) // start 50 tasks
                QueueTaskAsync(DoTaskAsync());

            await Task.WhenAll(WithLock(() => _pendingTasks.ToArray()));
        }

        // do a sigle Task
        readonly Random _random = new Random(Environment.TickCount);

        async Task DoTaskAsync()
        {
            await _semaphore.WaitAsync();
            try
            {
                var progressBar = WithLock(() => _availableProgressBars.Dequeue());
                try
                {
                    progressBar.Maximum = 100;
                    progressBar.Value = 0;
                    IProgress<int> progress = 
                        new Progress<int>(value => progressBar.Value = value);

                    await Task.Run(() =>
                    {
                        // our simulated work takes no more than 10s
                        var sleepMs = _random.Next(10000) / 100;
                        for (int i = 0; i < 100; i++)
                        {
                            Thread.Sleep(sleepMs); // simulate work item
                            progress.Report(i);
                        }
                    });
                }
                finally
                {
                    WithLock(() => _availableProgressBars.Enqueue(progressBar));
                }
            }
            finally
            {
                _semaphore.Release();
            }
        }

        // Add/remove a task to the list of pending tasks
        async void QueueTaskAsync(Task task)
        {
            WithLock(() => _pendingTasks.Add(task));
            try
            {
                await task;
            }
            catch
            {
                if (!task.IsCanceled && !task.IsFaulted)
                    throw;
                return;
            }
            WithLock(() => _pendingTasks.Remove(task));
        }

        // execute func inside a lock
        T WithLock<T>(Func<T> func)
        {
            lock (_lock)
                return func();
        }

        // execute action inside a lock
        void WithLock(Action action)
        {
            lock (_lock)
                action();
        }

    }
}

这段代码没有使用自定义任务调度器,SemaphoreSlim 足以限制并发。还要注意保护锁(WithLock)在这里是多余的,因为除了Task.Run lambda 之外的所有东西都运行在 UI 线程上。但是,我决定保留锁,因为您的应用程序可能有不同的线程模型。在这种情况下,无论您在哪里访问ProgressBar 或其他 UI,都应该使用BeginInvoke 或喜欢在 UI 线程上进行操作。

另外,请查看"Async in 4.5: Enabling Progress and Cancellation in Async APIs" 了解有关Progress&lt;T&gt; 模式的更多详细信息。

【讨论】:

  • 嗨,Noseratio,感谢您的回答。在我的列表视图中,每个项目都是一个进度条。在每个任务开始时,我循环列表视图以找到一个“空闲”进度条并将进度条/列表视图索引分配给任务。它在 95% 的时间内都有效。但有时两个任务可能会使用相同的进度条。您或任何人可以帮助找出我的代码中的错误吗?
  • @Shawn,我不明白,你没有在任务和列表视图项之间有 1:1 的映射吗?
  • 没有。我在列表视图中有 4 个进度条和 500 个任务。由于调度程序的限制,在任何给定时间只有 4 个任务同时运行。我尝试在列表视图中为新的传入任务分配一个具有空闲状态的进度条,然后在任务完成时将进度条的状态设置为空闲,以便进度条可以被另一个新的传入任务重用。那有意义吗?还是我会走投无路?
  • @Shawn,我不确定这是一个明智的 UI 设计,但如果你想坚持下去,你最好用户 SemaphoreSlim 分配一个进度条作为有限的资源。我会在时间允许的情况下发布一个示例。
  • 列表视图中的 4 个进度条? 500 个任务?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-04-15
  • 2014-11-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-18
  • 1970-01-01
相关资源
最近更新 更多