【问题标题】:Django: Post-Save Signal TransactionManagementErrorDjango:保存后信号 TransactionManagementError
【发布时间】:2019-06-13 05:26:42
【问题描述】:

我正在使用post-save signal 作为模型来启动celery 任务。它以前可以工作,但突然间,它给了我一个 TransactionManagementError

型号

class FieldSightXF(models.Model):
    xf = models.ForeignKey(XForm, related_name="field_sight_form")
    site = models.ForeignKey(Site, related_name="site_forms", null=True, blank=True)
    project = models.ForeignKey(Project, related_name="project_forms", null=True, blank=True)
    is_staged = models.BooleanField(default=False)
    is_scheduled = models.BooleanField(default=False)
    date_created = models.DateTimeField(auto_now=True)
    date_modified = models.DateTimeField(auto_now=True)
    schedule = models.OneToOneField(Schedule, blank=True, null=True, related_name="schedule_forms")
    stage = models.OneToOneField(Stage, blank=True, null=True, related_name="stage_forms")
    shared_level = models.IntegerField(default=2, choices=SHARED_LEVEL)
    form_status = models.IntegerField(default=0, choices=FORM_STATUS)
    fsform = models.ForeignKey('self', blank=True, null=True, related_name="parent")
    is_deployed = models.BooleanField(default=False)
    is_deleted = models.BooleanField(default=False)
    is_survey = models.BooleanField(default=False)
    from_project = models.BooleanField(default=True)
    default_submission_status = models.IntegerField(default=0, choices=FORM_STATUS)
    logs = GenericRelation('eventlog.FieldSightLog')

    class Meta:
        db_table = 'fieldsight_forms_data'
        # unique_together = (("xf", "site"), ("xf", "is_staged", "stage"),("xf", "is_scheduled", "schedule"))
        verbose_name = _("XForm")
        verbose_name_plural = _("XForms")
        ordering = ("-date_created",)

保存后信号

@receiver(post_save, sender=FieldSightXF)
def share_form(sender, instance, created,  **kwargs):
    if instance.project is not None and created:
        from onadata.apps.fsforms.tasks import share_form_managers
        task_obj = CeleryTaskProgress.objects.create(user=instance.xf.user,
                                                     description="Share Forms",
                                                     task_type=17, content_object=instance)
        if task_obj:
            try:
                share_form_managers.delay(instance.id, task_obj.id)
            except:
                pass

post_save.connect(create_messages, sender=FieldSightXF)

CeleryTaskProgress用于跟踪 celery 任务的进度

任务

@shared_task(max_retires=5)
def share_form_managers(fxf, task_id):
    fxf = FieldSightXF.objects.get(pk=fxf)
    userrole = UserRole.objects.filter(project=fxf.project, group__name='Project Manager')
    users = User.objects.filter(user_roles__in=userrole)
    shared = share_form(users, fxf.xf)
    if shared:
        CeleryTaskProgress.objects.filter(id=task_id).update(status=2)
    else:
        CeleryTaskProgress.objects.filter(id=task_id).update(status=3)

share_form 方法

def share_form(users, xform):
    from onadata.apps.fsforms.models import ObjectPermission, Asset
    for user in users:
        try:
            codenames = ['view_asset', 'change_asset']
            permissions = Permission.objects.filter(content_type__app_label='kpi', codename__in=codenames)
            for perm in permissions:
                object_id = Asset.objects.get(uid=xform.id_string).id
                content_type = ContentType.objects.get(id=21)

                # Create the new permission
                new_permission = ObjectPermission.objects.create(
                    object_id=object_id,
                    content_type=content_type,
                    user=user,
                    permission_id=perm.pk,
                    deny=False,
                    inherited=False
                )

        except:
            return False
        else:
            return True

这个过程的作用是每当创建FieldSightXF 的对象时(将表单分配给项目),然后将表单共享给该项目的项目经理。

以前没有问题,因为我将FieldSightXF 对象作为参数传递给任务,但现在我将对象ID 传递为:

以前

share_form_managers.delay(instance, task_obj.id)

当前

share_form_managers.delay(instance.id, task_obj.id) 

现在,这两种情况都给了我提到的错误。 如果我从 post-save 信号方法中注释上述行,错误就消失了。

我确实尝试了针对类似问题的其他答案的建议,但它们对我不起作用。

完整的错误回溯:

ERROR 2019-06-13 10:49:50,743 base 11527 140653367232256 Internal Server Error: /forms/api/fxf/
Traceback (most recent call last):
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/core/handlers/base.py", line 132, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/viewsets.py", line 87, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 466, in dispatch
    response = self.handle_exception(exc)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 463, in dispatch
    response = handler(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/mixins.py", line 21, in create
    self.perform_create(serializer)
  File "/home/sanip/naxa/source/fieldsight/onadata/apps/fsforms/viewsets/FieldSightXformViewset.py", line 85, in perform_create
    fxf.save()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 734, in save
    force_update=force_update, update_fields=update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 762, in save_base
    updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 827, in _save_table
    forced_update)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 877, in _do_update
    return filtered._update(values) > 0
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/query.py", line 580, in _update
    return query.get_compiler(self.db).execute_sql(CURSOR)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 1062, in execute_sql
    cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
    cursor.execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
    return super(CursorDebugWrapper, self).execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 59, in execute
    self.db.validate_no_broken_transaction()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/base/base.py", line 327, in validate_no_broken_transaction
    "An error occurred in the current transaction. You can't "
TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.
Internal Server Error: /forms/api/fxf/
Traceback (most recent call last):
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/core/handlers/base.py", line 132, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/viewsets.py", line 87, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 466, in dispatch
    response = self.handle_exception(exc)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 463, in dispatch
    response = handler(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/mixins.py", line 21, in create
    self.perform_create(serializer)
  File "/home/sanip/naxa/source/fieldsight/onadata/apps/fsforms/viewsets/FieldSightXformViewset.py", line 85, in perform_create
    fxf.save()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 734, in save
    force_update=force_update, update_fields=update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 762, in save_base
    updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 827, in _save_table
    forced_update)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 877, in _do_update
    return filtered._update(values) > 0
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/query.py", line 580, in _update
    return query.get_compiler(self.db).execute_sql(CURSOR)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 1062, in execute_sql
    cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
    cursor.execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
    return super(CursorDebugWrapper, self).execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 59, in execute
    self.db.validate_no_broken_transaction()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/base/base.py", line 327, in validate_no_broken_transaction
    "An error occurred in the current transaction. You can't "
TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.

从回溯来看,错误在以下代码中指出:

def perform_create(self, serializer):
    is_survey = self.request.data.get('is_survey', False)
    fxf = serializer.save(is_survey=is_survey, is_deployed=True)
    if not fxf.project:
        fxf.from_project = False
    fxf.save() #<<--here
    if fxf.project:
        if not fxf.is_survey:    
            org = fxf.project.organization
            fxf.logs.create(source=self.request.user, type=18, title="General",
                      organization=org,
                      project = fxf.project,
                      content_object=fxf,
                      extra_object=fxf.project,
                      description='{0} assigned new General form  {1} to {2} '.format(
                          self.request.user.get_full_name(),
                          fxf.xf.title,
                          fxf.project.name))
    else:
        org = fxf.site.project.organization

        fxf.logs.create(source=self.request.user, type=19, title="General",
                                          organization=org,
                                          project=fxf.site.project,
                                          site = fxf.site,
                                          content_object=fxf,
                                          extra_object=fxf.site,
                                          description='{0} assigned new General form  {1} to {2} '.format(
                                              self.request.user.get_full_name(),
                                              fxf.xf.title,
                                              fxf.site.name
                                          ))

perform_create 是视图集中的一个方法,用于将表单分配给项目。

【问题讨论】:

    标签: python django django-celery django-signals


    【解决方案1】:

    从错误看来,您在数据库级别或视图级别启用了原子事务。

    阅读您的代码后,问题在我看来是这样的。

    • 您从 perform_create 视图中创建了一个新的 FieldSightXF 对象,但是由于启用了原子事务,该对象实际上还没有存储在数据库中。将执行代码并将响应返回给用户。

    • 现在当您调用 fxf.save() 时,会处理 post_save_signal 并调用函数 share_form

      @receiver(post_save, sender=FieldSightXF)
      def share_form(sender, instance, created,  **kwargs):
          if instance.project is not None and created:
              from onadata.apps.fsforms.tasks import share_form_managers
              task_obj = CeleryTaskProgress.objects.create(user=instance.xf.user,
                                                   description="Share Forms",
                                                   task_type=17, content_object=instance)
              if task_obj:
                  try:
                      share_form_managers.delay(instance.id, task_obj.id)
                  except:
                      pass
      
      post_save.connect(create_messages, sender=FieldSightXF)
      
    • 在 share_form 函数中您使用 FieldSightXF 对象的 id 调用了任务 share_form_managers

    • 现在,当 celery 执行您的任务并这样做时访问数据库以查找 id=fxf 的 FieldSightXF 对象和 id=task_id 的 CeleryTaskProgress 对象,它找不到它,因为它还没有在数据库中并且引发数据库错误

      @shared_task(max_retires=5)
      def share_form_managers(fxf, task_id):
          fxf = FieldSightXF.objects.get(pk=fxf) #<<--here
          userrole = UserRole.objects.filter(project=fxf.project, group__name='Project Manager')
          users = User.objects.filter(user_roles__in=userrole)
          shared = share_form(users, fxf.xf)
          if shared:
              CeleryTaskProgress.objects.filter(id=task_id).update(status=2)
          else:
              CeleryTaskProgress.objects.filter(id=task_id).update(status=3)
      
    • 如果在事务块中发生数据库错误django 会阻止任何进一步数据库查询以停止损坏的数据读/写

    • 在 perform_create 函数中,你试图在 fxf.save() 之后读/写数据,所以 django 引发了事务错误

      def perform_create(self, serializer):
          is_survey = self.request.data.get('is_survey', False)
          fxf = serializer.save(is_survey=is_survey, is_deployed=True)
          if not fxf.project:
              fxf.from_project = False
          fxf.save() #<<--here
          if fxf.project:
              if not fxf.is_survey:    
                  org = fxf.project.organization
                  fxf.logs.create(source=self.request.user, type=18, title="General",
                    organization=org,
                    project = fxf.project,
                    content_object=fxf,
                    extra_object=fxf.project,
                    description='{0} assigned new General form  {1} to {2} '.format(
                        self.request.user.get_full_name(),
                        fxf.xf.title,
                        fxf.project.name))
          else:
              org = fxf.site.project.organization
      
              fxf.logs.create(source=self.request.user, type=19, title="General",
                                        organization=org,
                                        project=fxf.site.project,
                                        site = fxf.site,
                                        content_object=fxf,
                                        extra_object=fxf.site,
                                        description='{0} assigned new General form  {1} to {2} '.format(
                                            self.request.user.get_full_name(),
                                            fxf.xf.title,
                                            fxf.site.name
                                        ))
      

    我建议不要使用 post_save 信号,而是从 perform_create 视图调用 share_form_managers 任务,如下所示:

        from django.db import transaction
    
        def perform_create(self, serializer):
            is_survey = self.request.data.get('is_survey', False)
            fxf = serializer.save(is_survey=is_survey, is_deployed=True)
            if not fxf.project:
                fxf.from_project = False
            fxf.save()
            if fxf.project:
                # calling the task
                from onadata.apps.fsforms.tasks import share_form_managers
                task_obj = CeleryTaskProgress.objects.create(user=fxf.xf.user, description="Share Forms", task_type=17, content_object=instance)
                if task_obj:
                    try:
                        transaction.on_commit(lambda: share_form_managers.delay(instance.id, task_obj.id))
                    except:
                        pass
                # call block end
                if not fxf.is_survey:    
                    org = fxf.project.organization
                    fxf.logs.create(source=self.request.user, type=18, title="General", organization=org, project = fxf.project, content_object=fxf,extra_object=fxf.project, description='{0} assigned new General form  {1} to {2} '.format(self.request.user.get_full_name(),fxf.xf.title,fxf.project.name))
            else:
                org = fxf.site.project.organization
    
                fxf.logs.create(source=self.request.user, type=19, title="General", organization=org, project=fxf.site.project, site = fxf.site, content_object=fxf, extra_object=fxf.site, description='{0} assigned new General form  {1} to {2} '.format(self.request.user.get_full_name(),fxf.xf.title,fxf.site.name ))
    
    • transaction.on_commit 将确保只有在 perform_create 视图中的数据存储到数据库后才会调用任务。

    如果有帮助,请告诉我。

    这里参考django的事务管理: https://docs.djangoproject.com/en/2.2/topics/db/transactions/

    【讨论】:

    • 感谢您对问题的正确解释。试过你的建议,现在可以了。但是我不允许直接编辑perform_create的代码。所以我采纳了你的想法,并在 atomic transaction block 中调用了 post_save 信号的 share_form_managers 任务,现在它工作正常。非常感谢您提供这么好的答案。
    • 我很高兴它有帮助!
    猜你喜欢
    • 2013-12-06
    • 1970-01-01
    • 1970-01-01
    • 2011-03-15
    • 2011-08-01
    • 2012-04-14
    • 1970-01-01
    • 1970-01-01
    • 2010-09-15
    相关资源
    最近更新 更多