【问题标题】:Multi Thread & Semaphore & Events多线程 & 信号量 & 事件
【发布时间】:2018-10-28 04:27:48
【问题描述】:

我正在尝试执行一些来自 RabbitMQ 的命令。它大约 5 条消息/秒。所以msg太多了,我必须发送到一个线程执行,但是我没有那么多线程,所以我限制了10个。

所以我们的想法是消息会到达工作线程,放入队列中,10 个线程中的任何一个都会达到峰值并执行。所有这些都使用信号量。

经过一些实验,我不知道为什么,但是我的线程只执行了 3 或 4 个项目,之后它就停止了,没有错误......

我认为的问题是事件调用方法执行时的逻辑,无法更好地思考......

为什么只处理前 4 个消息??

什么模式或更好的方法来做到这一点?

这是我的代码的一些部分:

const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();

static void Main(string[] args)
{
consumer.Received += (sender, ea) =>
               {
                var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
                Console.WriteLine($"Sub-> {m.Subject}");
                queue.Enqueue(ea);
                RUN();
              };

            channel.BasicConsume(queueName, false, consumer);

            Console.Read();
}

private static void RUN()
{
            while (queue.Count > 0)
            {
                sem.WaitOne();
                var item = queue.Dequeue();
                ThreadPool.QueueUserWorkItem(sendmail, item);
            }
}

private static void sendmail(Object item)
{

//.....soem processing stuff....

//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);

//release thread
sem.Release();

}

【问题讨论】:

    标签: c# multithreading events semaphore


    【解决方案1】:

    我认为您可以在这里使用阻塞集合。它将简化代码。 所以你的电子邮件发件人看起来像这样:

    public class ParallelEmailSender : IDisposable
    {
        private readonly BlockingCollection<string> blockingCollection;
    
        public ParallelEmailSender(int threadsCount)
        {
            blockingCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
            for (int i = 0; i < threadsCount; i++)
            {
                Task.Factory.StartNew(SendInternal);
            }
        }
    
        public void Send(string message)
        {
            blockingCollection.Add(message);
        }
    
        private void SendInternal()
        {
            foreach (string message in blockingCollection.GetConsumingEnumerable())
            {
                // send method
            }
        }
    
        public void Dispose()
        {
            blockingCollection.CompleteAdding();
        }
    }
    

    当然,您需要添加错误捕获逻辑,您还可以通过使用取消令牌来改进应用关闭过程。

    我强烈建议阅读约瑟夫·阿尔巴哈里 (Joseph Albahari) 撰写的精彩 e-book about multithreading programming

    【讨论】:

    • 谢谢!但是一个愚蠢的问题...... SendInternal 中的消息是线程安全的吗?
    • 您可能会使用一些代表消息的对象,但只有一个线程会收到该消息。发送代码当然应该是线程安全的,因为它将被多个线程调用。
    • 另一点...我应该在 RabbitMQ 发送的事件中实例化类 ParallelEmailSender 吗?如果是这样就没有意义,因为当事件被触发时它只发送一条消息,所以每条消息都会使类内部有 XX 个线程,并且想法是将每条消息放在不同的线程/并行集合中...... 。 我对么?看看我做了什么gist.github.com/PtkFerraro/14385ec8426d5b0bfd9cdee84dd30989
    • 你应该有一个ParallelEmailSender 的实例,当你收到来自RabbitMQ 的消息时,你只需调用Send 方法。当然,您需要稍微修改ParallelEmailSender,以便它知道如何实际发送电子邮件。
    猜你喜欢
    • 2010-10-15
    • 1970-01-01
    • 1970-01-01
    • 2020-02-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-14
    相关资源
    最近更新 更多