【问题标题】:Multithreaded MSMQ Listening多线程 MSMQ 监听
【发布时间】:2013-04-30 22:21:35
【问题描述】:

我目前正在从我的 MSMQ 中读取数据(为简洁起见):

public void Start()
{
    this.queue.ReceiveCompleted += this.ReceiveCompleted;
    this.queue.BeginReceive();
}

void ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    this.queue.EndReceive(e.AsyncResult);

    try
    {
        var m = e.Message;
        m.Formatter = this.formatter;

        this.Handle(m.Body);
    }
    finally
    {
        this.queue.BeginReceive();
    }
}

但是,这只允许我串行处理消息。如何修改此代码以允许并行消息处理?

我知道我可以将this.queue.BeginReceive(); 移出finally 并移到ReceiveCompleted 的顶部,但是如何阻止产生与我有消息一样多的线程?如何明智地控制并行度,以免线程池泛滥?是否有一些内置机制,或者我必须编写自己的经理?

编辑:我的目标是更快地处理消息。消息的处理涉及对第 3 方的异步调用,因此目前我的实现正在浪费大量时间来通过队列。

谢谢

【问题讨论】:

    标签: c# msmq


    【解决方案1】:

    我认为只托管更多队列阅读器实例会更简单。然后,您可以根据需要通过部署/取消部署更多实例来快速扩展和缩减。

    它还变成了一种管理问题,而不是发展问题,而扩展应该是这样的。

    【讨论】:

    • 我希望是这样,这能保证一条消息只被处理一次吗?还是有可能两个听众可以收到相同的消息?
    • 我应该补充的是,您只能通过使用事务队列来保证这一点。我在文档中找不到它,但是我之前已经设置了这种缩放机制,而且 WCF 与 msmq 的集成提供了一个属性,称为恰好一次,这意味着这是 msmq 的一个特性。
    • 我的队列是事务性的,有什么特别需要我做的吗,还是应该可以正常工作(tm)?
    • @Hugh,你只能通过使用事务队列来保证什么?一条消息只被阅读一次?
    • @John 是的 - 这是我的理解。
    【解决方案2】:

    您可以使用“生产者消费者模式”...

    .NET 4 及更高版本具有 Concurrent 集合,这些集合是线程安全的,并且实现了“大部分无锁”(因此在多线程中表现良好)...

    您可以将BlockingCollection 与 TPL 结合使用来实现您想要的,而不必担心线程池饥饿或类似情况......您只需将行 this.Handle(m.Body); 更改为 MyBlockingCollection.Add(m.Body); 并启动“消费者线程”在MyBlockingCollection 上工作并进行实际工作(即,在MyBlockingCollection 的下一个项目上调用this.Handle,例如,他们通过调用TryTake 获得)...有关基本示例,请参见上面的链接...

    【讨论】:

    • 这听起来像是我只是在我的队列末尾创建了一个非持久内存队列:/
    • 如果我有多个上述代码实例监听同一个队列,MSMQ 是否保证原子弹出,所以一条消息只由一个监听器处理?
    • @AndrewBullock 基本上是一个非持久线程安全的内存队列,可以被多个线程并行使用......
    • 在回答您的问题时,是的 msmq 保证“原子弹出”。
    • @AndrewBullock MSMQ 支持多个阅读器,使用更多线程或更多进程
    猜你喜欢
    • 1970-01-01
    • 2013-07-26
    • 2013-04-24
    • 2013-11-05
    • 1970-01-01
    • 2012-09-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多