【问题标题】:How do I subscribe to a MSMQ queue but only "peek" the message in .Net?如何订阅 MSMQ 队列但仅在 .Net 中“查看”消息?
【发布时间】:2011-02-15 04:08:01
【问题描述】:

我们有一个接收消息并由应用程序处理的 MSMQ 队列设置。我们想让另一个进程订阅队列,然后读取消息并记录它的内容。

我已经有了这个,问题是它一直在偷看队列。运行时服务器上的 CPU 约为 40%。 mqsvc.exe 以 30% 运行,此应用以 10% 运行。我宁愿有一些东西只是等待消息进来,得到它的通知,然后记录它而不不断地轮询服务器。

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub

【问题讨论】:

    标签: .net vb.net windows-services msmq peek


    【解决方案1】:

    恕我直言,您应该在队列中打开日志。然后,您可以保证保留已提交到队列的所有消息的副本,而您费力地尝试创建自己的机制来记录所有消息时,情况并非如此。

    如果您想要比队列本身更易于阅读的内容(我当然希望如此),那么定期记录和删除日志消息会更容易和更可靠。那么这个过程的运行速度有多快并不重要,您只需要获取一次消息,总体而言,这只是解决问题的更好方法。

    【讨论】:

    • 大声笑,我现在注意到这是 7 年前的事了。抱歉,我意识到我的输入来得太晚了!但是我现在偶然发现了这个线程,因为我正在制作一个工具,它允许实时监控一堆队列,并一次在许多不同的服务器上搜索大型队列集(在我的例子中大约 700 个)。为了提高效率,我希望客户端在每次执行搜索时都在本地缓存很多内容(这样我就不必从所有队列中检索带有正文和扩展名的所有消息)。
    【解决方案2】:

    您是否尝试过使用MSMQEvent.Arrived 来跟踪消息?

    当表示打开队列的 MSMQQueue 对象实例的 MSMQQueue.EnableNotification 方法已被调用并且消息被发现或到达队列中的适用位置时,将触发 MSMQEvent 对象的 Arrived 事件。

    【讨论】:

      【解决方案3】:

      这对我有用。它在等待消息时阻塞线程。每个循环周期都会检查类成员 _bServiceRunning 以查看线程是否应该中止。

          private void ProcessMessageQueue(MessageQueue taskQueue)
          {
              // Set the formatter to indicate body contains a binary message:
              taskQueue.Formatter = new BinaryMessageFormatter();
      
              // Specify to retrieve selected properties.
              MessagePropertyFilter myFilter = new MessagePropertyFilter();
              myFilter.SetAll();
              taskQueue.MessageReadPropertyFilter = myFilter;
      
              TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds
      
              // Monitor the MSMQ until the service is stopped:
              while (_bServiceRunning)
              {
                  rxMessage = null;
      
                  // Listen to the queue for the configured duration:
                  try
                  {
                      // See if a message is available, and if so remove if from the queue if any required
                      // web service is available:
                      taskQueue.Peek(tsQueueReceiveTimeout);
      
                      // If an IOTimeout was not thrown, there is a message in the queue
                      // Get all the messages; this does not remove any messages
                      Message[] arrMessages = taskQueue.GetAllMessages();
      
                      // TODO: process the message objects here;
                      //       they are copies of the messages in the queue
                      //       Note that subsequent calls will return the same messages if they are
                      //       still on the queue, so use some structure defined in an outer block
                      //       to identify messages already processed.
      
                  }
                  catch (MessageQueueException mqe)
                  {
                      if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                      {
                          // The peek message time-out has expired; there are no messages waiting in the queue
                          continue; // at "while (_bServiceRunning)"
                      }
                      else
                      {
                          ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                          break; // from "while (_bServiceRunning)"
                      }
                  }
                  catch (Exception ex)
                  {
                      ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                      break; // from "while (_bServiceRunning)"
                  }
              }
      
          } // ProcessMessageQueue()
      

      【讨论】:

      • 我认为原始发帖人试图避免调用Receive,因为有另一个进程正在这样做。
      • @Jeff:感谢您的指点。我更新了代码示例以使用 MessageQueue.GetAllMessages() 方法,该方法读取(但不提取)队列中的所有消息。请注意,如果队列加载了大量消息,这将无法很好地扩展。他可以根据在之前的 GetAllMessages() 调用中找到的消息数量,在循环中添加一个动态调整的 Sleep() 延迟计时器。
      • 这种轮询方法在本质上与问题中的代码相同。如果有另一个进程从队列中删除消息,则这两种解决方案都不能保证看到所有通过队列的消息。在许多情况下,这种方法会捕获“大多数”消息,但你永远不知道什么时候会溜走。发送或接收消息时记录是捕获所有消息的最佳方式。
      【解决方案4】:

      没有任何 API 可以让您只查看每条消息一次。

      问题是BeginPeek 会在队列中已有消息时立即执行其回调。由于您没有删除消息(毕竟这是 peek,而不是接收!),当您的回调再次开始 peek 时,该过程重新开始,因此 MessageFound 几乎不断运行。

      您最好的选择是将消息记录在写入器或读取器中。 Journaling 将在短期内有效(如果您只关心收到的消息),但不是长期解决方案:

      虽然性能开销 从队列中检索消息 仅针对日记功能进行了配置 比检索多出约 20% 没有日记的消息,真正的 成本是意想不到的问题引起的 当未经检查的 MSMQ 服务运行时 内存不足或机器已用完 磁盘空间

      【讨论】:

        【解决方案5】:

        在 peek 迭代之间的 Thread.Sleep(10) 可以为您节省大量周期。

        我能想到的唯一其他选择是将日志记录构建到队列读取应用程序中。

        【讨论】:

        • 我唯一的问题是有另一个进程同时从队列中读取消息。如果我睡着了,其他东西从队列中删除了消息,我不会错过它吗?
        • @lemkepf,即使没有睡眠,您也不能保证看到发布到队列的每条消息。您的日志记录过程不会一直在执行,即使是,在您仍在记录最后一条消息时,可能会从队列中删除一条消息。
        • 这是我采用的解决方案。它将 CPU 从 30% 降至 4% 左右。如果我在这里和那里错过一些,那没什么大不了的。此外,将消息从队列中拉出的过程大约需要 1 分钟才能完成任务,因此等待 10 毫秒就可以了。谢谢。
        猜你喜欢
        • 2018-02-20
        • 2015-01-24
        • 2022-06-11
        • 2010-12-10
        • 1970-01-01
        • 2021-05-17
        • 2012-04-23
        • 2012-11-29
        • 2011-05-28
        相关资源
        最近更新 更多