【问题标题】:How does MessageQueue.BeginReceive work and how to use it correctly?MessageQueue.BeginReceive 是如何工作的以及如何正确使用它?
【发布时间】:2011-11-17 17:57:30
【问题描述】:

我目前有一个后台线程。在这个线程中是一个无限循环。

此循环偶尔会更新数据库中的一些值,然后在 MessageQueue 上侦听 1 秒(使用 queue.Receive(TimeSpan.FromSeconds(1)) )。

只要没有消息进来,这个调用就会在内部抛出一个 MessageQueueException (Timeout) 被捕获,然后循环继续。如果有消息调用正常返回并处理消息,之后循环继续。

这会导致 很多 的第一次机会异常(每秒,除非有要处理的消息),这会在调试输出中产生垃圾邮件,并且当我忘记排除 MessageQueueExceptions 时也会中断调试器。

那么,MessageQueue 的异步处理是如何正确完成的,同时仍然确保,只要我的应用程序运行,队列就会受到监控,并且数据库也会不时更新。当然这里的线程不应该用完100%的CPU。

我只需要大图或一些正确完成异步处理的提示。

【问题讨论】:

  • 听上去,没有消息进来的场景并不是异常情况,在这种情况下抛出异常似乎是一种设计味道。
  • 是的,你不能在无事可做的情况下只返回null或default吗,为了避免从方法返回而抛出异常有点废话。
  • MessageQueue 类本身正在抛出异常,当超时发生时,不是我。这是我对这个解决方案的问题,是什么让我感到不安。
  • 超时异常是 MSMQ 设计的工作方式。你必须每秒轮询一次队列吗?

标签: c# asynchronous msmq message-queue


【解决方案1】:

我建议不要在线程中循环,而是为 MessageQueue 的 ReceiveCompleted 事件注册一个委托,如下所述:

使用系统; 使用 System.Messaging;

命名空间 MyProject { /// /// 为示例提供一个容器类。 /// 公共类 MyNewQueue {

    //**************************************************
    // Provides an entry point into the application.
    //       
    // This example performs asynchronous receive operation
    // processing.
    //**************************************************

    public static void Main()
    {
        // Create an instance of MessageQueue. Set its formatter.
        MessageQueue myQueue = new MessageQueue(".\\myQueue");
        myQueue.Formatter = new XmlMessageFormatter(new Type[]
            {typeof(String)});

        // Add an event handler for the ReceiveCompleted event.
        myQueue.ReceiveCompleted += new 
            ReceiveCompletedEventHandler(MyReceiveCompleted);

        // Begin the asynchronous receive operation.
        myQueue.BeginReceive();

        // Do other work on the current thread.

        return;
    }


    //**************************************************
    // Provides an event handler for the ReceiveCompleted
    // event.
    //**************************************************

    private static void MyReceiveCompleted(Object source, 
        ReceiveCompletedEventArgs asyncResult)
    {
        // Connect to the queue.
        MessageQueue mq = (MessageQueue)source;

        // End the asynchronous Receive operation.
        Message m = mq.EndReceive(asyncResult.AsyncResult);

        // Display message information on the screen.
        Console.WriteLine("Message: " + (string)m.Body);

        // Restart the asynchronous Receive operation.
        mq.BeginReceive();

        return; 
    }
}

}

来源:https://docs.microsoft.com/en-us/dotnet/api/system.messaging.messagequeue.receivecompleted?view=netframework-4.7.2

【讨论】:

  • 很好的例子,感谢@DaveRead
  • 链接已失效.. 是的.. 人们仍在使用该死的 MSMQ :( 而且,如果您考虑处理/限制消息数量,这将变得非常难以处理
  • 用代码提取和源链接更新了答案
【解决方案2】:

您是否考虑过从 MessageQueue.GetMessageEnumerator2 返回的 MessageEnumerator

  • 您将获得队列的动态内容,以在迭代期间检查和删除队列中的消息。
  • 如果没有消息,则 MoveNext() 将返回 false,您无需捕获第一次机会异常
  • 如果在您开始迭代后有新消息,那么它们将被迭代(如果它们放在光标之后)。
  • 如果光标前有新消息,那么您可以重置迭代器或继续(如果您目前不需要优先级较低的消息)。

【讨论】:

  • 好的,谢谢。这很棒。所以我可以等待1秒。如果没有消息,它只会返回 false,我可以将其余的逻辑保持原样。没有更多的例外:)
【解决方案3】:

与 Jamie Dixon 的评论相反,这种情况非常特殊。注意方法及其参数的命名:BeginReceive(TimeSpan timeout)

如果该方法被命名为 BeginTryReceive,那么如果没有收到任何消息就完全正常了。将其命名为 BeginReceive(或 Receive,对于同步版本)意味着消息应该进入队列。 TimeSpan 参数被命名为 timeout 也很重要,因为超时是异常的。超时意味着预期响应,但没有给出响应,调用者选择停止等待并假设发生了错误。当您使用 1 秒超时调用 BeginReceive/Receive 时,您是在说如果到那时还没有消息进入队列,那么一定是出了问题,我们需要处理它。

如果我理解您想要正确执行的操作,我将执行此操作的方式是:

  • 如果我没有将空队列视为错误,则调用 BeginReceive 时超时非常大,或者甚至没有超时。
  • 将事件处理程序附加到 ReceiveCompleted 事件,该事件 1) 处理消息,以及 2) 再次调用 BeginReceive。
  • 我不会使用无限循环。在使用 BeginReceive 等异步方法时,这既是不好的做法,也是完全多余的。
  • 编辑:要放弃没有被任何客户端读取的队列,让队列编写者查看队列以确定它是否“死亡”。

编辑:我有另一个建议。由于我不知道您的应用程序的详细信息,我不知道它是否可行或合适。在我看来,您基本上是在客户端和服务器之间建立连接,消息队列作为通信通道。为什么这是一个“连接”?因为如果没有人在听,队列将不会被写入。我认为,这几乎就是一种联系。使用套接字或命名管道来传输消息不是更合适吗?这样,客户端在完成读取后只需关闭 Stream 对象,并立即通知服务器。正如我所说,我不知道它是否适用于你正在做的事情,但感觉它是一个更合适的沟通渠道。

【讨论】:

  • 正如我之前的问题中所述,不时更新数据库中的表以告诉其他应用程序侦听器仍然存在,这一点很重要。因此,当无限期地等待消息时,表不会被更新,客户端被认为是死的,不会再成为消息的目标。但重要的是客户端获取发送的消息。因此,在一个逻辑关注点中同时监听消息表的不断更新是很重要的。
  • 正如我所说:表中都是活动的目标队列。当队列在一定时间内没有更新时,它被认为是死的,将从表中删除。发送消息时,它会发送到所有活动队列。
  • 好的。我找不到你在哪里写的。无论如何,另一种方法(如果你问我更好)是让队列编写者通过查看队列来确定队列何时死亡,以查看消息是否被客户端从队列中删除。
  • 不,它不是“连接”。这只是一个通知。发送者(一个客户端,可能是在另一台机器上运行的相同代码,但也可能是在其他地方运行的服务)需要通知所有客户端(包括他自己)某些事件。在这种情况下,它会查找数据库中的所有活动客户端并在其队列中发送消息。然后客户端可以(但不需要)处理消息。套接字将需要所有正在运行的实例之间的大量连接来通知所有其他客户端并消耗更多资源。
猜你喜欢
  • 2014-06-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-04-04
  • 1970-01-01
  • 2023-03-08
  • 2016-07-24
相关资源
最近更新 更多