【问题标题】:How can I read the message properties of an Activemq.Advisory message from a Python3.7 application如何从 Python3.7 应用程序读取 Activemq.Advisory 消息的消息属性
【发布时间】:2021-11-20 05:03:03
【问题描述】:

我在 Python3.7 的 Raspberry Pi4 上使用 paho.mqtt.client 将消息发布到本地运行的 ActiveMQ 实例。我想监控我的队列消费者的状态,以便在订阅者离线时发出警报(使用 snap7 向 Siemens PLC)。当我手动使用 QueueExplorer 之类的程序时,我可以查看队列的“咨询”主题中的一条消息,并且从它的自定义属性中,我可以看到“消费者计数”并确定有一个活跃的消费者。

我需要将此信息输入我的应用程序,但不幸的是 MQTT 消息没有 paho.mqtt.client 可以访问的“属性”,并且 Activemq 的“咨询”消息在“正文”中没有任何数据'的消息,所以我必须找到另一种阅读“消费者计数”的方法。我查看了 STOMP,但找不到任何阅读消息“属性”的示例。请问有什么建议吗?

如果我使用 ActiveMQ-CPP 库中的 CMS API,这是我需要的代码,但我相信这是用 C++ 编写的,这不是我的世界! :-)

void AdvisoryProducer::onMessage( const cms::Message* message ) {

   if( message->getCMSType() == "Advisory" ) {

       std::cout << "Received an Advisory Message!" << std::endl;

       if( message->propertyExists( "consumerCount" ) ) {

           std::string consumerCount = message->getStringProperty( "consumerCount" );
           std::cout << "Number of Consumers = " << consumerCount << std::endl;

           // Do Something Meaningful here....
       }

   } else {
       std::cout << "Received a Non-Advisory Message!" << std::endl;
   }
}

这是我目前尝试使用的代码,但不是订阅主题,而是创建一个新队列!

import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, headers, message):
        print('received an error "%s"' % message)
    def on_message(self, message):
        print('received a message "%s"' % message)
        print(message.body)
        print(message.cmd)
        print(message.headers[''])

TOPIC = 'ActiveMQ.Advisory.Consumer.Topic.xbox_001'

conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'password', wait=True)
conn.subscribe(destination=TOPIC, id='1', ack='auto')

time.sleep(3600)
conn.disconnect()

在我运行代码之前,我可以看到在创建“xbox_001”主题时创建的现有主题,并且在订阅者连接等时在咨询主题中生成消息,因此可以正常工作。

但是当我运行代码时,它会创建一个新队列(和一个咨询主题),而不是订阅现有主题!

【问题讨论】:

  • 您在 Python 应用程序中使用什么 STOMP 客户端?
  • 我正在使用 stomp.py V7.0.0
  • 尝试使用TOPIC = 'topic/ActiveMQ.Advisory.Consumer.Topic.xbox_001'。我相信 STOMP 客户端需要使用 queue/topic/ 前缀来告诉代理使用哪种语义,因为 STOMP 协议本身没有这样的区别。

标签: python activemq


【解决方案1】:

我终于设法解决了这个问题,这是一个简单(但很烦人)的字符串/路径格式问题。与编程 EVER! 历史上的任何其他代码不同,路径是“/”和“.”的组合。分隔符,尽管文件结构显然有“主题”而不是“主题”作为根,但路径需要......等待它...... :-)


TOPIC = '/topic/ActiveMQ.Advisory.Consumer.Topic.xbox_001'

脑子一团糟……

为了完整起见,这里是最终代码。


import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, headers, message):
        print('received an error "%s"' % message)
    def on_message(self, message):
        print('received a message "%s"' % message)
        print(message.body)
        print(message.cmd)
        print(message.headers['consumerCount'])

TOPIC = '/topic/ActiveMQ.Advisory.Consumer.Topic.xbox_001'
conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'password', wait=True)
conn.subscribe(destination=TOPIC, id='1', ack='auto')

time.sleep(3600)
conn.disconnect()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-10-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多