【发布时间】:2014-10-31 14:19:08
【问题描述】:
我在 .net 中使用 RabbitMQ,当我在队列中放置 100 条消息时,我看到了一个奇怪的问题。它处理大约 50 条消息,然后 Dequeue() 方法就挂起。如果我重新启动服务,它会处理剩余的项目。
编辑:它正在处理 50% 的队列。当我添加 1000 条消息时,它只处理 500 条。即使是单线程
我在这里错过了什么?
private void InitializeAgent() {
var agentFactory = new ConnectionFactory() { HostName = "localhost" };
agentConnection = agentFactory.CreateConnection();
agentChannel = agentConnection.CreateModel();
var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null);
consumer = new QueueingBasicConsumer(agentChannel);
agentChannel.BasicConsume(GetType().Name, false, consumer);
}
public void DequeueMessages() {
ThreadPool.SetMaxThreads(200, 200);
ThreadPool.SetMinThreads(200, 200);
var ea = consumer.Queue.Dequeue();
ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}
public void AgentTask() {
var instance = factory.GetInstance(threadItem);
while (true)
DequeueMessages();
}
private void ProcessWorkInThread(object state) {
var ea = state as BasicDeliverEventArgs;
var message = Encoding.UTF8.GetString(ea.Body);
var settings = new JsonSerializerSettings();
settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public };
var item = JsonConvert.DeserializeObject<TEntity>(message, settings);
Thread.Sleep(10000) //simulate work
lock (agentChannel)
agentChannel.BasicAck(ea.DeliveryTag, false);
}
【问题讨论】:
-
相当肯定 IModel 不是线程安全的。 .net 用户指南特别指出,IModel 不应在线程之间共享。
-
@user1450877 我可以在 dequeue & ack 周围加一个锁吗?
-
线程必须使用一个 IModel。
-
@ChrisKooken 我什至不确定这是否是您的问题,但这似乎是一个不错的起点。就个人而言,如果我要使用您的消息处理模型,我会将消息出列并在 DequeueMessages() 方法中直接确认(或关闭确认),然后让线程完成所需的工作。如果任何特定的处理出现问题,只需重新排队即可。
-
@user1450877 参见上面的编辑。即使我立即确认,或者一起删除线程,它仍然只处理启动时队列大小的 50%。
标签: c# multithreading rabbitmq