【发布时间】:2020-06-12 05:09:42
【问题描述】:
我正在尝试使用 Django Channels 2.4.0 构建一个简单的聊天应用程序,这是我的消费者代码:
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
if "session_str" in self.scope["session"]:
# get chat history
else:
#create a new chat room
await self.accept()
当我使用AsyncWebsocketConsumer 进行测试时,我不确定如何设置会话字符串。我试过下面的方法
@pytest.mark.asyncio
async def test_ws_connection(self):
""" Test initial websocket connection
"""
start_datetime = datetime.datetime.now()
test_room = Room(create_datetime=start_datetime)
session_str = test_room.session_str
client = Client()
client.session["session_str"] = session_str
application = URLRouter([
url(r'ws/chat/$', ChatConsumer),
])
communicator = WebsocketCommunicator(application, "ws/chat/")
connected, _ = await communicator.connect()
assert connected
await communicator.disconnect()
我在网上遇到以下错误:client.session["session_str"] = session_str:
django.core.exceptions.SynchronousOnlyOperation:您不能从异步上下文中调用它 - 使用线程或 sync_to_async
【问题讨论】:
标签: django testing django-channels