【问题标题】:Proper design pattern for 202 Accepted responses202 Accepted 响应的正确设计模式
【发布时间】:2019-07-28 10:44:00
【问题描述】:

如果第一次没有成功,我有一个 celery shared_task 设置最多重试 10 次。初始日志语句只执行一次。没有任何异常被引发,嵌入的try/else也没有。语句 result = LdapHostGroupView().start(data, username, version) 确实执行了,它从日志条目中显示它成功,但最终的 else 永远不会执行。

这是怎么回事?

@shared_task(bind=True, default_retry_delay=15, max_retry=10)
def host_accepted(self, data, username, version):
    from .api.views import LdapHostGroupView
    name = data.get('name', '')
    version = Decimal(version)
    log.debug("name: %s, version: %s, version type: %s, data: %s",
              name, version, type(version), data)

    try:
        obj = Transaction.objects.get(endpoint_name=name)
    except Transaction.DoesNotExist as e:
        msg = "Could not find transaction '{}'".format(name)
        log.critical(msg)
        syslog.critical(msg)
    else:
        try:
            result = LdapHostGroupView().start(data, username, version)
        except RealmBundleDoesNotExist as e:
            log.debug("Bundle does not exist yet.")
            obj.job_summary += str(e) + '\n'
            obj.job_status = Transaction.INPROGRESS
            obj.save()
            self.retry(exc=e) # ** self.request.retries)
        except (RealmCriticalException, ValidationError) as e:
            error = e.get_full_details()
            log.debug("Host Accepted error: %s", error)

            if isinstance(error, dict):
                for field, values in error.items():
                    for value in values:
                        ed = value.get('message')

                        if isinstance(ed, ErrorDetail):
                            item = str(ed)
                        else:
                            item = value

                        msg = "Field '{}' has error: {}\n".format(field, item)
                        obj.job_summary += msg
            else:
                obj.job_summary += "Had error with no message.\n"

            obj.job_status = Transaction.FAILURE
            obj.save()
        else:
            log.info("Celery task 'host_accepted' executed at %s, "
                     "returned %s, incoming data %s",
                     datetime.now(tzutc()).isoformat(), result, data)

            # Check the result object.

            obj.job_status = Transaction.SUCCESS
            obj.save()

在 Django 视图中是这样调用的:

   host_accepted.delay(request.data, request.user.username, request.version)

【问题讨论】:

    标签: django python-3.x django-rest-framework celery-task


    【解决方案1】:

    因此,在发布我的问题后,我突然想到我上面的代码依赖于我能够重新创建 request 对象或腌制一个对象。这两种方法都不可行。所以我需要做的是只包装在 celery 任务中运行需要一段时间的代码。我发现我可以在序列化程序的 create 方法中返回 celery 任务的结果,而不是普通的 DB 对象。

    我应该提到,这个序列化程序在任何情况下都不会返回 DB 对象,因为它实际上将来自两个外部 API 的数据聚合到我的 API 中。我没有显示该代码。

    我的视图已经过大量定制,但是,它们几乎可以用作普通视图。

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__accepted = False
    
    def post(self, request, *args, **kwargs):
        self.__accepted = False # Use the normal serializer
        self.create(request, *args, **kwargs)
        self.__accepted = True # Use the JobQueue serializer
        return self.create_accepted(request, *args, **kwargs)
    
    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        # Do not return a Response
    
    def create_accepted(self, request, *args, **kwargs):
        data = {}
        data['endpoint_name'] = request.data.get('name')
        # Add any data needed to create a JobQueue object.
        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        data = serializer.data
        headers = self.get_success_headers(data)
        return Response(data, status=status.HTTP_202_ACCEPTED,
                        headers=headers)
    
    def get_serializer_class(self):
        serializer = None
    
        if self.__accepted:
            if self.request.version == Decimal("1"):
                serializer = JobQueueSerializerVer01
        else:
            if self.request.version == Decimal("1"):
                serializer = SomeSerializerVer01
    
        return serializer
    

    现在序列化器发生了变化:

    class SomeSerializerVer01(serializers.Serializer):
    
        def create(self, validated_data):
            # Call the task
            return wait_for_long_running_code.delay(
                validated_data, self.initial_data)
    
        def create_after_task(self, validated_data, initial_data):
            self.initial_data = initial_data
            self._create_or_update_job_queue(
                name, job_status=JobQueue.INPROGRESS)
            # Do what you need to do here
            data = {}
    
            # Update the JobQueue DB object.
            self._create_or_update_job_queue(
                name, job_status=JobQueue.SUCCESS,
                job_ended=datetime.datetime.now(tzutc()))
    
            return data
    
        def _create_or_update_job_queue(self, name, **kwargs):
            trx = JobQueue.objects.create_transaction(
                name, Endpoint.HOST_GROUP, self.get_user_object(), **kwargs)
            return trx
    

    现在的任务:

    @shared_task(bind=True, default_retry_delay=15, max_retry=8)
    def wait_for_long_runninf_code(self, validated_data, initial_data):
        from your.path import SomeSerializerVer01
        ser = SomeSerializerVer01()
        result = {}
    
        try:
           result = ser.create_after_task(validated_data, initial_data)
        except Exception:
           self.retry(exc=e)
    
        return result
    

    就是这样。我做的一些事情你可能不需要做,比如传递initial_data dict。并非所有内容都显示在上面,例如,JobQueue DB 对象的视图和序列化程序。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-11-07
      • 1970-01-01
      • 2022-12-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-18
      相关资源
      最近更新 更多