diff --git a/.env.example b/.env.example index 8590fbc..3968564 100644 --- a/.env.example +++ b/.env.example @@ -29,8 +29,11 @@ JWT_ISSUER=xxx # time in minutes OTP_LIFETIME=10 -API_GATEWAY_PUBLIC_KEY=xxxxxxxxxxxxxxxxxxxxxxxx -API_KEY_EXPIRES_AT=2 +GATEWAY_PUBLIC_KEY='XXXXXXXXXXXXXXXXXXX' +GATEWAY_KEY_TTL=2 # amqp://username:password@host[:port]/ -RABBITMQ_URL=amqp://guest:guest@localhost:5672/ \ No newline at end of file +RABBITMQ_URL=amqp://guest:guest@localhost:5672/ + +QUEUE_SECRECT_KEY='XXXXXXXXXXXXXXXXX' +QUEUE_SECRECT_KEY_TTL= 0.5 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ba4f079..ee59cc8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "pyjwt>=2.10.1", "faststream[rabbit]>=0.5.39", "starlette>=0.46.2", + "django-extensions>=4.1", "uvicorn>=0.34.2", ] diff --git a/src/api/constants/queues.py b/src/api/constants/queues.py new file mode 100644 index 0000000..b7bb09b --- /dev/null +++ b/src/api/constants/queues.py @@ -0,0 +1,12 @@ +from typing import TypedDict + + +class QueueNames(TypedDict): + USER_REGISTRATION: str + EMAIL_VALIDATION: str + + +QUEUE_NAMES: QueueNames = { + "USER_REGISTRATION": "create_user", + "EMAIL_VALIDATION": "validate_email", +} diff --git a/src/api/constants/signature_sources.py b/src/api/constants/signature_sources.py new file mode 100644 index 0000000..38bb96b --- /dev/null +++ b/src/api/constants/signature_sources.py @@ -0,0 +1,12 @@ +from typing import TypedDict + + +class SignatureSources(TypedDict): + gateway: str + queue: str + + +SIGNATURE_SOURCES: SignatureSources = { + "gateway": "Gateway", + "queue": "Broker Queue", +} diff --git a/src/api/middlewares/BrokerMiddleware.py b/src/api/middlewares/BrokerMiddleware.py new file mode 100644 index 0000000..94756a7 --- /dev/null +++ b/src/api/middlewares/BrokerMiddleware.py @@ -0,0 +1,76 @@ +from types import CoroutineType +from typing import Any +from collections.abc import Callable, Awaitable + +from faststream import BaseMiddleware +from faststream.broker.message import StreamMessage +from faststream.rabbit.message import RabbitMessage + +from src.env import queue +from src.utils.logger import Logger +from src.api.services.UtilityService import UtilityService +from src.api.constants.signature_sources import SIGNATURE_SOURCES + + +class PublishMiddleware(BaseMiddleware): + """ + Middleware to handle subscription messages. + """ + + async def publish_scope( + self, + call_next: Callable[..., Awaitable[Any]], + msg: RabbitMessage, + *args: tuple[Any, ...], + **kwargs: dict[str, Any], + ) -> CoroutineType: + timestamp = UtilityService.get_timestamp() + signature = UtilityService.generate_signature(queue["key"], timestamp) + + headers = {} + + headers["X-BROKER-SIGNATURE"] = signature + headers["X-BROKER-TIMESTAMP"] = timestamp + headers["X-BROKER-KEY"] = queue["key"] + + kwargs["headers"] = headers + return await super().publish_scope(call_next, msg, *args, **kwargs) + + +class SubscribeMiddleware(BaseMiddleware): + async def consume_scope( + self, call_next: Callable[[Any], Awaitable[Any]], msg: StreamMessage + ) -> CoroutineType | None: + logger = Logger(__name__) + try: + UtilityService.verify_signature( + logger=logger, + signature_data={ + "signature": msg.headers["X-BROKER-SIGNATURE"], + "timestamp": msg.headers["X-BROKER-TIMESTAMP"], + "key": queue["key"], + "ttl": queue["ttl"], + "title": SIGNATURE_SOURCES["gateway"], + }, + ) + + return await super().consume_scope(call_next, msg) + except KeyError as e: + message = f"Missing required header: {e}" + logger.error( + { + "activity_type": "Authenticate GatewaBroker Queue Request", + "message": message, + "metadata": {"headers": msg.headers}, + } + ) + except Exception as e: + queue_operation = msg.raw_message.routing_key + message = f"`{queue_operation}` operation failed: {e}" + logger.error( + { + "activity_type": "Authenticate GatewaBroker Queue Request", + "message": message, + "metadata": {"headers": msg.headers, "message": msg._decoded_body}, + } + ) diff --git a/src/api/middlewares/GateWayMiddleware.py b/src/api/middlewares/GateWayMiddleware.py index 9ce6687..932d4af 100644 --- a/src/api/middlewares/GateWayMiddleware.py +++ b/src/api/middlewares/GateWayMiddleware.py @@ -1,7 +1,3 @@ -import hmac -import hashlib -from datetime import datetime, timedelta - from django.http import HttpRequest from ninja.errors import AuthenticationError from ninja.security import APIKeyHeader @@ -9,6 +5,8 @@ from src.env import api_gateway from src.utils.logger import Logger +from src.api.services.UtilityService import SignatureData, UtilityService +from src.api.constants.signature_sources import SIGNATURE_SOURCES class GateWayAuth(APIKeyHeader): @@ -27,7 +25,7 @@ def authenticate(self, request: HttpRequest, key: str | None) -> str | None: message = f"Missing required header: {e}" self.logger.error( { - "activity_type": "Authenticate User", + "activity_type": "Authenticate Gateway Request", "message": message, "metadata": {"headers": request.headers}, } @@ -39,22 +37,33 @@ def authenticate(self, request: HttpRequest, key: str | None) -> str | None: message = "Invalid API key!" self.logger.error( { - "activity_type": "Authenticate User", + "activity_type": "Authenticate Gateway Request", "message": message, "metadata": {"headers": request.headers}, } ) raise AuthenticationError(message=message) - self._verify_signature(valid_api_key, api_signature, api_timestamp) + signature_data: SignatureData = { + "signature": api_signature, + "timestamp": api_timestamp, + "key": valid_api_key, + "ttl": api_gateway["ttl"], + "title": SIGNATURE_SOURCES["gateway"], + } + + UtilityService.verify_signature( + logger=self.logger, signature_data=signature_data + ) self.logger.debug( { - "activity_type": "Authenticate User", - "message": "Successfully authenticated user", + "activity_type": "Authenticate Gateway Request", + "message": "Successfully authenticated gateway request", "metadata": { "user_id": user_id, "user_email": user_email, + "headers": request.headers, }, } ) @@ -63,44 +72,6 @@ def authenticate(self, request: HttpRequest, key: str | None) -> str | None: return api_signature - def _verify_signature( - self, valid_api_key: str, signature: str, timestamp: str - ) -> bool: - valid_signature = self.generate_signature(valid_api_key, timestamp) - is_valid = hmac.compare_digest(valid_signature, signature) - - if not is_valid: - message = "Invalid signature!" - self.logger.error( - { - "activity_type": "Authenticate User", - "message": message, - "metadata": {"signature": signature}, - } - ) - raise AuthenticationError(message=message) - - initial_time = datetime.fromtimestamp(int(timestamp) / 1000) - valid_window = initial_time + timedelta(minutes=api_gateway["expires_at"]) - if valid_window < datetime.now(): - message = "Signature expired!" - self.logger.error( - { - "activity_type": "Authenticate User", - "message": message, - "metadata": {"timestamp": timestamp}, - } - ) - raise AuthenticationError(message=message) - - return True - - def generate_signature(self, api_key: str, timestamp: str) -> str: - signature = hmac.new( - key=api_key.encode(), msg=timestamp.encode(), digestmod=hashlib.sha256 - ).hexdigest() - return signature - def get_authentication() -> GateWayAuth: gateway_auth = GateWayAuth(Logger("Authentication")) diff --git a/src/api/models/payload/requests/CreateUserRequest.py b/src/api/models/payload/requests/CreateUserRequest.py index 1a4e086..d8b7be2 100644 --- a/src/api/models/payload/requests/CreateUserRequest.py +++ b/src/api/models/payload/requests/CreateUserRequest.py @@ -1,8 +1,12 @@ from pydantic import EmailStr, BaseModel -from src.api.typing.PasswordValidator import IsStrongPassword - class CreateUserRequest(BaseModel): + id: str email: EmailStr - password: IsStrongPassword + first_name: str + last_name: str + address: str + phone_number: str + profile_picture: str + pin: str diff --git a/src/api/services/UserService.py b/src/api/services/UserService.py index c3d32e5..01f01e9 100644 --- a/src/api/services/UserService.py +++ b/src/api/services/UserService.py @@ -1,16 +1,22 @@ from typing import Annotated +from faststream.rabbit import RabbitRouter + from src.utils.svcs import Service from src.utils.logger import Logger from src.api.models.postgres import User +from src.api.constants.queues import QUEUE_NAMES from src.api.constants.messages import MESSAGES, DYNAMIC_MESSAGES from src.api.typing.UserSuccess import UserSuccess from src.api.constants.activity_types import ACTIVITY_TYPES from src.api.repositories.UserRepository import UserRepository +from src.api.models.payload.requests.CreateUserRequest import CreateUserRequest from src.api.models.payload.requests.UpdateUserRequest import UpdateUserRequest from .UtilityService import UtilityService +UserRouter = RabbitRouter() + @Service() class UserService: @@ -112,3 +118,55 @@ async def update(self, id: str, req: UpdateUserRequest) -> UserSuccess: user = self.utility_service.sanitize_user_object(updated_user) return {"is_success": True, "user": user} + + @staticmethod + @UserRouter.subscriber(queue=QUEUE_NAMES["USER_REGISTRATION"]) + async def register(message: dict) -> None: + logger = Logger("UserService") + user_data = CreateUserRequest( + id=message["id"], + email=message["email"], + first_name="", + last_name="", + address="", + phone_number="", + profile_picture="", + pin="", + ) + new_user = await UserRepository.add(user_data) + + logger.info( + { + "activity_type": ACTIVITY_TYPES["USER_REGISTRATION"], + "message": MESSAGES["REGISTRATION"]["USER_REGISTERED"], + "metadata": {"user": {"id": new_user.id, "email": new_user.email}}, + } + ) + + @staticmethod + @UserRouter.subscriber(queue=QUEUE_NAMES["EMAIL_VALIDATION"]) + async def validate_user(message: dict) -> None: + logger = Logger("UserService") + user = await UserRepository.find_by_id(message["id"]) + + if not user: + logger.warn( + { + "activity_type": ACTIVITY_TYPES["EMAIL_VALIDATION"], + "message": DYNAMIC_MESSAGES["COMMON"]["FETCHED_FAILED"]("User"), + "metadata": {"user": {"id": message["id"]}}, + } + ) + return + + await UserRepository.update_by_user( + user, {"is_active": True, "is_enabled": True, "is_validated": True} + ) + + logger.info( + { + "activity_type": ACTIVITY_TYPES["EMAIL_VALIDATION"], + "message": MESSAGES["REGISTRATION"]["VERIFICATION_SUCCESS"], + "metadata": {"user": {"id": user.id, "email": user.email}}, + } + ) diff --git a/src/api/services/UtilityService.py b/src/api/services/UtilityService.py index e03388c..695387c 100644 --- a/src/api/services/UtilityService.py +++ b/src/api/services/UtilityService.py @@ -1,13 +1,18 @@ +import hmac +import hashlib from uuid import uuid4 -from datetime import timedelta +from typing import TypedDict +from datetime import datetime, timedelta import jwt import bcrypt from faker import Faker from django.utils import timezone +from ninja.errors import AuthenticationError from src.env import jwt_config from src.utils.svcs import Service +from src.utils.logger import Logger from src.api.models.postgres import User from src.api.typing.ExpireUUID import ExpireUUID from src.api.enums.CharacterCasing import CharacterCasing @@ -16,6 +21,14 @@ fake = Faker() +class SignatureData(TypedDict): + title: str + signature: str + timestamp: str + key: str + ttl: int | float + + @Service() class UtilityService: @staticmethod @@ -80,3 +93,53 @@ def generate_uuid() -> ExpireUUID: lifespan = timedelta(hours=24) expires_at = current_time + lifespan return {"uuid": uuid4(), "expires_at": expires_at} + + @staticmethod + def generate_signature(key: str, timestamp: str) -> str: + signature = hmac.new( + key=key.encode(), msg=timestamp.encode(), digestmod=hashlib.sha256 + ).hexdigest() + return signature + + @staticmethod + def verify_signature(signature_data: SignatureData, logger: Logger) -> bool: + signature = signature_data["signature"] + timestamp = signature_data["timestamp"] + key = signature_data["key"] + ttl = signature_data["ttl"] + title = signature_data["title"] + + valid_signature = UtilityService.generate_signature(key, timestamp) + is_valid = hmac.compare_digest(valid_signature, signature) + + if not is_valid: + message = "Invalid signature!" + logger.error( + { + "activity_type": f"Authenticate {title} Request", + "message": message, + "metadata": {"signature": signature}, + } + ) + raise AuthenticationError(message=message) + + initial_time = datetime.fromtimestamp(float(timestamp) / 1000) + valid_window = initial_time + timedelta(minutes=ttl) + + if valid_window < datetime.now(): + message = "Signature expired!" + logger.error( + { + "activity_type": f"Authenticate {title} Request", + "message": message, + "metadata": {"timestamp": timestamp}, + } + ) + raise AuthenticationError(message=message) + + return True + + @staticmethod + def get_timestamp() -> str: + current_time = datetime.now().timestamp() * 1000 + return str(current_time) diff --git a/src/api/services/external/RabbitMQRoutes.py b/src/api/services/external/RabbitMQRoutes.py index 8b13789..c713a91 100644 --- a/src/api/services/external/RabbitMQRoutes.py +++ b/src/api/services/external/RabbitMQRoutes.py @@ -1 +1,5 @@ +from src.config.asgi import broker +from ..UserService import UserRouter + +broker.include_router(UserRouter) diff --git a/src/config/apps.py b/src/config/apps.py index 963d990..be8962e 100644 --- a/src/config/apps.py +++ b/src/config/apps.py @@ -13,6 +13,7 @@ THIRD_PARTY_APPS = [ "corsheaders", + "django_extensions", ] INSTALLED_APPS = [ diff --git a/src/config/asgi.py b/src/config/asgi.py index 00c0ac7..1ff8214 100644 --- a/src/config/asgi.py +++ b/src/config/asgi.py @@ -21,9 +21,20 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.config.settings") + +def setup_broker_middlewares() -> None: + from src.api.middlewares.BrokerMiddleware import ( + PublishMiddleware, + SubscribeMiddleware, + ) + + broker.add_middleware(SubscribeMiddleware) + broker.add_middleware(PublishMiddleware) + + application = Starlette( routes=[Mount("/", get_asgi_application())], # type: ignore - on_startup=[broker.start], + on_startup=[setup_broker_middlewares, broker.start], on_shutdown=[broker.close], ) diff --git a/src/config/settings.py b/src/config/settings.py index fb8cb6e..013ed58 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -18,3 +18,12 @@ STATIC_URL = "static/" MEDIA_URL = "media/" + + +LANGUAGE_CODE = "en-us" + +TIME_ZONE = "UTC" + +USE_I18N = True + +USE_TZ = True diff --git a/src/env.py b/src/env.py index fcecc4e..a41c43c 100644 --- a/src/env.py +++ b/src/env.py @@ -1,7 +1,7 @@ from typing import TypedDict from src import __name__, __version__, __description__, __display_name__ -from src.utils.env import get_env_int, get_env_str, get_env_list +from src.utils.env import get_env_int, get_env_str, get_env_list, get_env_float class Env: @@ -54,6 +54,16 @@ class RabbitMQ(TypedDict): url: str +class Gateway(TypedDict): + key: str + ttl: int + + +class Queue(TypedDict): + key: str + ttl: float + + env = Env() app: App = { @@ -102,9 +112,14 @@ class RabbitMQ(TypedDict): "issuer": get_env_str("JWT_ISSUER"), } -api_gateway = { - "key": get_env_str("API_GATEWAY_PUBLIC_KEY"), - "expires_at": get_env_int("API_KEY_EXPIRES_AT"), +api_gateway: Gateway = { + "key": get_env_str("GATEWAY_PUBLIC_KEY"), + "ttl": get_env_int("GATEWAY_KEY_TTL"), +} + +queue: Queue = { + "key": get_env_str("QUEUE_SECRECT_KEY"), + "ttl": get_env_float("QUEUE_SECRECT_KEY_TTL"), } otp: OTP = {"lifetime": get_env_int("OTP_LIFETIME")} @@ -121,5 +136,6 @@ class RabbitMQ(TypedDict): "jwt_config", "log", "otp", + "queue", "rabbitmq_config", ] diff --git a/src/utils/env/__init__.py b/src/utils/env/__init__.py index c231c7c..9f765cb 100644 --- a/src/utils/env/__init__.py +++ b/src/utils/env/__init__.py @@ -1,6 +1,7 @@ -from .env import get_env_int, get_env_str, get_env_list +from .env import get_env_int, get_env_str, get_env_list, get_env_float __all__ = [ + "get_env_float", "get_env_int", "get_env_list", "get_env_str", diff --git a/src/utils/env/env.py b/src/utils/env/env.py index a4a9fee..d337aee 100644 --- a/src/utils/env/env.py +++ b/src/utils/env/env.py @@ -26,6 +26,10 @@ def get_env_int(name: str, default: str | None = None) -> int: return int(get_env_variable(name, default=default, cast=int)) +def get_env_float(name: str, default: str | None = None) -> float: + return float(get_env_variable(name, default=default, cast=float)) + + def get_env_list(name: str, sep: str = ",", default: str | None = None) -> list[str]: return list( get_env_variable( diff --git a/uv.lock b/uv.lock index ef4b6f7..f9977e7 100644 --- a/uv.lock +++ b/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.12" [[package]] @@ -151,12 +152,12 @@ wheels = [ [[package]] name = "df-wallet-user-service" -version = "1.0.0" source = { virtual = "." } dependencies = [ { name = "bcrypt" }, { name = "django" }, { name = "django-cors-headers" }, + { name = "django-extensions" }, { name = "django-mongodb-backend" }, { name = "django-ninja" }, { name = "faker" }, @@ -187,6 +188,7 @@ requires-dist = [ { name = "bcrypt", specifier = ">=4.3.0" }, { name = "django", specifier = ">=5.1.6" }, { name = "django-cors-headers", specifier = ">=4.7.0" }, + { name = "django-extensions", specifier = ">=4.1" }, { name = "django-mongodb-backend", specifier = ">=5.1.0b0" }, { name = "django-ninja", specifier = ">=1.3.0" }, { name = "faker", specifier = ">=36.1.1" }, @@ -248,6 +250,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7e/a2/7bcfff86314bd9dd698180e31ba00604001606efb518a06cca6833a54285/django_cors_headers-4.7.0-py3-none-any.whl", hash = "sha256:f1c125dcd58479fe7a67fe2499c16ee38b81b397463cf025f0e2c42937421070", size = 12794 }, ] +[[package]] +name = "django-extensions" +version = "4.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "django" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6d/b3/ed0f54ed706ec0b54fd251cc0364a249c6cd6c6ec97f04dc34be5e929eac/django_extensions-4.1.tar.gz", hash = "sha256:7b70a4d28e9b840f44694e3f7feb54f55d495f8b3fa6c5c0e5e12bcb2aa3cdeb", size = 283078 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/64/96/d967ca440d6a8e3861120f51985d8e5aec79b9a8bdda16041206adfe7adc/django_extensions-4.1-py3-none-any.whl", hash = "sha256:0699a7af28f2523bf8db309a80278519362cd4b6e1fd0a8cd4bf063e1e023336", size = 232980 }, +] + [[package]] name = "django-mongodb-backend" version = "5.1.0b1"