【问题标题】:Testing wrapper around paho.mqtt.client with pytest. Callback is never executed使用 pytest 围绕 paho.mqtt.client 测试包装器。回调永远不会执行
【发布时间】:2021-12-27 10:59:36
【问题描述】:

我为 paho.mqtt.client 类编写了一个包装器。现在我正在使用 pytest 编写测试以确保它正常工作。

def test_send_receive():

    def callback(client, userdata, message: paho.mqtt.client.MQTTMessage):
        assert str(message.payload.decode()) == "testmessage"

    def publishing(client: MqttHandler):
        client.pub_to_topic("testmessage", "test")
        time.sleep(3)

mqtt1 = MqttHandler("localhost")
mqtt2 = MqttHandler("localhost")
mqtt1.sub_with_callback("test", callback)

x = threading.Thread(target=publishing, args=(mqtt2,))
x.start()
x.join()

当我将其作为普通脚本调用时,此测试代码可以正常工作。但是,当我在 pytest 环境中执行代码时,回调中的代码永远不会执行。即使我提出错误而不是断言语句,测试也会通过。我要么正在寻找关于为什么在 pytest-environment 中从未调用回调的答案,要么正在寻找用于测试同一事物的不同解决方案

【问题讨论】:

    标签: python pytest mqtt


    【解决方案1】:

    不应该

    mqttt1.sub_with_callback("test", test_send_receive.callback)
    

    由于callback 函数被声明为test_send_receive 的一部分

    【讨论】:

    • 当我用 test_send_receive.callback 添加回调时,它会引发一个属性错误,说 test_send_receive 没有属性“回调”
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多