【Question Title】: Django Channels Redis: Exception inside application: Lock is not acquired
【Posted】: 2021-08-05 15:35:51
【Question】:

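A fully loaded multi-tenant Django application with 1000 WebSockets served through Daphne/Channels had been running fine for months. Then, suddenly, the tenants all called the support line to report that the application was slow or hanging outright. Because HTTP REST API requests were fast and error-free, we narrowed the problem down to the WebSockets.

Neither the application logs nor the OS logs point to a problem, so the only thing to go on is the exception shown below. It occurred over and over during a 2-day period.

I am not expecting in-depth debugging help, just some off-the-cuff suggestions about possible causes.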

AWS Linux 1
Python 3.6.4
Elasticache Redis 5.0
channels==2.4.0
channels-redis==2.4.2
daphne==2.5.0
Django==2.2.13

Split configuration: HTTP served by uwsgi, ASGI served by daphne, with Nginx.

May 10 08:08:16 prod-b-web1: [pid 15053] [version 119.5.10.5086] [tenant_id -] [domain_name -] [pathname /opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/daphne/server.py] [lineno 288] [priority ERROR] [funcname application_checker] [request_path -] [request_method -] [request_data -] [request_user -] [request_stack -] Exception inside application: Lock is not acquired.
Traceback (most recent call last):
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 435, in receive
    real_channel
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 484, in receive_single
    await self.receive_clean_locks.acquire(channel_key)
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 152, in acquire
    return await self.locks[channel].acquire()
  File "/opt/python3.6/lib/python3.6/asyncio/locks.py", line 176, in acquire
    yield from fut
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/consumer.py", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/utils.py", line 58, in await_many_dispatch
    await task
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 447, in receive
    self.receive_lock.release()
  File "/opt/python3.6/lib/python3.6/asyncio/locks.py", line 201, in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

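【Comments】:

  • How are you managing your Python dependencies?
  • Do you get the above error when you interrupt the process with a key press? The stack trace above points to self.stop(), which is only called during the application's cleanup, when there is an exception of type KeyboardInterrupt. How often do you see this stack trace printed?
  • Is the traffic pattern still the same? How are you doing on file locks? Is Redis up, and are any connections to Redis stuck in a closing or similar state? You can use ss or netstat to check these.
  • This looks like a multithreading issue.

Tags: python django redis django-channels django-redis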


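【Solution 1】:

First, let's look at where the RuntimeError: Lock is not acquired. error comes from. As the traceback shows, it is raised by the release() method in /opt/python3.6/lib/python3.6/asyncio/locks.py, which is defined as follows: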

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

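A primitive lock is a synchronization primitive that is not owned by a particular thread when locked.

Calling release() on an unlocked lock raises RuntimeError, because the method may only be called while the lock is held. When it is called on a locked lock, the state changes to unlocked.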
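The rule above can be checked in isolation. The following is a minimal sketch, not code from the question; the function names are made up for illustration, and it uses asyncio.run, which needs Python 3.7 or later (the question runs 3.6).

```python
import asyncio


async def release_unlocked_lock():
    """Return the message raised when releasing a lock nobody holds."""
    lock = asyncio.Lock()
    try:
        lock.release()  # the lock was never acquired
    except RuntimeError as exc:
        return str(exc)
    return None


async def acquire_then_release():
    """Normal usage: acquire, then release exactly once."""
    lock = asyncio.Lock()
    await lock.acquire()
    held_after_acquire = lock.locked()
    lock.release()
    return held_after_acquire, lock.locked()


message = asyncio.run(release_unlocked_lock())
states = asyncio.run(acquire_then_release())
print(message)
print(states)
```

The first call should report the same message seen at the bottom of the traceback, while the second shows the lock going from held to free without an error.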

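Now for the earlier error, which is raised inside the acquire() method of the same file. acquire() is defined as follows (this listing uses the newer async def / await syntax; the Python 3.6 source in the traceback has yield from fut at the same point):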

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.
        """
        if (not self._locked and (self._waiters is None or
                all(w.cancelled() for w in self._waiters))):
            self._locked = True
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._loop.create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_first() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

So the concurrent.futures._base.CancelledError must originate at await fut: the task was cancelled while it was suspended there, waiting for the lock.
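To see how the two exceptions can chain together, here is a small sketch that cancels a task while it waits in acquire(). It is not the channels_redis code; unguarded_receive, guarded_receive and cancel_waiter are hypothetical names, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio


async def unguarded_receive(lock):
    """Hypothetical receive loop that always releases in `finally`."""
    try:
        await lock.acquire()      # the task is cancelled while waiting here
        await asyncio.sleep(0)    # placeholder for real work
    finally:
        lock.release()            # runs even though acquire() never succeeded


async def guarded_receive(lock):
    """Variant using `async with`, which releases only after a successful acquire."""
    async with lock:
        await asyncio.sleep(0)


async def cancel_waiter(worker):
    lock = asyncio.Lock()
    await lock.acquire()                        # another party holds the lock
    task = asyncio.ensure_future(worker(lock))
    await asyncio.sleep(0)                      # let the task block in acquire()
    task.cancel()                               # cancels the pending wait
    lock.release()                              # the holder is done; lock is free
    try:
        await task
    except asyncio.CancelledError:
        return "CancelledError"
    except RuntimeError as exc:
        return "RuntimeError: {}".format(exc)
    return "finished"


unguarded = asyncio.run(cancel_waiter(unguarded_receive))
guarded = asyncio.run(cancel_waiter(guarded_receive))
print(unguarded)
print(guarded)
```

On recent CPython versions the unguarded variant should end with RuntimeError: Lock is not acquired. raised while the CancelledError is being handled, which is the same shape as the traceback in the question. The guarded variant should surface only the CancelledError. Whether this is what actually happens inside channels_redis is an assumption that would need to be checked against its source.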

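To fix it, you can look at the question "Awaiting an asyncio.Future raises concurrent.futures._base.CancelledError instead of waiting for a value/exception to be set".

Basically, there may be an awaitable somewhere in your code that you are not awaiting. By not awaiting it, you never hand control back to the event loop, and by not storing it, you let it be cleaned up immediately, which cancels it completely (along with every awaitable it controls).

Just make sure you await the result of every awaitable in your code, and track down any that you have missed.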
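One way to follow that advice is to keep a reference to every task you start and await them all together. This is only a sketch under assumptions: fetch is a hypothetical unit of work, not something from the question, and asyncio.run needs Python 3.7 or later.

```python
import asyncio


async def fetch(value):
    """Hypothetical unit of work standing in for real I/O."""
    await asyncio.sleep(0)
    return value * 2


async def main():
    # Keep a reference to every task that is started ...
    tasks = [asyncio.ensure_future(fetch(n)) for n in range(3)]
    # ... and await all of them so none is left pending or dropped.
    results = await asyncio.gather(*tasks)
    return results, all(task.done() for task in tasks)


results, all_done = asyncio.run(main())
print(results, all_done)
```

Running the application with asyncio's debug mode enabled (for example by setting PYTHONASYNCIODEBUG=1) can also help locate coroutines that were created but never awaited.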

【Comments】:
