【问题标题】:C# Wait until all threads terminated in ThreadPoolC# 等到所有线程在 ThreadPool 中终止
【发布时间】:2010-02-18 07:22:27
【问题描述】:

我有一个主线程,还有许多其他后台线程。

这些后台线程的主要用途是查询数据(来自网络的许多查询,这就是我创建多个线程的原因:以避免用户界面滞后)。

在主线程(用户界面)中导出数据时,我需要等到所有其他线程都完成。

我的代码是:

//...code to open save file dialog...

//this loop is to wait for all the threads finish their query
//QueryThread.threadCount is the count of the background threads
while (QueryThread.threadCount != 0)
{
    Thread.CurrentThread.Join(1000);
    Console.WriteLine(QueryThread.threadCount);
}

//...code to export data...

如果我将while循环注释掉,程序会顺利运行,但我导出的一些数据可能会显示一些“不需要的”材料,因为一些后台线程还没有完成它们的工作。

但是,上面的while循环是无限的,threadCount永远不会改变,这意味着在“Join()”方法期间,没有后台线程在运行。

为什么后台线程被阻塞,我该如何解决这个问题?

非常感谢!

【问题讨论】:

    标签: c# .net-2.0 multithreading


    【解决方案1】:

    您正在当前线程上调用Join 方法,这没有多大意义。你应该在你的工作线程上调用它:

    foreach (Thread thread in workerThreads)
    {
        thread.Join(1000);
    }
    

    不幸的是,这种方式违背了使用线程的目的,因为它会阻塞调用,直到所有其他线程都完成为止。

    BackgroundWorkerRunWorkerCompleted 事件可用于通知后台任务的完成并在表单上执行更新。

    【讨论】:

    • 我正在为那些后台线程使用ThreadPool 如何在上面做foreach方法呢?谢谢
    【解决方案2】:

    你的实现是错误的,你不应该使用 Join 作为你的线程之间的同步原语。

    你应该做的是实现producer-consumer pattern。这将允许您让线程等待执行工作,然后在您将其放入队列时激活执行该工作。

    但是,我要做的更改是从 UI 线程,不要将数据直接添加到生产者和消费者共享的队列中。相反,制作该数据的副本,然后将那个放入队列中。

    有关如何在 .NET 中实现生产者-消费者模式的更多信息,我建议您阅读标题为“How to: Synchronize a Producer and a Consumer Thread (C# Programming Guide)”的 MSDN 文档

    【讨论】:

    • 谢谢。我也对我的实现感到奇怪,但我是线程编程的新手。期待看到您的跟进。 =]
    • 很好的链接,但这似乎不是他的问题——我认为他的问题更多是关于聚合多个数据调用,然后在它们全部完成后将结果泵回 UI 线程?
    【解决方案3】:

    我想你想研究一下信号。为您的线程提供信号(ManualResetEvent/AutoResetEvent)。完成后在工作线程中设置()关联的信号句柄。在主线程中执行“WaitAll(signal1, signal2,signal3)”来等待您的工作线程完成。

    希望这会有所帮助,

    【讨论】:

    • 我试过了,但它说主线程是一个不支持 WaitAll 的 STAThread
    • 那么你必须迭代它们并在每个上迭代 WaitOne - 顺序无关紧要
    【解决方案4】:

    我忍不住自己尝试了一些。我确信还有改进的余地,但我认为它展示了如何处理一些多线程问题,包括原始问题。

    Form.cs

    namespace STAFormWithThreadPoolSync { internal delegate void WorkerEvent(WorkerEventInfo info); public partial class Form1 : Form { // We'll create a state object for each worker process List<WorkerState> workerStates = new List<WorkerState>(); public Form1() { InitializeComponent(); } // Executed in the main thread private void button1_Click(object sender, EventArgs e) { workersList.Items.Clear(); // Read the amount of thread we should start from the form int threadCountToUse = (int)ThreadCount.Value; WorkerEvent woEvent = new WorkerEvent(this.workerEventOccured); // Start up all threads for (int counter = 0; counter < threadCountToUse; ++counter) { // An object we can pass values into for the worker process to use. WorkerState workerState = new WorkerState(); workerState.OnStarted += woEvent; workerState.OnFinished += woEvent; // Register for the signal (and store its registered wait handle in the stateObj, which we also pass into the parameters!) workerState.registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(workerState.finishSignal, this.ItemHasFinished, workerState, -1, true); // Store the state object for later use. workerStates.Add(workerState); } WorkersProgress.Minimum = 0; WorkersProgress.Maximum = workerStates.Count; workerStates.ForEach(workerState => { // Fire of the worker thread (with the state object) ThreadPool.QueueUserWorkItem(this.ProcessItem, workerState); } ); button1.Enabled = false; CurrentResult.Value = 0; CurrentResultLabel.Text = "Current value"; ProgressTimer.Start(); } // event is run on the callers thread, so carefull accessing our controls on our form. internal void workerEventOccured(WorkerEventInfo info) { if (this.workersList.InvokeRequired) { WorkerEvent workerEvent = new WorkerEvent(workerEventOccured); this.Invoke(workerEvent, new object[] { info }); } else { switch (info.eventType) { case EventType.WorkerStarted: this.workersList.Items.Add(String.Format("Worker started on thread : {0}", info.workerState.threadId)); break; case EventType.WorkerEnded: this.workersList.Items.Add(String.Format("Worker finished on thread : {0}", info.workerState.threadId)); break; case EventType.AllWorkersFinished: this.workersList.Items.Add("ALL workers finished"); ProgressTimer.Stop(); button1.Enabled = true; CurrentResultLabel.Text = "Final value"; break; } } } // Executed in threadpool thread. private void ProcessItem(object state) { WorkerState workerState = state as WorkerState; int threadId = Thread.CurrentThread.ManagedThreadId; workerState.threadId = threadId.ToString(); WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.WorkerStarted; weInfo.workerState = workerState; workerState.Started(weInfo); // Simulate work for ((threadid / 2) seconds. Thread.Sleep((threadId * 500)); // Set the result in the state object to the threadId; workerState.result = threadId; // Signal that this thread is done. workerState.finishSignal.Set(); } // Executed in threadpool thread private void ItemHasFinished(object state, bool timedOut) { // get our state object WorkerState workerState = state as WorkerState; WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.WorkerEnded; weInfo.workerState = workerState; workerState.Finished(weInfo); } private void ProgressTimer_Tick(object sender, EventArgs e) { List<WorkerState> removeStates = new List<WorkerState>(); workerStates.ForEach(workerState => { if (workerState.finishSignal.WaitOne(0)) { CurrentResult.Value += workerState.result; removeStates.Add(workerState); } } ); removeStates.ForEach(workerState => { workerState.registeredWaitHandle.Unregister(workerState.finishSignal); workerStates.Remove(workerState); } ); WorkersProgress.Value = workerStates.Count; if (workerStates.Count == 0) { WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.AllWorkersFinished; weInfo.workerState = null; this.workerEventOccured(weInfo); } } } internal class WorkerState { internal string threadId = ""; internal int result = 0; internal RegisteredWaitHandle registeredWaitHandle = null; internal AutoResetEvent finishSignal = new AutoResetEvent(false); internal event WorkerEvent OnStarted = new WorkerEvent( (info) => {}); internal event WorkerEvent OnFinished = new WorkerEvent((info) => { }); internal void Started(WorkerEventInfo info) { OnStarted(info); } internal void Finished(WorkerEventInfo info) { OnFinished(info); this.finishSignal.Set(); } } internal enum EventType { WorkerStarted, WorkerEnded, AllWorkersFinished } internal class WorkerEventInfo { internal EventType eventType; internal WorkerState workerState; } }

    Form.Designer.cs

    namespace STAFormWithThreadPoolSync { partial class Form1 { /// /// Required designer variable. /// private System.ComponentModel.IContainer components = null; /// /// Clean up any resources being used. /// /// true if managed resources should be disposed; otherwise, false. protected override void Dispose(bool disposing) { if (disposing && (components != null)) { components.Dispose(); } base.Dispose(disposing); } #region Windows Form Designer generated code /// /// Required method for Designer support - do not modify /// the contents of this method with the code editor. /// private void InitializeComponent() { this.components = new System.ComponentModel.Container(); this.button1 = new System.Windows.Forms.Button(); this.ThreadCount = new System.Windows.Forms.NumericUpDown(); this.workersList = new System.Windows.Forms.ListView(); this.WorkerProcessColumn = new System.Windows.Forms.ColumnHeader(); this.ProgressTimer = new System.Windows.Forms.Timer(this.components); this.WorkersProgress = new System.Windows.Forms.ProgressBar(); this.CurrentResultLabel = new System.Windows.Forms.Label(); this.CurrentResult = new System.Windows.Forms.NumericUpDown(); this.label2 = new System.Windows.Forms.Label(); ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).BeginInit(); ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).BeginInit(); this.SuspendLayout(); // // button1 // this.button1.Location = new System.Drawing.Point(212, 19); this.button1.Name = "button1"; this.button1.Size = new System.Drawing.Size(93, 23); this.button1.TabIndex = 0; this.button1.Text = "Start threads"; this.button1.UseVisualStyleBackColor = true; this.button1.Click += new System.EventHandler(this.button1_Click); // // ThreadCount // this.ThreadCount.Location = new System.Drawing.Point(23, 21); this.ThreadCount.Minimum = new decimal(new int[] { 2, 0, 0, 0}); this.ThreadCount.Name = "ThreadCount"; this.ThreadCount.Size = new System.Drawing.Size(183, 20); this.ThreadCount.TabIndex = 1; this.ThreadCount.Value = new decimal(new int[] { 4, 0, 0, 0}); // // workersList // this.workersList.Columns.AddRange(new System.Windows.Forms.ColumnHeader[] { this.WorkerProcessColumn}); this.workersList.Location = new System.Drawing.Point(23, 80); this.workersList.Name = "workersList"; this.workersList.Size = new System.Drawing.Size(486, 255); this.workersList.TabIndex = 3; this.workersList.UseCompatibleStateImageBehavior = false; this.workersList.View = System.Windows.Forms.View.Details; // // WorkerProcessColumn // this.WorkerProcessColumn.Text = "Worker process"; this.WorkerProcessColumn.Width = 482; // // ProgressTimer // this.ProgressTimer.Interval = 200; this.ProgressTimer.Tick += new System.EventHandler(this.ProgressTimer_Tick); // // WorkersProgress // this.WorkersProgress.Location = new System.Drawing.Point(112, 341); this.WorkersProgress.Name = "WorkersProgress"; this.WorkersProgress.Size = new System.Drawing.Size(397, 24); this.WorkersProgress.TabIndex = 4; // // CurrentResultLabel // this.CurrentResultLabel.AutoSize = true; this.CurrentResultLabel.Location = new System.Drawing.Point(578, 266); this.CurrentResultLabel.Name = "CurrentResultLabel"; this.CurrentResultLabel.Size = new System.Drawing.Size(74, 13); this.CurrentResultLabel.TabIndex = 5; this.CurrentResultLabel.Text = "Current Result"; // // CurrentResult // this.CurrentResult.Location = new System.Drawing.Point(581, 282); this.CurrentResult.Maximum = new decimal(new int[] { -1593835520, 466537709, 54210, 0}); this.CurrentResult.Name = "CurrentResult"; this.CurrentResult.ReadOnly = true; this.CurrentResult.Size = new System.Drawing.Size(169, 20); this.CurrentResult.TabIndex = 6; // // label2 // this.label2.AutoSize = true; this.label2.Location = new System.Drawing.Point(25, 352); this.label2.Name = "label2"; this.label2.Size = new System.Drawing.Size(81, 13); this.label2.TabIndex = 7; this.label2.Text = "processing load"; // // Form1 // this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F); this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font; this.ClientSize = new System.Drawing.Size(762, 377); this.Controls.Add(this.label2); this.Controls.Add(this.CurrentResult); this.Controls.Add(this.CurrentResultLabel); this.Controls.Add(this.WorkersProgress); this.Controls.Add(this.workersList); this.Controls.Add(this.ThreadCount); this.Controls.Add(this.button1); this.Name = "Form1"; this.Text = "Form1"; ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).EndInit(); ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).EndInit(); this.ResumeLayout(false); this.PerformLayout(); } #endregion private System.Windows.Forms.Button button1; private System.Windows.Forms.NumericUpDown ThreadCount; private System.Windows.Forms.ListView workersList; private System.Windows.Forms.ColumnHeader WorkerProcessColumn; private System.Windows.Forms.Timer ProgressTimer; private System.Windows.Forms.ProgressBar WorkersProgress; private System.Windows.Forms.Label CurrentResultLabel; private System.Windows.Forms.NumericUpDown CurrentResult; private System.Windows.Forms.Label label2; } }

    希望这会有所帮助,

    【讨论】:

    • 谢谢!你肯定表现出你对编程的热情=] 但是代码中缺少某些东西:1. for loop in button1_Click 2. ForEach in the ProgressTimer_Tick
    • 修复了 button1-click 不能正确显示源的问题。 ProgressTimer 没有发现任何问题,它使用的是 lambda。
    【解决方案5】:

    我通过改变生产者-消费者模型的方法解决了这个问题。

    谢谢大家。 请看这个link(由上面的casperOne提供),但注意不要跟随microsoft的实现......

    转到here 会给你一个更好的答案。

    当然我做了一些改变,队列的类型在我的例子中是 Delegate。

    public static class QueryThread
    {
        private static SyncEvents _syncEvents = new SyncEvents();
        private static Queue<Delegate> _queryQueue = new Queue<Delegate>();
    
        static Producer queryProducer;
        static Consumer queryConsumer;
    
        public static void init()
        {
            queryProducer = new Producer(_queryQueue, _syncEvents);
            queryConsumer = new Consumer(_queryQueue, _syncEvents);
    
            Thread producerThread = new Thread(queryProducer.ThreadRun);
            Thread consumerThread = new Thread(queryConsumer.ThreadRun);
    
            producerThread.IsBackground = true;
            consumerThread.IsBackground = true;
    
            producerThread.Start();
            consumerThread.Start();
        }
    
        public static void Enqueue(Delegate item)
        {
            queryQueue.Enqueue(item);
        }
    }
    

    当主线程需要查询时,将一个委托入队,该委托指向通过调用 Enqueue(Delegate item) 进行查询的函数。这会将委托添加到生产者的“私有”队列中。

    生产者会在合适的场合将自己队列中的项目加入到共享队列中(比如在msdn例子中生成随机数并放入共享队列中)。

    消费者将代表出列并运行它们。

    感谢大家的帮助。 =]

    【讨论】:

      猜你喜欢
      • 2011-01-31
      • 1970-01-01
      • 1970-01-01
      • 2023-02-21
      • 1970-01-01
      • 2023-03-23
      • 1970-01-01
      • 2016-05-29
      • 2020-04-21
      相关资源
      最近更新 更多