From 6d9761d39f85594b7b217085db595f0e5a0f526e Mon Sep 17 00:00:00 2001 From: ablogo Date: Thu, 15 Jan 2026 12:06:28 -0600 Subject: [PATCH] adding and validate jwt roles --- src/dependencies.py | 2 +- src/middlewares/auth_roles_jwt.py | 19 ++++++++++++++ src/models/user_model.py | 1 + src/routers/admin/security_router.py | 3 +-- src/services/jwt_service.py | 39 ++++++++++++++++++++++------ src/services/login_service.py | 6 ++--- 6 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 src/middlewares/auth_roles_jwt.py diff --git a/src/dependencies.py b/src/dependencies.py index eb7598d..bca9557 100644 --- a/src/dependencies.py +++ b/src/dependencies.py @@ -6,7 +6,7 @@ load_dotenv() client = AsyncMongoClient(os.environ["DB_URL"], server_api= ServerApi(version='1', strict=True, deprecation_errors=True)) database = client.get_database(os.environ["DB_NAME"]) - + async def get_db(): try: yield database diff --git a/src/middlewares/auth_roles_jwt.py b/src/middlewares/auth_roles_jwt.py new file mode 100644 index 0000000..f478dec --- /dev/null +++ b/src/middlewares/auth_roles_jwt.py @@ -0,0 +1,19 @@ +from typing import Dict +from fastapi import HTTPException, Request +from fastapi.security import OAuth2PasswordBearer + +from src.services.jwt_service import verify_token_and_roles + +class JWTCustom(OAuth2PasswordBearer): + + def __init__(self, tokenUrl: str, scheme_name: str | None = None, scopes: Dict[str, str] | None = None, description: str | None = None, auto_error: bool = True): + super().__init__(tokenUrl, scheme_name, scopes, description, auto_error) + + async def __call__(self, request: Request): + try: + token = await super().__call__(request) + if token is not None: + email = await verify_token_and_roles(token, ['admin']) + return email + except Exception as e: + raise e \ No newline at end of file diff --git a/src/models/user_model.py b/src/models/user_model.py index 714b441..2513958 100644 --- a/src/models/user_model.py +++ b/src/models/user_model.py @@ -25,6 +25,7 @@ class User(BaseModel): email_verified: bool = False password: str twofactor_enabled: bool = False + roles: Optional[List[str]] = list() address: Optional[List[Address]] = list() online: bool = False disabled: bool = False diff --git a/src/routers/admin/security_router.py b/src/routers/admin/security_router.py index 440db84..23ac4a3 100644 --- a/src/routers/admin/security_router.py +++ b/src/routers/admin/security_router.py @@ -3,8 +3,7 @@ from dependency_injector.wiring import Provide, inject from src.dependency_injection.containers import Container -from src.logging.mongo_logging import MongoLogger -from src.middlewares.auth_jwt import JWTCustom +from src.middlewares.auth_roles_jwt import JWTCustom import src.services.totp_service as securitySvc oauth2_scheme = JWTCustom(tokenUrl="/auth/sign-in") diff --git a/src/services/jwt_service.py b/src/services/jwt_service.py index 6765d17..9121da3 100644 --- a/src/services/jwt_service.py +++ b/src/services/jwt_service.py @@ -1,4 +1,5 @@ from datetime import datetime, timedelta, timezone +from typing import Optional from dependency_injector.wiring import Provide, inject from fastapi import HTTPException, Request from dotenv import load_dotenv @@ -16,7 +17,12 @@ async def create_token(data: dict, expire_time: timedelta = timedelta(minutes=int(str(os.environ["JWT_EXPIRE_MINUTES"]))), crypto = crypto_service, log = log_service): try: for item in data: - data[item] = await crypto.encrypt_text(data[item]) + if isinstance(data[item], list): + for x in range(len(data[item])): + # To encrypt the roles use the commented code + data[item][x] = data[item][x] #await crypto.encrypt_text(data[item][x]) + else: + data[item] = await crypto.encrypt_text(data[item]) expire = datetime.now(timezone.utc) + expire_time data.update({ "exp": expire }) @@ -33,8 +39,25 @@ async def verify_token(token: str, crypto = crypto_service, log = log_service): email = await crypto.decrypt_text(payload.get("sub")) return email except Exception as e: - log.logger.error(e) - raise e + log.logger.error(e) + raise e + +@inject +async def verify_token_and_roles(token: str, required_roles: Optional[list[str]] = None, crypto = crypto_service, log = log_service): + try: + payload = await verify(token) + email = await crypto.decrypt_text(payload.get("sub")) + if required_roles: + roles = payload.get('roles', []) + + if bool(set(required_roles) & set(roles)): + return email + else: + raise HTTPException(status_code=401, detail="Insufficient permissions") + + except Exception as e: + log.logger.error(e) + raise e #async def verify_token(Authorization: str = Header(...)) -> bool: async def verify_token_from_requests(request: Request): @@ -47,8 +70,8 @@ async def verify_token_from_requests(request: Request): return email except Exception as e: - #log.logger.error(e) - raise e + #log.logger.error(e) + raise e @inject async def verify(request_token: str, log = log_service): @@ -62,8 +85,8 @@ async def verify(request_token: str, log = log_service): log.logger.error(e) raise HTTPException(status_code=401, detail="Invalid credentials") except Exception as e: - log.logger.error(e) - raise e + log.logger.error(e) + raise e @inject async def get_email(token, crypto = crypto_service, log = log_service): @@ -73,6 +96,6 @@ async def get_email(token, crypto = crypto_service, log = log_service): if email is None: return None except Exception as e: - log.logger.error(e) + log.logger.error(e) else: return email \ No newline at end of file diff --git a/src/services/login_service.py b/src/services/login_service.py index 9f56205..b5d140a 100644 --- a/src/services/login_service.py +++ b/src/services/login_service.py @@ -11,7 +11,7 @@ logger: MongoLogger = Provide[Container.logging] @inject -async def login(username, password, db, crypto = crypto_service, log = logger): +async def login(username: str, password: str, db, crypto = crypto_service, log = logger): try: user = await get_user(username, db) if user is not None: @@ -19,8 +19,8 @@ async def login(username, password, db, crypto = crypto_service, log = logger): if not user.email_verified: return True, None if is_password_valid and not user.disabled: - token = await create_token({ "sub": user.email, "name": user.name }) - return True, Token(access_token=token, token_type="bearer") + token = await create_token({ "sub": user.email, "name": user.name, "roles": user.roles }) + return True, Token(access_token = token, token_type = "bearer") except Exception as e: log.logger.error(e) return False, None