【发布时间】:2021-11-20 05:03:03
【问题描述】:
我在 Python3.7 的 Raspberry Pi4 上使用 paho.mqtt.client 将消息发布到本地运行的 ActiveMQ 实例。我想监控我的队列消费者的状态,以便在订阅者离线时发出警报(使用 snap7 向 Siemens PLC)。当我手动使用 QueueExplorer 之类的程序时,我可以查看队列的“咨询”主题中的一条消息,并且从它的自定义属性中,我可以看到“消费者计数”并确定有一个活跃的消费者。
我需要将此信息输入我的应用程序,但不幸的是 MQTT 消息没有 paho.mqtt.client 可以访问的“属性”,并且 Activemq 的“咨询”消息在“正文”中没有任何数据'的消息,所以我必须找到另一种阅读“消费者计数”的方法。我查看了 STOMP,但找不到任何阅读消息“属性”的示例。请问有什么建议吗?
如果我使用 ActiveMQ-CPP 库中的 CMS API,这是我需要的代码,但我相信这是用 C++ 编写的,这不是我的世界! :-)
void AdvisoryProducer::onMessage( const cms::Message* message ) {
if( message->getCMSType() == "Advisory" ) {
std::cout << "Received an Advisory Message!" << std::endl;
if( message->propertyExists( "consumerCount" ) ) {
std::string consumerCount = message->getStringProperty( "consumerCount" );
std::cout << "Number of Consumers = " << consumerCount << std::endl;
// Do Something Meaningful here....
}
} else {
std::cout << "Received a Non-Advisory Message!" << std::endl;
}
}
这是我目前尝试使用的代码,但不是订阅主题,而是创建一个新队列!
import time
import sys
import stomp
class MyListener(stomp.ConnectionListener):
def on_error(self, headers, message):
print('received an error "%s"' % message)
def on_message(self, message):
print('received a message "%s"' % message)
print(message.body)
print(message.cmd)
print(message.headers[''])
TOPIC = 'ActiveMQ.Advisory.Consumer.Topic.xbox_001'
conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'password', wait=True)
conn.subscribe(destination=TOPIC, id='1', ack='auto')
time.sleep(3600)
conn.disconnect()
在我运行代码之前,我可以看到在创建“xbox_001”主题时创建的现有主题,并且在订阅者连接等时在咨询主题中生成消息,因此可以正常工作。
但是当我运行代码时,它会创建一个新队列(和一个咨询主题),而不是订阅现有主题!
【问题讨论】:
-
您在 Python 应用程序中使用什么 STOMP 客户端?
-
我正在使用 stomp.py V7.0.0
-
尝试使用
TOPIC = 'topic/ActiveMQ.Advisory.Consumer.Topic.xbox_001'。我相信 STOMP 客户端需要使用queue/或topic/前缀来告诉代理使用哪种语义,因为 STOMP 协议本身没有这样的区别。