【问题标题】:How to multithread AsyncConsumer with Django Channels如何使用 Django 通道对 AsyncConsumer 进行多线程处理
【发布时间】:2018-09-12 14:46:43
【问题描述】:

我已经使用 Django Channels 工作了一个星期了,runworker 并行性让我很不爽。

例如,我有一个 MQTT 客户端,它在收到消息时在频道中发布,基本。

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

这个发送得很好。我可以发送我想要的多少,它将被发送到redis队列。到频道mqtt

然后我运行worker,它将重定向队列中mqtt的消息:

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

这就是问题的开始。下面是 AsyncConsumer 读取数据的内容:

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

为了模拟业务任务,我设置了一个睡眠。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,而不是多线程的 5 秒。如下图。

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

任何有关该主题的情报都会有很大帮助,在此先感谢!

编辑:我发现管理它的唯一方法是创建一个包含工作人员的执行程序来异步执行它。但我不确定它在部署方面的效率

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)

【问题讨论】:

    标签: python django python-3.x multithreading django-channels


    【解决方案1】:

    这是目前的设计

    是的,这是预期的设计,因为它是最安全的方式(如果您不知道,它可以防止出现竞争条件)。如果您乐于并行运行消息,只需在需要时分离您自己的协程(使用asyncio.create_task),确保清理它们并在关闭时等待它们。这是相当多的开销,因此希望我们将来会为消费者提供一种选择加入模式,但目前我们提供的只是安全选项。

    https://github.com/django/channels/issues/1203

    【讨论】:

      猜你喜欢
      • 2019-02-27
      • 1970-01-01
      • 2020-06-26
      • 1970-01-01
      • 2019-04-23
      • 1970-01-01
      • 2019-09-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多