【问题标题】:mqtt client does not receive message in case of thread and rest-apimqtt客户端在线程和rest-api的情况下没有收到消息
【发布时间】:2020-02-15 13:13:34
【问题描述】:

我有一个基于flask 和mqtt 的python 脚本。用例是通过 rest-api 接收请求,然后创建一个新线程,该线程在 mosquitto mqtt 上发布一些消息并期望响应(请参阅订阅)。我的问题是我没有收到任何消息。我认为它与线程有关,因为没有线程它工作得很好.. 你知道可能是什么问题吗?

谢谢你的期待!

这里是代码:

from flask import Flask, Response
import paho.mqtt.client as mqtt
from threading import Thread
import threading

app = Flask(__name__)
lock = threading.Lock()

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print("Connected with result code {0}".format(str(rc)))  # Print result of connection attempt
    client.subscribe("/mytopic")


def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    print(msg.topic)


client = mqtt.Client(client_id=client_name, clean_session=True)
client.on_connect = on_connect  # Define callback function for successful connection
client.on_message = on_message  # Define callback function for receipt of a message
client.username_pw_set(mqtt_user, mqtt_password)
client.loop_start()
client.connect(mqtt_host)    


def test(param1, param2):
   lock.acquire()
   try:

      ret = client.publish("/mytopic", "")
      while True:
            check the response from mqtt => but i don't get the response anymore
            ....
            break
    finally:
        lock.release()
    return result


@app.route('/test/check', methods=['POST'])
def check():
    global sessionId
    sessionId = sessionId + 1
    t = Thread(target=test, args=(sessionId,None))
    t.start()
    return {'id': sessionId, 'eta': 0}


if __name__ == '__main__':
    app.run(debug=True)

【问题讨论】:

    标签: python multithreading flask mqtt


    【解决方案1】:

    这有几个问题。

    1. client.connect()client.subscribe() 调用都需要反复运行客户端网络循环才能正确完成。
    2. 网络循环需要在连接建立后的每个保持活动期间至少运行一次,以阻止代理断开客户端连接为死机。这意味着如果在启动代码和第一个 REST 请求之间存在延迟,则客户端将断开连接。

    最好使用client.start_loop()函数自己在后台连续运行MQTT客户端网络循环。

    您还应该删除在on_connect() 回调之外的对client.subscribe() 的调用。

    编辑: 正如 cmets/chat 中所讨论的那样,以下作品。看起来在调试模式下运行烧瓶应用程序做了一些奇怪的事情,并一遍又一遍地创建多个具有相同客户端 ID 的 MQTT 客户端。这导致代理不断地踢掉旧的,因此消息永远不会被传递。

    from flask import Flask, Response
    import paho.mqtt.client as mqtt
    import time
    from threading import Thread
    import threading
    
    app = Flask(__name__)
    lock = threading.Lock()
    sessionId=0
    cont=True
    
    def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
    print("Connected with result code {0}".format(str(rc))) # Print result of connection attempt
    client.subscribe("mytopic")
    
    
    def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
    global cont
    print(msg.topic)
    cont=False
    
    
    client = mqtt.Client(client_id="foo", clean_session=True)
    client.on_connect = on_connect # Define callback function for successful connection
    client.on_message = on_message # Define callback function for receipt of a message
    #client.username_pw_set(mqtt_user, mqtt_password)
    client.connect("localhost", port=1884)
    client.loop_start()
    
    def test(param1, param2):
    lock.acquire()
    try:
    ret = client.publish("mytopic", "foo")
    while cont:
    time.sleep(5)
    print("loop")
    finally:
    lock.release()
    
    result = "foo"
    
    return result
    
    
    @app.route('/test/check', methods=['POST'])
    def check():
    global sessionId
    sessionId = sessionId + 1
    t = Thread(target=test, args=(sessionId,None))
    t.start()
    return {'id': sessionId, 'eta': 0}
    
    
    if __name__ == '__main__':
    print("started")
    app.run()
    

    【讨论】:

    • 您好,感谢您的回答。据我了解,我应该将连接和订阅例程移至测试功能? btw.start_loop 不行,我也试过了...
    • 您在哪里尝试过start_loop()?应该在client.connect()之前调用
    • aaa...好的,我会试试...片刻
    • 我试过了,问题是发布调用的测试函数返回MQTT_ERR_NO_CONN。在 mosquitto 服务器中,我只能看到:“收到来自测试人员的订阅”=> 这意味着连接正在工作,但在线程中,发布调用没有返回连接:/ hmmm......
    • 好的,从app.start() 中删除debug=True,事情应该会变得更好。由于某种原因导致客户端不断重新连接,这意味着在发送或传递消息时它实际上并未连接。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-22
    • 1970-01-01
    • 1970-01-01
    • 2016-04-24
    • 1970-01-01
    相关资源
    最近更新 更多