【问题标题】:What is the best way to subscribe and publish to multiple topics in paho-mqtt?在 paho-mqtt 中订阅和发布多个主题的最佳方式是什么?
【发布时间】:2020-04-17 08:09:24
【问题描述】:


我正在使用 MQTT 来干扰我的网络中的消息,并且对向代理发布和订阅多条消息的最简洁方式有疑问。
首先,我有两个列表:

request_list = [('sensors/system/temperature', 0), 
                ('sensors/system/gyroscope', 1), 
                ('sensors/system/acceleration', 2)]

其中包含我必须将消息发布到的主题。
我的第二个列表定义了我想要发布的消息以及我想要获得回复的主题(== 我必须订阅才能获得答案的主题)。

request_value = ['{"response":"similarity/sensors/system/temperature","duration":"60s"}',
                  {"response":"similarity/sensors/system/gyroscope","duration":"60s"}', 
                 '{"response":"similarity/sensors/system/acceleration","duration":"60s"}'] 


我的代理对于每个主题都是相同的,并在 PORT =“8083”上用 HOST =“192.168.137.1”定义。
现在我正在使用 for 循环,订阅一个主题,发布我的消息并等待消息进来。因为我必须等待每个订阅和发布才能成功,这非常耗时。我当前代码的伪代码如下所示:

list_measurments = []
for topic in request_list:
    client.connect("Broker","Host")
    client.loop_start() 
    client.subscribe("sub_topic")
    client.pub("pub_topic","pub_message")
    client.callback("append list_measurements")
    client.loop_stop() #stop the loop
    client.disconnect

我尝试在我的问题here 中使用线程,但事实证明,线程的正常使用是将相同的消息发布给许多不同的代理。我也想过multiple subscribtions
如果有人能给我一个提示,最干净和最快的方法是什么,我将非常感激。

【问题讨论】:

    标签: python performance mqtt publish-subscribe paho


    【解决方案1】:

    您应该只连接到代理并在 for 循环之外启动客户端循环。

    每次都设置和断开与代理的连接会增加大量开销并留下大量空间来丢失消息。

    您还应该在启动时订阅您想要的所有主题。好的,如果需要,您可以添加更多或取消订阅,但如果它们始终相同,请在连接时订阅。

    基本的通用方法应该如下所示。

    def on_connect(client, userdata, flags, rc):
      for i in request_value:
        client.subscribe(i.response)
    
      for i in request_list:
        //this loop needs to be a bit more complicated
        //as you need to pull the topic from the request_list
        //and the payload from request_value
        client.publish(i)
    
    def on_message(client, userdata, message):
      if message.topic == "similarity/sensors/system/temperature":
        //update temperature
      elif message.topic == "similarity/sensors/system/gyroscope":
        //update gyro
      elif message.topic == "similarity/sensors/system/acceleration":
        //update accel
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("BrokerIP")
    client.loop_start()
    

    如果需要,您还可以再次运行发布循环(因为看起来您一次只请求 60 多个数据)。将request_listrequest_value 数据结构合并到一个列表中可能会更好。

    【讨论】:

    • 感谢您的回答!好吧,我会试试的。我的主题一直在变化,所以我想我必须在每次迭代时订阅,但我会牢记您的建议,并尝试找到一种方法来最小化订阅数量。关于你的回答,我还有一个问题。您没有使用循环停止 - 那是因为您的脚本一直在运行,还是我错了?还有关于 on_message()- 目前我只使用 on_message () -> messages_dic.update({topic: msgr}),所以我基本上只为每条传入消息更新字典。这样做有什么不对吗……
    • 是的,客户端预计会在应用程序的整个生命周期内运行,因此无需在此处停止或断开连接。如果主题名称定期更改,您可以使on_message() 函数中的逻辑更加动态,您只需要跟踪需要匹配的主题以及如何处理该消息。
    • 我确实按照您的建议进行,并且应用程序运行得更快。再次感谢!在创建客户的过程中,我想到了另一个问题。目前我为不同的任务使用不同的客户端——一个客户端订阅所有可用的传感器并返回该信息。接下来是测量队列,第三个客户端发布数据的格式化 JSON。我像“正常”的模块化方式那样做,但想知道如果我的代理和主机没有改变,是否有任何理由不只使用一个客户端?
    • 一个共享客户端很好,在一个进程中拥有多个客户端的唯一原因是它们代表不同的用户,他们对可以发布/订阅哪些主题具有不同的权限。
    猜你喜欢
    • 2021-08-24
    • 1970-01-01
    • 2020-11-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多