【发布时间】:2014-05-05 00:17:25
【问题描述】:
我正在尝试将并行任务绑定到包含 pprogressBars 的 listView。 我正在使用一个有限的调度程序,它只允许指定的最大并行度。 到目前为止,它大部分时间都可以正常工作,但偶尔会有两个任务更新 listView 中的相同进度条。 下面是我的代码。
知道如何防止两个任务更新 listView 中的同一个进度条吗? 或者如何从并发运行的任务中更新进度条?
public class MyClass
{
public ObservableCollection<StatusInfo> StatusItems { get; set; }
private Object thisLock = new Object();
public int Process() //handled
{
StatusItems = new ObservableCollection<StatusInfo>();
for (int i = 0; i < 4; i++) // initialize progress bar collection
{
StatusInfo sInfo = new StatusInfo();
sInfo.ThreadID = i;
sInfo.Table = "No Table";
sInfo.Percentage = 0;
sInfo.Status = AppInfo.AVAILABLE;
sInfo.Minimum = 0;
sInfo.Maximum = 100;
sInfo.Visibility = Visibility.Visible;
StatusItems.Add(sInfo);
}
Parent.StatusItems = StatusItems; // assign to viewmodel
int numberOfBackGroundThread = 4;
LimitedTaskScheduler scheduler = new LimitedTaskScheduler(numberOfBackGroundThread);
TaskFactory factory = new TaskFactory(scheduler);
var ui = LimitedTaskScheduler.FromCurrentSynchronizationContext();
Task[] tasks = new Task[rows.Count];
for (int i = 0; i < rows.Count; i++)
{
......
tasks[i] = factory.StartNew<string>(() =>
{
int barIndex = -1;
int threadID = Thread.CurrentThread.ManagedThreadId;
cnt++;
if (cnt > numberOfBackGroundThread - 1)
{
while (true)
{
for (int j = 0; j < numberOfBackGroundThread; j++)
{
lock (thisLock)
{
if (StatusItems[j].Status == "AVAILABLE" || StatusItems[j].Status == "DONE")
{
//Console.WriteLine(String.Format("Current table = {0}, Cuurent table type = {1}, BarIndex = {2}, ThreadID = {3}, Prev TableName = {4}, Prev TableType = {5}, Prev Status = {6} PrevThreadID = {7}", tabName, tabType, j.ToString(), Thread.CurrentThread.ManagedThreadId.ToString(), StatusItems[j].Table, StatusItems[j].TabType, StatusItems[j].Status, StatusItems[j].ThreadID));
// 0. Assign the current task to a display slot StatusItems[j] that is currently free.
// 1. We lock this piece of code to prevent another concurrent task from picking the same slot.
// 2. And we update the slot as "Occupied" immediately to avoid to prevent another concurrent task from picking the same slot.
// 3. We also add one extra slot to avoid concurrent tasks competing on the same sidplay slot and reduce wait on the lock.
// All of the above cannot completely avoid two thread using the same display slot. We may need a new way to
// 4. Since multiple tasks may run on the same thread we may see the same thread ID appear on different display slots.
StatusItems[j].Status = "Occupied";
barIndex = j;
break; // break for loop
}
}
}
if (barIndex >= 0) { break; } // break while loop
}
}
else { barIndex = cnt; }
StatusItems[barIndex].TabType = tabType;
StatusItems[barIndex].ThreadID = threadID;
int nStatus = IndividualProcess(barIndex);
if (nStatus < 0) { AppInfo.JobStatus = "01"; }
return result;
});
}
var done = factory.ContinueWhenAll(tasks, completedTasks => { AppInfo.Finished = true; });
done.ContinueWith(completedTasks => { int nStatus = PostProcess(); }, ui);
return returnStatus;
}
private int IndividualProcess(int barIndex)
{
for (int i=0; i< 100; i++)
{
perform work...
SetProgressbar (i, StatusItems, barIndex, "in progress")
}
SetProgressbar (100, StatusItems, barIndex, "DONE")
}
public void SetProgressbar(int pPercent, ObservableCollection<StatusInfo> pInfo, int pInfoIndex, string pStatus)
{
try // Increment percentage for COPY or nested PURGE
{
if (Application.Current.Dispatcher.Thread != System.Threading.Thread.CurrentThread)
{
Application.Current.Dispatcher.BeginInvoke(new Action(() =>
{
((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
((StatusInfo)pInfo[pInfoIndex]).Status = pStatus;
((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
}));
}
else // When the current thread is main UI thread. The label won't be updated until the EntityCopy() finishes.
{
((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
((StatusInfo)pInfo[pInfoIndex]).Status = pStatus;
((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
}
}
catch { throw; }
}
}
public class LimitedTaskScheduler : TaskScheduler
{
// Fields
// Whether the current thread is processing work items.
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
// The list of tasks to be executed.
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
/// <summary>The maximum concurrency level allowed by this scheduler.</summary>
private readonly int _maxDegreeOfParallelism;
/// <summary>Whether the scheduler is currently processing work items.</summary>
private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
/// <summary>
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
/// specified degree of parallelism.
/// </summary>
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
public LimitedTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued.</param>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
/// <summary>
/// Informs the ThreadPool that there's work to be executed for this scheduler.
/// </summary>
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
/// <summary>Attempts to execute the specified task on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued"></param>
/// <returns>Whether the task could be executed on the current thread.</returns>
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued) TryDequeue(task);
// Try to run the task.
return base.TryExecuteTask(task);
}
/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
/// <param name="task">The task to be removed.</param>
/// <returns>Whether the task could be found and removed.</returns>
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
/// <returns>An enumerable of the tasks currently scheduled.</returns>
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks.ToArray();
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
更新: 任务调度程序可能不相关。我把它放在这里是为了以防有人能找到我从未想过做或错过的新事物。
【问题讨论】:
-
我不明白为什么任务调度程序与这个问题有关。无论任务任务调度程序如何,是否可以将其改写为“如何从同时运行的任务中更新进度条”?
-
这看起来不像 MVVM 代码,事实上如果你采用 MVVM 你没有这个问题。
-
您是否使用
SemaphoreSlim得到最终解决方案?很想看到完整的例子。为什么只有 4 个进度条,而不是更多? 500 个任务总是 ? —— -
由于调度程序的限制,在任何给定时间只有 4 个任务同时运行?为什么 4 用于有限的调度程序'
标签: c# progress-bar task-parallel-library