【发布时间】:2018-10-28 04:27:48
【问题描述】:
我正在尝试执行一些来自 RabbitMQ 的命令。它大约 5 条消息/秒。所以msg太多了,我必须发送到一个线程执行,但是我没有那么多线程,所以我限制了10个。
所以我们的想法是消息会到达工作线程,放入队列中,10 个线程中的任何一个都会达到峰值并执行。所有这些都使用信号量。
经过一些实验,我不知道为什么,但是我的线程只执行了 3 或 4 个项目,之后它就停止了,没有错误......
我认为的问题是事件调用方法执行时的逻辑,无法更好地思考......
为什么只处理前 4 个消息??
什么模式或更好的方法来做到这一点?
这是我的代码的一些部分:
const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();
static void Main(string[] args)
{
consumer.Received += (sender, ea) =>
{
var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
Console.WriteLine($"Sub-> {m.Subject}");
queue.Enqueue(ea);
RUN();
};
channel.BasicConsume(queueName, false, consumer);
Console.Read();
}
private static void RUN()
{
while (queue.Count > 0)
{
sem.WaitOne();
var item = queue.Dequeue();
ThreadPool.QueueUserWorkItem(sendmail, item);
}
}
private static void sendmail(Object item)
{
//.....soem processing stuff....
//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);
//release thread
sem.Release();
}
【问题讨论】:
标签: c# multithreading events semaphore