【问题标题】:Listen to mqtt topics with django channels and celery使用 django 频道和 celery 收听 mqtt 主题
【发布时间】:2021-09-29 07:05:58
【问题描述】:

我想要一种将 django 与 mqtt 集成的方法,为此我想到的第一件事是使用 django-channels 和一个支持 mqtt over web sockets 的 mqtt 代理,因此我可以在代理和 django 之间直接通信-频道。

但是,我没有找到从 django 启动 websocket 客户端的方法,根据 link 这是不可能的。

由于我也开始研究任务队列,我想知道使用 paho-mqtt 启动 mqtt 客户端然后使用 celery 在单独的进程中运行它是否是一个好习惯。然后这个进程会通过 websockets 将 broker 收到的消息转发到 django 通道,这样我也可以与客户端进程通信,发布数据或在需要时停止 mqtt 客户端,所有这些都直接来自 django。

我对这个想法有点怀疑,因为我还读到了在 celery 中运行的过程不应该花费太长时间才能完成,在这种情况下,这正是我想要做的。

所以我的问题是,这是一个多么糟糕的主意?有没有其他选项可以直接将django与mqtt集成?

*注意:我不想在服务器上运行单独的进程,我希望能够从 django 启动和停止进程,以便从 web gui 完全控制 mqtt 客户端

【问题讨论】:

    标签: django websocket celery mqtt django-channels


    【解决方案1】:

    我找到了一个更好的方法,不需要使用芹菜。

    我只是在 app/apps.py 上的 ready 方法上启动了一个 mqtt 客户端,因此每次运行应用程序时都会启动一个客户端。从这里我可以使用 django-channels 或信号与系统的其他部分进行通信。

    apps.py:

    from django.apps import AppConfig
    from threading import Thread
    import paho.mqtt.client as mqtt
    
    
    class MqttClient(Thread):
        def __init__(self, broker, port, timeout, topics):
        super(MqttClient, self).__init__()
        self.client = mqtt.Client()
        self.broker = broker
        self.port = port
        self.timeout = timeout
        self.topics = topics
        self.total_messages = 0
    
    #  run method override from Thread class
    def run(self):
        self.connect_to_broker()
    
    def connect_to_broker(self):
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker, self.port, self.timeout)
        self.client.loop_forever()
    
    # The callback for when a PUBLISH message is received from the server.
    def on_message(self, client, userdata, msg):
        self.total_messages = self.total_messages + 1
        print(str(msg.payload) + "Total: {}".format(self.total_messages))
    
    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(self, client, userdata, flags, rc):
        #  Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
        for topic in self.topics:
            client.subscribe(topic)
    
    
    class CoreConfig(AppConfig):
        default_auto_field = 'django.db.models.BigAutoField'
        name = 'core'
    
    def ready(self):
        MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()
    

    【讨论】:

      【解决方案2】:

      如果您在 Django 应用程序中使用 ASGI,则可以使用 MQTTAsgi。完全披露我是 MQTTAsgi 的作者。

      它是一个完整的 Django 和 MQTT 协议服务器。

      要使用 mqtt 协议服务器,您可以运行您的应用程序,首先您需要创建一个 MQTT 消费者:

      from mqttasgi.consumers import MqttConsumer
      class MyMqttConsumer(MqttConsumer):
      
          async def connect(self):
              await self.subscribe('my/testing/topic', 2)
      
          async def receive(self, mqtt_message):
              print('Received a message at topic:', mqtt_mesage['topic'])
              print('With payload', mqtt_message['payload'])
              print('And QOS:', mqtt_message['qos'])
              pass
      
          async def disconnect(self):
              await self.unsubscribe('my/testing/topic')
      

      那么你应该将此协议添加到协议路由器:

      application = ProtocolTypeRouter({
            'websocket': AllowedHostsOriginValidator(URLRouter([
                url('.*', WebsocketConsumer)
            ])),
            'mqtt': MyMqttConsumer,
            ....
          })
      

      然后就可以用*运行mqtt协议服务器了:

      mqttasgi -H localhost -p 1883 my_application.asgi:application
      

      *假设代理位于 localhost 和 1883 端口。

      【讨论】:

        【解决方案3】:

        我也想解决这个问题,但没有找到真正适合 Channels 架构的好的解决方案(虽然 MQTTAsgi 很接近,但它使用 paho-mqtt 并且没有完全使用 Channels 层系统)。

        我创建了:https://pypi.org/project/chanmqttproxy/

        (来源https://github.com/lbt/channels-mqtt-proxy

        本质上,它是 MQTT 的完全异步 Channels 3 代理,允许发布和订阅。该文档展示了如何扩展标准 Channels 教程,以便在 MQTT 主题上看到聊天消息 - 并且可以从 MQTT 主题发送到所有 websocket 浏览器客户端。

        我不知道这是 OP 想要的,就收听 MQTT 主题而言,但对于一般情况,我认为这是一个很好的解决方案。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2021-12-22
          • 1970-01-01
          • 1970-01-01
          • 2021-01-20
          • 2019-07-18
          • 1970-01-01
          • 2017-11-06
          • 2018-08-20
          相关资源
          最近更新 更多