【发布时间】:2021-02-20 04:57:27
【问题描述】:
问题
我目前有一个名为 jwt 的 JWT 依赖项,它确保它在访问端点之前通过 JWT 身份验证阶段,如下所示:
sample_endpoint.py:
from fastapi import APIRouter, Depends, Request
from JWTBearer import JWTBearer
from jwt import jwks
router = APIRouter()
jwt = JWTBearer(jwks)
@router.get("/test_jwt", dependencies=[Depends(jwt)])
async def test_endpoint(request: Request):
return True
以下是使用 JWT 对用户进行身份验证的 JWT 依赖项(来源:https://medium.com/datadriveninvestor/jwt-authentication-with-fastapi-and-aws-cognito-1333f7f2729e):
JWTBearer.py
from typing import Dict, Optional, List
from fastapi import HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, jwk, JWTError
from jose.utils import base64url_decode
from pydantic import BaseModel
from starlette.requests import Request
from starlette.status import HTTP_403_FORBIDDEN
JWK = Dict[str, str]
class JWKS(BaseModel):
keys: List[JWK]
class JWTAuthorizationCredentials(BaseModel):
jwt_token: str
header: Dict[str, str]
claims: Dict[str, str]
signature: str
message: str
class JWTBearer(HTTPBearer):
def __init__(self, jwks: JWKS, auto_error: bool = True):
super().__init__(auto_error=auto_error)
self.kid_to_jwk = {jwk["kid"]: jwk for jwk in jwks.keys}
def verify_jwk_token(self, jwt_credentials: JWTAuthorizationCredentials) -> bool:
try:
public_key = self.kid_to_jwk[jwt_credentials.header["kid"]]
except KeyError:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="JWK public key not found"
)
key = jwk.construct(public_key)
decoded_signature = base64url_decode(jwt_credentials.signature.encode())
return key.verify(jwt_credentials.message.encode(), decoded_signature)
async def __call__(self, request: Request) -> Optional[JWTAuthorizationCredentials]:
credentials: HTTPAuthorizationCredentials = await super().__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Wrong authentication method"
)
jwt_token = credentials.credentials
message, signature = jwt_token.rsplit(".", 1)
try:
jwt_credentials = JWTAuthorizationCredentials(
jwt_token=jwt_token,
header=jwt.get_unverified_header(jwt_token),
claims=jwt.get_unverified_claims(jwt_token),
signature=signature,
message=message,
)
except JWTError:
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")
if not self.verify_jwk_token(jwt_credentials):
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")
return jwt_credentials
jwt.py:
import os
import requests
from dotenv import load_dotenv
from fastapi import Depends, HTTPException
from starlette.status import HTTP_403_FORBIDDEN
from app.JWTBearer import JWKS, JWTBearer, JWTAuthorizationCredentials
load_dotenv() # Automatically load environment variables from a '.env' file.
jwks = JWKS.parse_obj(
requests.get(
f"https://cognito-idp.{os.environ.get('COGNITO_REGION')}.amazonaws.com/"
f"{os.environ.get('COGNITO_POOL_ID')}/.well-known/jwks.json"
).json()
)
jwt = JWTBearer(jwks)
async def get_current_user(
credentials: JWTAuthorizationCredentials = Depends(auth)
) -> str:
try:
return credentials.claims["username"]
except KeyError:
HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Username missing")
api_key_dependency.py(现在很简化,会改的):
from fastapi import Security, FastAPI, HTTPException
from fastapi.security.api_key import APIKeyHeader
from starlette.status import HTTP_403_FORBIDDEN
async def get_api_key(
api_key_header: str = Security(api_key_header)
):
API_KEY = ... getting API KEY logic ...
if api_key_header == API_KEY:
return True
else:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
问题
根据情况,我想先检查它是否在标头中有 API Key,如果存在,则使用它进行身份验证。否则,我想使用 jwt 依赖项进行身份验证。我想确保如果 api-key 身份验证或 jwt 身份验证通过,则用户已通过身份验证。这在 FastAPI 中是否可行(即具有多个依赖项,如果其中一个通过,则通过身份验证)。谢谢!
【问题讨论】:
-
一个简单的解决方案可能是拥有一个唯一的
Dependency来执行检查并调用正确的身份验证方法(JWT 或 KEY)。如果您需要仅使用 JWT 对某些路径进行身份验证,则可以直接使用该依赖项(相同的方法仅适用于 KEY 身份验证) -
谢谢 Isabi,澄清一下,您的意思是添加执行检查并调用正确身份验证依赖项(JWT 依赖项或密钥)的第三个依赖项(这将是
sample_endpoint的唯一依赖项)依赖)。这是正确的吗? -
是的,某种工厂方法
-
您能否提供一个简单的示例,我可以在工厂依赖项中调用其他依赖项?我对如何做到这一点进行了一些研究,但无法找到一个很好的例子。我已经用 api 密钥验证的示例依赖项更新了帖子。
-
我尝试创建工厂依赖项(类),在工厂依赖项中有 _call_ 方法,并在类中创建了两个单独的函数(
check_api_key,check_jwt)。例如,它看起来像def call_api_key(self, b = Depends(get_api_key)):。但它似乎根本没有在check_api_key或check_jwt键中调用相应的依赖项。您能在这里提供一些指导吗?
标签: python oauth jwt openapi fastapi