【问题标题】:Timer + Producer-Consumer定时器+生产者-消费者
【发布时间】:2018-11-08 05:56:20
【问题描述】:

我是线程新手,我正在尝试学习不同的概念。

现在我正在使用定时器线程做一个生产者/消费者模式。问题是我不知道如何检查所有生产者和消费者线程是否完成了他们的进程,然后让 Timer 线程滴答一段时间,并为下一次滴答处理所有创建的生产者和消费者线程。

希望获得您的帮助和指导,了解如何为此方法创建变通方法。

这是我的示例代码:

public class WorkerThread
{
    public BlockingQueue<Item> collection = new BlockingQueue<Item>(100);

    private Timer TimerThread { get; set; }

    public void ThreadTimer()
    {
        this.TimerThread = new Timer(new TimerCallback(StartMonitor), null, 500, Timeout.Infinite);
    }

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();

        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => RunProducers(this.collection)));
        }

        //TODO: Start all producer threads...

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => RunConsumers(this.collection)));
        }

        //TODO: Start all consumer threads...

        //TODO: Let Thread wait until all worker threads are done
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

    }



    public void RunProducers(BlockingQueue<Item> collection)
    {
        List<Item> lsItems = CreateListOfItems();

        foreach(var item in lsItems)
        {
            collection.Add(item);
        }

    }

    public void RunConsumers(BlockingQueue<Item> collection) 
    {
        while(true)
        {
            Item item = collection.Take();
            Console.WriteLine("Processed[{0}] : {1}", item.ID, item.Name);
            //Thread.Sleep(100);
        }
    }

    public List<Item> CreateListOfItems()
    {
        List<Item> lsItems = new List<Item>();
        for (int i = 0; i <= 9999; i++)
        {
            lsItems.Add(new Item() { ID = i, Name = "Item[" + i + "]" });
        }
        return lsItems;
    }

}

BlockCollection 实现(由于我们的环境在 .Net 3.5 中,我们不能在更高版本上使用库)。

public class BlockingQueue<T> 
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int MaxSize;
    public bool closing;

    public BlockingQueue(int maxSize) {
        this.MaxSize = maxSize;
    }

    public void Add(T item) 
    {
        lock(queue)
        {
            while(queue.Count >= this.MaxSize)
            {
                Monitor.Wait(queue);
            }

            queue.Enqueue(item);
            if(queue.Count == 1)
            {
                Monitor.PulseAll(queue);
            }

        }
    }

    public T Take() 
    {
        lock(queue)
        {
            while(queue.Count == 0)
            {
                Monitor.Wait(queue);
            }

            T item = queue.Dequeue();
            if(queue.Count == MaxSize - 1)
            {
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }

    public void Close() 
    {
        lock (queue)
        {
            closing = true;
            Monitor.PulseAll(queue);
        }
    }

    public bool TryDequeue(out T value)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                if (closing)
                {
                    value = default(T);
                    return false;
                }
                Monitor.Wait(queue);
            }
            value = queue.Dequeue();
            if (queue.Count == MaxSize - 1)
            {
                Monitor.PulseAll(queue);
            }
            return true;
        }
    }
}

【问题讨论】:

  • 你必须摆脱计时器,这不会很好地结束。照原样,它会不断创建更多永无止境的消费者线程,您必须等待太久才能看到它炸毁您的程序。在 sn-p 中,它只需要作为不稳定的生产者代码的创可贴。明智的做法是,只要在生成项目的任何实际代码中可用时,就将项目推入队列。那就需要用计时器来模拟吧。
  • 我刚刚在本地环境中更新了我的代码,同时进行基准测试,是的,它确实创建了很多消费者线程。现在我尝试只让 Timer Thread 创建生产者线程以在每个滴答声中对项目进行排队

标签: c# multithreading


【解决方案1】:

您可以检查所有工作线程的属性 IsAlive。这似乎不是很清晰的代码,但它确实有效:

public void StartMonitor(object state)
{
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();

        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => RunProducers(this.collection)));
        }

        //TODO: Start all producer threads...

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => RunConsumers(this.collection)));
        }

       //TODO: Let Thread wait until all worker threads are done
        List<Thread> to_check = new List<Thread>(producers);
        to_check.AddRange(consumers);

        while(true)
        {
            Thread.Sleep(50);
            List<Thread> is_alive = new List<Thread>();
            foreach(Thread t in to_check)
                 if(t.IsAlive)
                     is_alive.Add(t);
            if(is_alive.Count == 0)
                break;
            to_check = is_alive;
        }
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

}

或者,也许更好的方法:

    private int[] _counter = new int[1];
    private int Counter
    {
        get 
        {
            lock (_counter) { return _counter[0]; }
        }
        set 
        {
            lock (_counter) { _counter[0] = value; }
        }

    }

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        Counter = 0;
        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => { Counter++; RunProducers(this.collection); Counter--; }));
        }

        //TODO: Start all producer threads...

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => { Counter++; RunConsumers(this.collection); Counter--; }));
        }

        //TODO: Let Thread wait until all worker threads are done
        List<Thread> to_check = new List<Thread>(producers);
        to_check.AddRange(consumers);

        while (Counter > 0)
            Thread.Sleep(50);

        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

    }

为了避免使用 Sleep(),您可以使用 Barrier 类:

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        int producer_cnt = 1,
            consumer_cnt = 2;

        Barrier b = new Barrier(producer_cnt + consumer_cnt + 1);
        try
        {
            for (int i = 0; i < 1; i++)
            {
                producers.Add(new Thread(() => { try { RunProducers(this.collection); } finally { b.SignalAndWait(); } }));
            }

            //TODO: Start all producer threads...

            for (int i = 0; i < 2; i++)
            {
                consumers.Add(new Thread(() => { try { RunConsumers(this.collection); } finally { b.SignalAndWait(); } }));
            }

            //TODO: Let Thread wait until all worker threads are done
            List<Thread> to_check = new List<Thread>(producers);
            to_check.AddRange(consumers);
        }
        finally
        {
            b.SignalAndWait();
        }
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多