(为了回答 Nasir 的评论和以后的访客,这是我的完整设置)
Channels 及其工作人员确实是我项目的一个不错的选择,而且我的工作方式也不错。它尚未投入生产,但工作正常,代码结构良好,易于使用等。
首先,我们需要设置一个工人并让它工作。假设我们的 worker 类是 ExternalData,我们将为 worker 设置一个特定的通道:
# routing.py
application = ProtocolTypeRouter({
# ...
'channel': ChannelNameRouter({
"external-data": ExternalData,
})
})
# asgi.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# ...
# add this to the end of the file
channel_layer = get_channel_layer()
logger.info("Sending start signal to ExternalData")
async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })
# external_data.py worker's code
# used as a singleton object
class DataStore(object):
@classmethod
async def create(cls, owner):
self = DataStore()
self.currencies = {}
self.owner = owner
# ...
return self
class ExternalData(AsyncConsumer):
started = False
# triggered from asgi.py via daphne start
async def external_data_start(self, event):
if ExternalData.started:
if settings.DEBUG:
raise RuntimeError("ExternalData already working.")
else:
logger.warning("ExternalData already working.")
return
else:
# do your initialization work here and let the data producer start listening and saving external data
ExternalData.started = True
self.store = await DataStore.create(owner=self)
上面代码中的DataStore当然不是必需的,但如果你要做一些复杂的事情,最好将ExternalData用于与通道相关的事情,并在另一个类中做其他事情。使用此设置,您需要先运行 worker:
python manage.py runworker external-data
然后启动 daphne(即在另一个终端中查看它们的输出):
daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application
在生产中,当您需要编写服务或类似的 daphne 时,应稍晚启动(例如睡眠 2-3 秒),以确保工作文件由 python 处理并运行。您也可以反复尝试 asgi.py 代码(即在循环中进行一些睡眠),直到工作人员设置了某个环境标志。
现在我们的数据提供者已经启动了,但是客户端呢?我们需要有一个消费者,它主要充当我们的数据提供者和客户之间的中介。对于我的项目,数据传输要求涵盖了大多数情况:
- A:当客户端连接时发送一些初始数据
- B:客户端可以访问一个页面,需要获取一些与该页面相关的额外数据
- C: 客户端在页面中,您需要发送实时数据并更新页面
- D:有新数据到了,需要通知客户端
我们的应用程序是单页应用程序,这就是我们需要所有这些的原因。这是 sn-p,其中包括我如何处理所有这些情况:
# consumer.py
class FeedsConsumer(AsyncJsonWebsocketConsumer):
groups = ["broadcast"] # for requirement D
# triggered from client
async def connect(self):
await self.accept()
self.listening = set() # for requirement C
logger.info(f"New client connected: {self.channel_name}")
# for requirement A
await self.channel_layer.send("external-data",
{ "type": "external.new_client", 'client_channel': self.channel_name })
# triggered from client
async def receive_json(self, data):
# for requirement B
if data["type"] == "get_currency":
payload["type"] = "external.send_currency"
payload["client_channel"] = self.channel_name
payload["currency"] = data["currency"]
self.listen(data["currency"]) # for requirement C
await self.channel_layer.send("external-data", payload)
# for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
async def listen(self, item_id):
if item_id not in self.listening:
await self.channel_layer.group_add(item_id, self.channel_name )
self.listening.add(item_id)
# below are triggered from the worker. A and B as responses. C and D as server generated messages
# for requirement A
async def init_data(self, payload):
await self.send_json(payload)
# for requirement B
async def send_currency(self, payload):
await self.send_json(payload)
# for requirement C
async def new_value(self, payload):
await self.send_json(payload)
# for requirement D
async def new_currency(self, payload):
await self.send_json(payload)
# external_data.py worker's code
class ExternalData(AsyncConsumer):
# for requirement A. triggered from consumer.
async def external_new_client(self, payload):
data_to_send = list(self.store.currencies.keys())
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # new client
{ 'type': 'init_data',
'data': data_to_send,
})
# for requirement B. triggered from consumer.
async def external_send_currency(self, payload):
data_to_send = self.store.currencies[payload["currency"]]
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # only the client who requested data
{ 'type': 'send_currency',
'data': data_to_send,
})
async def new_data_arrived(self, currency, value):
if currency not in self.store.currencies:
self.store.currencies[currency] = value
# requirement D. suppose this is new data so we need to notify all connected users of its availability
await self.channel_layer.group_send("broadcast", # all clients are in this group
{ 'type': 'new_currency',
'data': currency,
})
else:
# requirement C, notify listeners.
self.store.currencies[currency] = value
await self.channel_layer.group_send(currency, # all clients listening to this currency
{ 'type': 'new_value',
'currency': currency,
'value': value,
})
希望我没有弄乱代码并且它不是太复杂(我懒得为每个要求粘贴/编辑单独的代码)。有任何问题请在 cmets 中提问。