【问题标题】:How do I implement my own advanced Producer/Consumer scenario?如何实现我自己的高级生产者/消费者场景?
【发布时间】:2011-05-07 19:10:11
【问题描述】:

注意:
我对我的问题进行了彻底的修改。您可以通过更改历史记录查看原始问题。


我需要一个“强大”的队列,它提供以下功能:

  • 我对一组对象有一定的范围。这意味着 A 组B 组、...将有自己的队列
  • 我正在组范围线程中填充队列线程 A(生产者)
  • 我正在读取组范围线程中的队列线程 B(消费者)

所以我会有以下场景:

  1. 队列中没有任何项目(因为作业是使用空的“目标组”调用的):线程 B 应该退出循环
  2. 队列中目前没有项目,因为线程 A 正在处理要排队的项目:线程 B 应该等待
  3. 队列中有项目:线程 B 应该能够出列并处理项目
  4. 队列中没有项目,因为 线程 A 没有更多要排队的项目:线程 B 应该退出循环

现在我想出了以下实现:

public class MightyQueue<T>
  where T : class
{
    private readonly Queue<T> _queue = new Queue<T>();

    private bool? _runable;
    private volatile bool _completed;

    public bool Runable
    {
        get
        {
            while (!this._runable.HasValue)
            {
                Thread.Sleep(100);
            }
            return this._runable ?? false;
        }
        set
        {
            this._runable = value;
        }
    }

    public void Enqueue(T item)
    {
        if (item == null)
        {
            throw new ArgumentNullException("item");
        }

        this._queue.Enqueue(item);
    }

    public void CompleteAdding()
    {
        this._completed = true;
    }

    public bool TryDequeue(out T item)
    {
        if (!this.Runable)
        {
            item = null;
            return false;
        }
        while (this._queue.Count == 0)
        {
            if (this._completed)
            {
                item = null;
                return false;
            }
            Thread.Sleep(100);
        }
        item = this._queue.Dequeue();
        return true;
    }
}

然后会被使用

制片人

if (anythingToWorkOn)
{
    myFooMightyQueueInstance.Runable = false;
}
else
{
    myFooMightyQueueInstance.Runable = true;
    while (condition)
    {
        myFooMightyQueueInstance.Enqueue(item);
    }
    myFooMightyQueueInstance.CompleteAdding();
}

消费者

if (!myFooMightyQueueInstance.Runable)
{
    return;
}

T item;
while (myFooMightyQueueInstance.TryDequeue(out item))
{
    //work with item
}

但我相信,这种方法是错误的,因为我在那里使用了一些 Thread.Sleep()-stuff(不能有一些 waitHandle 或其他东西吗?)...我不是关于算法本身任何一个 ... 谁能帮帮我?

【问题讨论】:

  • 那么,您的问题是什么,到目前为止您尝试过什么?听起来像是一个简单的 Two_Thread 结构,中间有一个队列,需要从线程 B 查询。
  • @marcelo: just wondering how to fully solve this problem 应该清楚
  • 绝对没有理由在这种情况下使用线程,因为线程 B 只会等待线程 A,就像 90% 的时间一样 + 考虑到上下文切换开销和线程同步开销,结果可能很容易多线程版本实际上会比单线程版本慢
  • @psicho:我的问题不是性能,而是时间问题。如果您需要在例如开始邮寄。 08h00,您需要未知的时间来准备尸体(因为您不确切知道一个尸体的数量或所需时间),您始终可以选择开始,例如。提前5分钟准备。所以这就是多线程的原因!
  • @Andreas:可以将 SMTP 服务器配置为接受队列消息,允许您将线程卸载到邮件服务器。你在这里实现的东西已经解决了。

标签: c# multithreading .net-3.5 queue


【解决方案1】:

如果您有 .Net 4.0,请使用 BlockingCollection。它通过CompleteAdding 方法为您处理所有混乱,包括最后一点。

如果您有较早的 .Net,则升级(即,我懒得解释如何实现已经为您完成的事情。)

编辑:我认为您的问题根本不需要线程。只需提前做好所有的电子邮件,然后睡到指定的时间。

【讨论】:

  • .net 4.0 没有选项,这就是我添加标签.net 3.5 的原因...抱歉:(
  • 此外,此解决方案也缺少issue 1。如何处理将获得任何项目的队列...
  • 谁……坏主意!如果我这样做,发送时个性化可能不再有效!我将不得不缩短从个性化到发送的时间跨度!
  • 更清楚地说明问题。您的问题忽略了您似乎在少量计量的重要细节。
  • 我应该这样做,jep,因为每个人都试图为我创造一个新环境 :) 哈哈 ...稍后会这样做!
【解决方案2】:

您应该从一个通用的生产者-消费者队列开始并使用它。在队列中实现这一点并不是一个好主意,因为这会阻止您使用信号量来向线程发出信号(或者,您可以在队列中使用公共信号量,但这是一个真的坏主意)。

一旦线程 A 将单个工作项排入队列,它必须发出信号量来通知线程 B。当线程 B 完成处理所有项目时,它应该发出信号量来通知其他人它已经完成。您的主线程应该等待第二个信号量知道一切都已完成。

[编辑]

首先,你有一个生产者和一个消费者:

public interface IProducer<T> : IStoppable
{
    /// <summary>
    /// Notifies clients when a new item is produced.
    /// </summary>
    event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}

public interface IConsumer<T> : IStoppable
{
    /// <summary>
    /// Performs processing of the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    void ConsumeItem(T item);
}

public interface IStoppable
{
    void Stop();
}

因此,在您的情况下,创建邮件的类需要触发 ItemProduced 事件,发送它的类需要实现 ConsumeItem

然后你将这两个实例传递给Worker的一个实例:

public class Worker<T>
{
    private readonly Object _lock = new Object();
    private readonly Queue<T> _queuedItems = new Queue<T>();
    private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
    private readonly IProducer<T> _producer;
    private readonly IConsumer<T> _consumer;
    private volatile bool _ending = false;
    private Thread _workerThread;

    public Worker(IProducer<T> producer, IConsumer<T> consumer)
    {
        _producer = producer;
        _consumer = consumer;
    }

    public void Start(ThreadPriority priority)
    {
        _producer.ItemProduced += Producer_ItemProduced;
        _ending = false;

        // start a new thread
        _workerThread = new Thread(new ThreadStart(WorkerLoop));
        _workerThread.IsBackground = true;
        _workerThread.Priority = priority;
        _workerThread.Start();
    } 

    public void Stop()
    {
        _producer.ItemProduced -= Producer_ItemProduced;
        _ending = true;

        // signal the consumer, in case it is idle
        _itemReadyEvt.Set();
        _workerThread.Join();
    }

    private void Producer_ItemProduced
         (object sender, ProducedItemEventArgs<T> e)
    {
        lock (_lock) { _queuedItems.Enqueue(e.Item); }

        // notify consumer thread
        _itemReadyEvt.Set();
    }

    private void WorkerLoop()
    {
        while (!_ending)
        {
            _itemReadyEvt.WaitOne(-1, false);

            T singleItem = default(T);
            lock (_lock)
            {
               if (_queuedItems.Count > 0)
               {
                   singleItem = _queuedItems.Dequeue();
               }
            }


            while (singleItem != null)
            {
                try
                {
                    _consumer.ConsumeItem(singleItem);
                }
                catch (Exception ex)
                {
                    // handle exception, fire an event
                    // or something. Otherwise this
                    // worker thread will die and you
                    // will have no idea what happened
                }

                lock (_lock)
                {
                    if (_queuedItems.Count > 0)
                    {
                        singleItem = _queuedItems.Dequeue();
                    }
                }
            }

         }

    } // WorkerLoop

} // Worker

这是大体思路,可能还需要一些额外的调整。

要使用它,你需要让你的类实现这两个接口:

IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();

Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();

// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();

// following line will block this (calling) thread
// until all items are consumed
worker.Stop();

这样做的好处在于:

  • 您可以拥有任意数量的工人
  • 多个工人可以接受来自同一生产者的物品
  • 多个工作人员可以将项目分派给同一个消费者(尽管这意味着您需要考虑消费者是以线程安全的方式实现的)

【讨论】:

  • 如何处理issue 1:不会有任何项目被添加到队列中?
  • @Andread Niedermair:在这种情况下你期望发生什么并不完全清楚。使用简单的生产者/消费者,线程 B 永远不会被线程 A 通知。我稍后会更新答案。
  • 好吧...对于每个大目标组,存在一个 Thread B 和一个 Thread A ...这不是全球邮寄/个性化 -线程,因为这些线程/作业需要更多地了解目标组(这个范围场景的真正原因)。查看我的更新:)
  • @Andreas:谢谢,如果你能使用它,我很高兴。只需检查 Worker.Stop() 方法中的第一行,分离事件处理程序时有一个错字(在 Stop 的开头必须有一个-= 而不是+=),我刚刚注意到它并修复它。
【解决方案3】:

我写了一个简单的例子,对我来说很好,应该适合你的场景。消费者是否正在运行取决于运行变量的设置方式,但您可以轻松地将其修改为更复杂的条件,例如“如果不存在邮件但有人说我应该等待更多”。

public class MailSystem
{
    private readonly Queue<Mail> mailQueue = new Queue<Mail>();
    private bool running;
    private Thread consumerThread;

    public static void Main(string[] args)
    {
        MailSystem mailSystem = new MailSystem();
        mailSystem.StartSystem();
    }

    public void StartSystem()
    {
        // init consumer
        running = true;
        consumerThread = new Thread(ProcessMails);
        consumerThread.Start();
        // add some mails
        mailQueue.Enqueue(new Mail("Mail 1"));
        mailQueue.Enqueue(new Mail("Mail 2"));
        mailQueue.Enqueue(new Mail("Mail 3"));
        mailQueue.Enqueue(new Mail("Mail 4"));
        Console.WriteLine("producer finished, hit enter to stop consumer");
        // wait for user interaction
        Console.ReadLine();
        // exit the consumer
        running = false;
        Console.WriteLine("exited");
    }

    private void ProcessMails()
    {
        while (running)
        {
            if (mailQueue.Count > 0)
            {
                Mail mail = mailQueue.Dequeue();
                Console.WriteLine(mail.Text);
                Thread.Sleep(2000);
            }
        }
    }
}

internal class Mail
{
    public string Text { get; set; }

    public Mail(string text)
    {
        Text = text;
    }
}

【讨论】:

  • 这个解决方案缺少running 的同步,它需要一个Thread.Sleep(),这不是很好......但是经过一些修改它就可以适用了!
  • Tread.Sleep() 仅用于演示目的,您可以安全地删除它;)否则您不会看到,邮件会定期检查,您可以通过按键中断它。而且我没有使用锁或其他同步机制,因为我认为你的目的不需要。
【解决方案4】:

你想要的可以用条件变量来完成。我会编写一个伪代码示例,应该不会太难实现。

一个线程具有以下内容:

while(run)
  compose message
  conditionvariable.lock()
  add message to queue
  conditionvariable.notifyOne()
  conditionvariable.release()

虽然其他线程有一些类似的东西

while(threadsafe_do_run())
  while threadsafe_queue_empty()
       conditionvariable.wait()
  msg = queue.pop()
  if msg == "die"
      set_run(false)
  conditionvariable.release()
  send msg

因此,如果您没有收到任何消息,请推送死亡消息。处理完所有消息后也是如此。

do_run() 和 queue_empty() 应该线程安全地检查它们的东西,使用适当的锁。

wait() 在调用 notifyOne() 时返回,然后队列有消息要发送。在大多数框架中,条件变量已经拥有锁,您可能需要自己在 .NET 中添加锁语句。

【讨论】:

  • 您在调用queue_empty之前忘记锁定。
  • 如何处理issue 1:永远不会将任何项目添加到队列中?
  • @Andreas:您的第一点仅说明了一种情况,但未能解释发生这种情况时您想要做什么。如果您只想让线程退出,通常的方法是毒化队列(即添加“死亡”消息)。在这方面,没有消息的特殊情况并没有什么特别之处。
  • 更新了我的问题 .. 感谢您的提示 - 我认为这很明显 :) 并没有那么错,因为您几乎已经猜到了 :)
  • Marcelo 不,我没有,这就是为什么我添加了关于 qeueue_empty 是 thread_safe 的评论,但是是的,我可以写得更清楚。我将添加模具条件,我确实忘记了。
【解决方案5】:

我会使用一个线程来完成整个过程。那就是生成邮件正文并发送。只是因为生成邮件正文不需要时间,但发送电子邮件会。

此外,如果您使用 Windows 附带的 SMTP 服务器,那么您只需将电子邮件放入队列文件夹中,然后 smtp 服务器将负责发送电子邮件。

因此,您可以启动多个线程(保持数量上限),每个线程都在执行它的工作。如果您正在处理一组作业(数据),那么您可以进行数据并行化(即将数据拆分为与系统上的核心数量相匹配的块,例如并拍摄作业(线程)。

无论您采用哪种方式,使用任务都会使这一切变得相当简单,即 2 个线程发送一封电子邮件或一个线程完成整个工作,但使用多个线程并行执行多个工作。

【讨论】:

  • 阅读我的 cmets(我有充分的理由使用多线程)! ...这个问题不是关于 2 个线程的设计,而是关于队列的设计! ...
  • 我看到您已经编辑了您的问题 - 但是,我仍然没有在其中看到问题。考虑到其他各种响应,您确定您对多线程的阅读是一本好书吗?在我看来,您希望所有电子邮件都能在准确的时间发送出去。这将需要服务器端的大量带宽。我在大约 5 分钟内生成并发送了 100,000 封电子邮件,但这些电子邮件在 smtp 服务器中停留了一个多小时,因为实际发送电子邮件需要很长时间,并且您需要大约 20GB/秒的带宽引导负载。跨度>
  • 深入挖掘:必须将个性化和邮寄之间的时间跨度缩短到最低限度。否则个性化不会尽可能准确......我不关心带宽,这不会是一个问题,因为这是一个更普遍的问题(我相信)。
  • 与实际发送电子邮件相比,个性化和生成电子邮件不会花费任何时间。你认为什么是“邮寄”。如果通过邮寄您的意思是电子邮件确实已发送,那么您会大吃一惊,因为这需要很多时间,并且取决于要发送的电子邮件数量……无论如何,我仍然没有很明白你的问题,所以我就退后了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-08-13
  • 1970-01-01
  • 2018-08-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多