【发布时间】:2021-12-27 10:59:36
【问题描述】:
我为 paho.mqtt.client 类编写了一个包装器。现在我正在使用 pytest 编写测试以确保它正常工作。
def test_send_receive():
def callback(client, userdata, message: paho.mqtt.client.MQTTMessage):
assert str(message.payload.decode()) == "testmessage"
def publishing(client: MqttHandler):
client.pub_to_topic("testmessage", "test")
time.sleep(3)
mqtt1 = MqttHandler("localhost")
mqtt2 = MqttHandler("localhost")
mqtt1.sub_with_callback("test", callback)
x = threading.Thread(target=publishing, args=(mqtt2,))
x.start()
x.join()
当我将其作为普通脚本调用时,此测试代码可以正常工作。但是,当我在 pytest 环境中执行代码时,回调中的代码永远不会执行。即使我提出错误而不是断言语句,测试也会通过。我要么正在寻找关于为什么在 pytest-environment 中从未调用回调的答案,要么正在寻找用于测试同一事物的不同解决方案
【问题讨论】: