【问题标题】:Implementing a custom authentication in DRF which can read request.data在 DRF 中实现可以读取 request.data 的自定义身份验证
【发布时间】:2021-12-06 17:17:02
【问题描述】:

我的模型上有一个外键,比如 Patient 和 Doctor,它们指向 Clinic 类。所以,病人和医生应该单独属于这个诊所。其他诊所应该无法看到这些模型的任何细节。

模型如下所示:

class Clinic(models.Model):
    clinicid = models.AutoField(primary_key=True, unique=True)
    name = models.CharField(max_length=60, unique=True)
    label = models.SlugField(max_length=25, unique=True)
    email = models.EmailField(max_length=100, default='')
    mobile = models.CharField(max_length=15, default='')
    ...

class Doctor(models.Model):
    # Need autoincrement, unique and primary
    docid = models.AutoField(primary_key=True, unique=True)
    name = models.CharField(max_length=200)
    username = models.CharField(max_length=15)
    regid = models.CharField(max_length=15, default="", blank=True)
    ...
    linkedclinic = models.ForeignKey(Clinic, on_delete=models.CASCADE)

class Patient(models.Model):
    cstid = models.AutoField(primary_key=True, unique=True)
    date_of_registration = models.DateField(default=timezone.now)
    name = models.CharField(max_length=35, blank=False)
    ageyrs = models.IntegerField(blank=True)
    agemnths = models.IntegerField(blank=True)
    dob = models.DateField(null=True, blank=True)
    ...
    linkedclinic = models.ForeignKey(Clinic, on_delete=models.CASCADE)

class UserGroupMap(models.Model):
    id = models.AutoField(primary_key=True, unique=True)
    user = models.ForeignKey(
        User, related_name='target_user', on_delete=models.CASCADE)
    group = models.ForeignKey(UserGroup, on_delete=models.CASCADE)
    clinic = models.ForeignKey(Clinic, on_delete=models.CASCADE)
    ...

从我的 Vue 应用程序中,我将使用 Axios 发布到使用 DRF 的 django 应用程序,从而获得患者和医生的序列化数据。如果我尝试在函数视图中使用以下示例代码,一切正常:

@api_view(['GET', 'POST'])
def register_patient_vue(request):
    if request.method == 'POST':
        print("POST details", request.data)
        data = request.data['registration_data']
        serializer = customerSpecialSerializer(data=data)
        if serializer.is_valid():
            a = serializer.save()
            print(serializer.data)
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        else:
            print("Serializer is notNot valid.")
            print(serializer.errors)
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

示例输出:

POST details {'registration_data': {'name': 'wczz', 'ageyrs': 21, 'agemonths': '', 'dob': '', 'gender': 'unspecified', 'mobile': '2', 'email': '', 'alternate': '', 'address': '', 'marital': 'unspecified', 'city': '', 'occupation': '', 'linkedclinic': 10}}
data: {'name': 'wczz', 'ageyrs': 21, 'agemonths': '', 'dob': '', 'gender': 'unspecified', 'mobile': '2', 'email': '', 'alternate': '', 'address': '', 'marital': 'unspecified', 'city': '', 'occupation': '', 'linkedclinic': 10}

但是,我需要通过特殊的自定义身份验证来验证请求。我有另一个名为 UserGroupMap 的类,它具有用户和诊所的外键,因此如果诊所和用户的过滤器匹配,则在地图中,它将进行身份验证。否则,它应该无法通过身份验证,并且不应检索数据或保存序列化程序。

在我之前的简单纯 django 项目中,我曾经使用自定义权限函数,并用它装饰我的视图:

@handle_perm(has_permission_level, required_permission='EDIT_CLINICAL_RECORD', login_url='/clinic/')
def some_function(request, dept_id):
    ....
    Some code which runs after authentication

它会使用以下内容:

def handle_perm(test_func, required_permission=None, login_url=None, redirect_field_name=REDIRECT_FIELD_NAME):
    """
    Decorator for views that checks that the user passes the given test,
    redirecting to the log-in page if necessary. The test should be a callable
    that takes the user object and returns True if the user passes.
    """

    def decorator(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            print(f"Required permission level is {required_permission}")
            if has_permission_level(request, required_permission):
                print("User has required permission level..Allowing entry.")
                return view_func(request, *args, **kwargs)
            print("FAILED! User does not have required permission level. Access blocked.")
            path = request.build_absolute_uri()
            resolved_login_url = resolve_url(login_url or settings.LOGIN_URL)
            # If the login url is the same scheme and net location then just
            # use the path as the "next" url.
            login_scheme, login_netloc = urlparse(resolved_login_url)[:2]
            current_scheme, current_netloc = urlparse(path)[:2]
            if ((not login_scheme or login_scheme == current_scheme) and
                    (not login_netloc or login_netloc == current_netloc)):
                path = request.get_full_path()
            from django.contrib.auth.views import redirect_to_login
            return redirect_to_login(
                path, resolved_login_url, redirect_field_name)
        return _wrapped_view
    return decorator

def has_permission_level(request, required_permission, clinic=None):
    print("has_permission_level was called.")
    user = request.user
    print(f'user is {user}')
    
    clinic=clinic_from_request(request)
    print(f"has_permission_level called with clinic:{clinic}")
    
    if clinic is None:
        print("clinic is none")
        return HttpResponseRedirect('/accounts/login/')

    group_maps = UserGroupMap.objects.filter(user=user, clinic=clinic)
    print(f"No: of UserGroupMap memberships: {len(group_maps)}")
    if len(group_maps) < 1:
        # There are no UserGroupMap setup for the user. Kindly set them up.\nHint:Admin>Manage users and groups>Users
        return False
    # Now checking Group memberships whether the user has any with permisison

    for map in group_maps:
        rolesmapped = GroupRoleMap.objects.filter(group=map.group)
        if len(rolesmapped) < 1:
            print(f"No permission roles.")
        else:
            for rolemap in rolesmapped:
                print(f"{rolemap.role}", end=",")
                if rolemap.role.name == required_permission:
                    print(
                        f"\nAvailable role of [{map.group}] matched required permission of [{required_permission}] in {clinic.name} [Ok]")
                    return True
    return False

我需要使用 DRF 构建自定义身份验证,以便它读取 POST 数据,检查linkedclinic 值,并采用类似的逻辑。

我是这样开始的:

def has_permission_POST(request, required_permission, clinic=None):
    print("has_permission_POST was called.")
    user = request.user
    print(f'user is {user}')

    if request.method == 'POST':
        print(request)
        print(dir(request))
        print("POST details: POST:", request.POST, "\n")
        print("POST details: data:", request.data, "\n")
        ....
        # Further logic to check the mapping

        return True
        
    else:
        print("Not a valid POST")
        return Response("INVALID POST", status=status.HTTP_400_BAD_REQUEST)

# And decorating my DRF view:
@handle_perm(has_permission_POST, required_permission='EDIT_CLINICAL_RECORD', login_url='/clinic/')
@api_view(['GET', 'POST'])
def register_patient_vue(request):
    if request.method == 'POST':
        print("POST details", request.data)
        data = request.data['registration_data']

问题是,如果我运行它,那么 has_permission_POST 无法获取 request.data 的值,其中包含从我的前端发布的数据。我可以通过将 @api_view(['GET', 'POST']) 装饰器添加到 has_permission_POST 来解决这个问题。但这引入了另一个错误,一个失败的断言:

AssertionError: Expected a `Response`, `HttpResponse` or `HttpStreamingResponse` to be returned from the view, but received a `<class 'bool'>`

这发生在 has_permission_POST 一旦用@api_view 修饰。

所以我的问题:

  1. 如何为我的用例实现自定义身份验证?
  2. 如果我打算这样做,通过使用这个自定义 has_permission_level,我如何在调用我的实际 api 视图之前在这个函数中获取 request.data,以便我可以读取诊所 ID 并检查权限我需要的。

我查看了DRF提供的CustomAuthentication,但没有找到自定义类中request.data参数的获取方式。

【问题讨论】:

  • 你看过this question吗?他们的建议似乎与custom permissions 文档中的部分一致,这可能是您正在寻找的内容。他们在那里覆盖BasePermission 类,而不是像您在那里所做的那样创建一个函数。然后看起来他们可以访问request 参数,尽管我没有看到您在问题中所述的专门针对request.data 的示例。

标签: django django-rest-framework


【解决方案1】:

感谢@MihaiChelaru,我能够找到解决问题的方法。

我通过扩展 permissions.BasePermission 并在特殊的 has_permission 函数中使用我的自定义逻辑创建了一个自定义 Permission 类。我更进一步,从请求中实现了对 Token 的检查。一旦令牌通过身份验证,用户就可以从令牌表中的匹配令牌中获取。我发现在自定义权限类中,我可以阅读 Vue 和 Postman 传递的完整的request.data 参数。阅读后,我可以轻松地实现自定义模型对用户权限的自定义检查。

class CustomerAccessPermission(permissions.BasePermission):
    message = 'No permission to create new patient records'

    def has_permission(self, request, view):
        bearer_authorizn = request.META.get('HTTP_AUTHORIZATION')
        try: #Different apps like POSTMAN, and Vue seem to use different strings while passing token
            token = bearer_authorizn.split("Bearer ")[1]
        except Exception as e:
            try:
                token = bearer_authorizn.split("Token ")[1]
            except Exception as e:
                raise NotAuthenticated('Did not get token in request')
        try:
            token_obj = Token.objects.get(key=token)
        except self.model.DoesNotExist:
            raise AuthenticationFailed('Invalid token')
        if not token_obj.user.is_active:
            raise AuthenticationFailed('User inactive or deleted')
        print("Username is %s" % token_obj.user.username)
        print("POST details", request.data)
        linkedclinic_id = request.data['data']['linkedclinic']
        clinic = Clinic.objects.get(clinicid=int(linkedclinic_id))
        print("Clinic membership requested:", clinic)
        group_maps = UserGroupMap.objects.filter(user=user, clinic=clinic)
        print(f"No: of UserGroupMap memberships: {len(group_maps)}")
        if len(group_maps) > 1:
            return True
        return False


@api_view(['POST'])
@permission_classes([CustomerAccessPermission])
def register_patient_vue(request):
    logger.info('In register_patient_vue...')
    ...

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-07-24
    • 1970-01-01
    • 2018-04-09
    • 2011-04-24
    • 1970-01-01
    • 1970-01-01
    • 2018-11-27
    相关资源
    最近更新 更多