【问题标题】:Exception handling in Celery?芹菜中的异常处理?
【发布时间】:2020-04-25 13:50:50
【问题描述】:

我有一组任务,每个任务都向标准 oauth API 端点发出请求,并且依赖于 bearer_token。如果 bearer_token 过期,任务将在响应处理期间引发异常。还有一个refresh_bearer_token 任务,它处理令牌过期后的更新。

这是一个伪代码:

from proj.celery import app

bearer_token = '1234'

class OauthError(Exception):
    pass

@app.task
def api_request():
    response = request(bearer_token, ...)
    if response.bearer_token_expired:
        raise OauthError('oauth')

@app.task
def refresh_bearer_token():
    ...

如何安排refresh_bearer_token 任务在引发OauthError 时执行?

我能找到的唯一解决方案是像这样使用link_error kwarg:

@app.task
def error_callback(uuid):
    exception_msg = AsyncResult(uuid).get(propagate=False, disable_sync_subtasks=False)
    if exception_msg = 'oauth':
        refresh_bearer_token.delay()
    else:
        raise 

api_request.apply_async(link_error=error_callback.s())

但这似乎不是最理想的,原因有几个,最明显的是因为它在另一个同步子任务中产生了一个同步子任务,在文档中 strongly discourged

在 celery 中是否有更 pythonic 的异常捕获方式?

例如:

def catch(func_that_requires_oauth):
    try:
        func_that_requires_oauth.delay()
    except OauthError:
        refresh_bearer_token.delay() | func_that_requires_oauth.delay()

【问题讨论】:

  • 你有多少个工人,他们会在到期时开始refresh_bearer_token 任务吗?
  • @IainShelvington 为了这个例子,假设我有几个工人。这是我没有考虑的一个好点,因为在我的示例中他们会执行几个 refresh_bearer_token 调用,但不应该。
  • 这有点超出了这个问题的范围,但我还必须考虑给定队列中有多少个客户端(每个客户端 1 个 Bearer_token)。
  • 是否有多个不同的任务都需要这个 oauth 令牌?
  • @IainShelvington 是的。

标签: python django python-3.x celery


【解决方案1】:

只是提出一些想法。如果 refresh_bearer_token 任务在调用时获取了锁,则可以创建一个等待或重试的基本任务。当它失败时,它会启动 refresh_bearer_token 任务并自行退出。

重试会将正在运行的任务的副本放在队列的后面

现在您必须实现某种锁定,如果已获得锁,则 refresh_bearer_token 什么也不做,因为另一个任务应该更新它。当refresh_bearer_token 任务失败时,您还需要为此“锁”添加一个 TTL 以防止出现某些情况

@app.task
def refresh_bearer_token():
    try:
        with aquire_lock(timeout=0):
            refresh_token()
    except TimeoutError:
        pass


class RequiresOauthTask(app.Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        if lock_is_present():
            self.retry()  # or wait?
        return super().__call__(*args, **kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, OauthError):
            refresh_bearer_token.delay()
            self.retry()
        super().on_failure(exc, task_id, args, kwargs, einfo)


@app.task(base=RequiresOauthTask)
def my_task():
    pass

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-03-19
  • 2018-12-26
  • 1970-01-01
  • 1970-01-01
  • 2021-09-03
  • 1970-01-01
  • 2014-10-02
相关资源
最近更新 更多