【问题标题】:Integrating a data producer as worker to Django Channels 2.x将数据生产者作为工作人员集成到 Django Channels 2.x
【发布时间】:2018-06-28 13:15:48
【问题描述】:

我正在开发一个应用程序,其中将推送给客户端的实时数据将来自外部 API。它的一个简单版本可以被认为是一个外汇货币跟踪器。用户将指定她想要跟踪的货币(美元、欧元、英镑等)并接收实时更新。货币数据将通过长轮询来自外部 API。我的问题是如何将此数据生产者集成到渠道中?

在所有频道示例中,我发现工人的工作是由一个事件触发的,但在我的情况下,它将从一开始就开始,持续工作,而不是接收事件,它只会将新值推送到频道层,以便通知订阅者。所以我不确定消费者模式是否正确。总结一下我的问题:

  • 我是否应该使用消费者来完成这项任务以及如何设置它?考虑将通过长轮询异步或同步消费者访问 API?开始在其连接方法中轮询外部 API 还是为此发送一次性事件?从何处、何时发送此“开始工作”事件?

  • 我还想使用 redis 来存储用于向用户提供货币初始值的值。他们将开始在连接时侦听更新,但可能会在几秒钟后更新。我可以访问通道层使用的 redis 连接实例,还是需要为此打开另一个到我的 redis 的连接?

数据生产者的另一种选择可以将其完全保留在 Django 通道之外,如 here 所述,只是将数据推送到通道层,但我不确定在部署过程中这可能会对 daphne 造成问题。我的意思是如何确保它保持正常运行并与频道很好地共享资源?

谢谢。

【问题讨论】:

    标签: django django-channels


    【解决方案1】:

    工人适合您的用例。它们意味着长期运行,并且每个请求都没有新实例。如果你想让你的消费者异步,你必须确保你所做的任何事情都不会阻塞。所有 db 查询都必须包含在 database_sync_to_async 中,即使 db 调用发生在调用堆栈的下 5 级。您可以使用 Django 缓存 API 连接到 Redis,但最好在它之外工作以保持一切异步。直接使用 redis 库通道使用,因为它具有将 redis 用作缓存的异步方法。

    【讨论】:

    • 感谢您的回答。我得到了一些工作。你知道如何访问频道自己的 redis 异步方法吗?
    • 如果要发送消息,请使用 channels redis 包中的方法。如果您想将其用作数据库,请像这样连接到 aioredis aioredis.create_redis(**channels.layers.get_channel_layer().hosts[0])
    • 我明白了。非常感谢。
    • 嗨@pembeci,我的要求和你的完全一样,也有同样的问题。我想知道你是否可以分享你是如何设置的
    • @Nasir,我试图解释我的设置作为答案。如果感到困惑,请提出任何问题。
    【解决方案2】:

    (为了回答 Nasir 的评论和以后的访客,这是我的完整设置)

    Channels 及其工作人员确实是我项目的一个不错的选择,而且我的工作方式也不错。它尚未投入生产,但工作正常,代码结构良好,易于使用等。

    首先,我们需要设置一个工人并让它工作。假设我们的 worker 类是 ExternalData,我们将为 worker 设置一个特定的通道:

    # routing.py
    application = ProtocolTypeRouter({
        # ...
        'channel': ChannelNameRouter({
            "external-data": ExternalData,
        })
    })
    
    # asgi.py  
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync
    # ...
    # add this to the end of the file
    channel_layer = get_channel_layer()
    logger.info("Sending start signal to ExternalData")
    async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })
    
    # external_data.py   worker's code
    
    # used as a singleton object
    class DataStore(object):
    
        @classmethod
        async def create(cls, owner):
            self = DataStore() 
            self.currencies = {}
            self.owner = owner
            # ...
            return self
    
    class ExternalData(AsyncConsumer):
    
        started = False
    
        # triggered from asgi.py via daphne start
        async def external_data_start(self, event):
    
            if ExternalData.started:
                if settings.DEBUG:
                    raise RuntimeError("ExternalData already working.")
                else:
                    logger.warning("ExternalData already working.")
                    return
            else:
                # do your initialization work here and let the data producer start listening and saving external data 
                ExternalData.started = True
                self.store = await DataStore.create(owner=self)
    

    上面代码中的DataStore当然不是必需的,但如果你要做一些复杂的事情,最好将ExternalData用于与通道相关的事情,并在另一个类中做其他事情。使用此设置,您需要先运行 worker:

    python manage.py runworker external-data 
    

    然后启动 daphne(即在另一个终端中查看它们的输出):

    daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application
    

    在生产中,当您需要编写服务或类似的 daphne 时,应稍晚启动(例如睡眠 2-3 秒),以确保工作文件由 python 处理并运行。您也可以反复尝试 asgi.py 代码(即在循环中进行一些睡眠),直到工作人员设置了某个环境标志。

    现在我们的数据提供者已经启动了,但是客户端呢?我们需要有一个消费者,它主要充当我们的数据提供者和客户之间的中介。对于我的项目,数据传输要求涵盖了大多数情况:

    • A:当客户端连接时发送一些初始数据
    • B:客户端可以访问一个页面,需要获取一些与该页面相关的额外数据
    • C: 客户端在页面中,您需要发送实时数据并更新页面
    • D:有新数据到了,需要通知客户端

    我们的应用程序是单页应用程序,这就是我们需要所有这些的原因。这是 sn-p,其中包括我如何处理所有这些情况:

    # consumer.py
    
    class FeedsConsumer(AsyncJsonWebsocketConsumer):
        groups = ["broadcast"]   # for requirement D
    
        # triggered from client
        async def connect(self):
            await self.accept()
            self.listening = set()  # for requirement C
            logger.info(f"New client connected: {self.channel_name}")
            # for requirement A
            await self.channel_layer.send("external-data",
               { "type": "external.new_client", 'client_channel': self.channel_name })
    
        # triggered from client
        async def receive_json(self, data):        
                # for requirement B
                if data["type"] == "get_currency":
                    payload["type"] = "external.send_currency"
                    payload["client_channel"] = self.channel_name
                    payload["currency"] = data["currency"]
                    self.listen(data["currency"])  # for requirement C
                    await self.channel_layer.send("external-data", payload)
    
        # for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
        async def listen(self, item_id):
                if item_id not in self.listening:
                    await self.channel_layer.group_add(item_id, self.channel_name )
                    self.listening.add(item_id)    
    
        # below are triggered from the worker. A and B as responses. C and D as server generated messages 
    
        # for requirement A
        async def init_data(self, payload):
            await self.send_json(payload)
    
        # for requirement B
        async def send_currency(self, payload):
            await self.send_json(payload) 
    
        # for requirement C
        async def new_value(self, payload):
            await self.send_json(payload)  
    
        # for requirement D
        async def new_currency(self, payload):
            await self.send_json(payload) 
    
    # external_data.py   worker's code
    
    class ExternalData(AsyncConsumer):
    
        # for requirement A. triggered from consumer.
        async def external_new_client(self, payload):
            data_to_send = list(self.store.currencies.keys())
            # prepare your data above and then send it like below
            await self.channel_layer.send(payload["client_channel"],  # new client
              { 'type': 'init_data',
                'data': data_to_send,
              })
    
        # for requirement B. triggered from consumer.
        async def external_send_currency(self, payload):
            data_to_send = self.store.currencies[payload["currency"]]
            # prepare your data above and then send it like below
            await self.channel_layer.send(payload["client_channel"],  # only the client who requested data
              { 'type': 'send_currency',
                'data': data_to_send,
              })
    
    
        async def new_data_arrived(self, currency, value):
             if currency not in self.store.currencies:
                 self.store.currencies[currency] = value
                 # requirement D. suppose this is new data so we need to notify all connected users of its availability
                 await self.channel_layer.group_send("broadcast",  # all clients are in this group
                   { 'type': 'new_currency',
                     'data': currency,
                   })
             else:
                 # requirement C, notify listeners.
                 self.store.currencies[currency] = value
                 await self.channel_layer.group_send(currency,  # all clients listening to this currency
                   { 'type': 'new_value',
                     'currency': currency,
                     'value': value,
                   })
    

    希望我没有弄乱代码并且它不是太复杂(我懒得为每个要求粘贴/编辑单独的代码)。有任何问题请在 cmets 中提问。

    【讨论】:

    • 嗨,我是 django 频道的新手。我有一个在后台运行的 celery 任务,它应该在处理结束后将数据发送到 UI。我找不到发送数据的方法。你能告诉我一个解决方案吗?谢谢!
    • 您是如何在渠道中提供数据的?第二个问题是您会将其发送给所有连接的用户还是某些特定用户(即 celery 任务是由用户从 UI 触发的吗?)?
    猜你喜欢
    • 2011-04-22
    • 2020-05-19
    • 2020-04-20
    • 2020-08-03
    • 2022-12-05
    • 2022-11-11
    • 2021-04-15
    • 2020-05-26
    • 1970-01-01
    相关资源
    最近更新 更多