Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,14 @@ JWT_ISSUER=xxx
# time in minutes
OTP_LIFETIME=10

GATEWAY_PUBLIC_KEY='XXXXXXXXXXXXXXXXXXX'
GATEWAY_KEY_TTL=2

GATEWAY_PUBLIC_KEY='XXXXXXXXXXXXXXXXXXX'
GATEWAY_KEY_TTL=2

# amqp://username:password@host[:port]/
RABBITMQ_URL=amqp://guest:guest@localhost:5672/
RABBITMQ_URL=amqp://guest:guest@localhost:5672/

QUEUE_SECRECT_KEY='XXXXXXXXXXXXXXXXX'
QUEUE_SECRECT_KEY_TTL=0.5
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ dependencies = [
"pyjwt>=2.10.1",
"faststream[rabbit]>=0.5.39",
"starlette>=0.46.2",
"uvicorn>=0.34.1",
"django-extensions>=4.1",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of the django extensions package?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also remove the pika, It was an oversight from me, it's not a dependency again

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of the django extensions package?

It is for listing all exposed django endpoints. run as manage.py show_urls
Comes in handy especially when there's no swagger docs

"uvicorn>=0.34.2",
]

[dependency-groups]
Expand Down
12 changes: 12 additions & 0 deletions src/api/constants/queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypedDict


class QueueNames(TypedDict):
USER_REGISTRATION: str
EMAIL_VALIDATION: str


QUEUE_NAMES: QueueNames = {
"USER_REGISTRATION": "create_user",
"EMAIL_VALIDATION": "validate_email",
}
12 changes: 12 additions & 0 deletions src/api/constants/signature_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypedDict


class SignatureSources(TypedDict):
gateway: str
queue: str


SIGNATURE_SOURCES: SignatureSources = {
"gateway": "Gateway",
"queue": "Broker Queue",
}
76 changes: 76 additions & 0 deletions src/api/middlewares/BrokerMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from types import CoroutineType
from typing import Any
from collections.abc import Callable, Awaitable

from faststream import BaseMiddleware
from faststream.broker.message import StreamMessage
from faststream.rabbit.message import RabbitMessage

from src.env import queue
from src.utils.logger import Logger
from src.api.services.UtilityService import UtilityService
from src.api.constants.signature_sources import SIGNATURE_SOURCES


class PublishMiddleware(BaseMiddleware):
"""
Middleware to handle subscription messages.
"""

async def publish_scope(
self,
call_next: Callable[..., Awaitable[Any]],
msg: RabbitMessage,
*args: tuple[Any, ...],
**kwargs: dict[str, Any],
) -> CoroutineType:
timestamp = UtilityService.get_timestamp()
signature = UtilityService.generate_signature(queue["key"], timestamp)

headers = {}

headers["X-BROKER-SIGNATURE"] = signature
headers["X-BROKER-TIMESTAMP"] = timestamp
headers["X-BROKER-KEY"] = queue["key"]

kwargs["headers"] = headers
return await super().publish_scope(call_next, msg, *args, **kwargs)


class SubscribeMiddleware(BaseMiddleware):
async def consume_scope(
self, call_next: Callable[[Any], Awaitable[Any]], msg: StreamMessage
) -> CoroutineType | None:
logger = Logger(__name__)
try:
UtilityService.verify_signature(
logger=logger,
signature_data={
"signature": msg.headers["X-BROKER-SIGNATURE"],
"timestamp": msg.headers["X-BROKER-TIMESTAMP"],
"key": queue["key"],
"ttl": queue["ttl"],
"title": SIGNATURE_SOURCES["gateway"],
},
)

return await super().consume_scope(call_next, msg)
except KeyError as e:
message = f"Missing required header: {e}"
logger.error(
{
"activity_type": "Authenticate GatewaBroker Queue Request",
"message": message,
"metadata": {"headers": msg.headers},
}
)
except Exception as e:
queue_operation = msg.raw_message.routing_key
message = f"`{queue_operation}` operation failed: {e}"
logger.error(
{
"activity_type": "Authenticate GatewaBroker Queue Request",
"message": message,
"metadata": {"headers": msg.headers, "message": msg._decoded_body},
}
)
84 changes: 84 additions & 0 deletions src/api/middlewares/GateWayMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from django.http import HttpRequest
from ninja.errors import AuthenticationError
from ninja.security import APIKeyHeader
from ninja.openapi.schema import OpenAPISchema

from src.env import api_gateway
from src.utils.logger import Logger
from src.api.services.UtilityService import SignatureData, UtilityService
from src.api.constants.signature_sources import SIGNATURE_SOURCES


class GateWayAuth(APIKeyHeader):
def __init__(self, logger: Logger) -> None:
self.logger = logger
super().__init__()

def authenticate(self, request: HttpRequest, key: str | None) -> str | None:
try:
api_key = request.headers["X-API-GATEWAY-KEY"]
api_timestamp = request.headers["X-API-GATEWAY-TIMESTAMP"]
api_signature = request.headers["X-API-GATEWAY-SIGNATURE"]
except KeyError as e:
message = f"Missing required header: {e}"
self.logger.error(
{
"activity_type": "Authenticate Gateway Request",
"message": message,
"metadata": {"headers": request.headers},
}
)
raise AuthenticationError(message=message)

valid_api_key = api_gateway["key"]
if api_key != valid_api_key:
message = "Invalid API key!"
self.logger.error(
{
"activity_type": "Authenticate Gateway Request",
"message": message,
"metadata": {"headers": request.headers},
}
)
raise AuthenticationError(message=message)

signature_data: SignatureData = {
"signature": api_signature,
"timestamp": api_timestamp,
"key": valid_api_key,
"ttl": 5,
"title": SIGNATURE_SOURCES["gateway"],
}

UtilityService.verify_signature(
logger=self.logger, signature_data=signature_data
)

self.logger.debug(
{
"activity_type": "Authenticate Gateway Request",
"message": "Successfully authenticated gateway request",
"metadata": {
"headers": request.headers,
},
}
)

return api_signature


def get_authentication() -> GateWayAuth:
gateway_auth = GateWayAuth(Logger("Authentication"))
return gateway_auth


def add_global_headers(schema: OpenAPISchema) -> OpenAPISchema:
for path in schema["paths"]:
for method in schema["paths"][path]:
operation = schema["paths"][path][method]
if operation.get("security"):
operation["security"] = schema["security"]
return schema


authentication = get_authentication()
42 changes: 42 additions & 0 deletions src/api/routes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,56 @@
from ninja import NinjaAPI
from django.http import HttpRequest
from ninja.openapi.schema import OpenAPISchema

from src.env import app
from src.api.middlewares.GateWayMiddleware import authentication, add_global_headers

api: NinjaAPI = NinjaAPI(
version=app["version"],
title=app["display_name"],
description=app["description"],
auth=authentication,
)


original_get_openapi_schema = api.get_openapi_schema


def custom_openapi_schema(path_params: dict | None = None) -> OpenAPISchema:
schema = original_get_openapi_schema()

schema["components"]["securitySchemes"] = {
"Gateway Key": {
"type": "apiKey",
"in": "header",
"name": "X-API-GATEWAY-KEY",
},
"API Timestamp": {
"type": "apiKey",
"in": "header",
"name": "X-API-GATEWAY-TIMESTAMP",
},
"API Signature": {
"type": "apiKey",
"in": "header",
"name": "X-API-GATEWAY-SIGNATURE",
},
}

schema["security"] = [
{
"Gateway Key": [],
"API Timestamp": [],
"API Signature": [],
}
]

schema = add_global_headers(schema)
return schema


setattr(api, "get_openapi_schema", custom_openapi_schema)

from src.api.utils import error_handlers # noqa: E402, F401


Expand Down
12 changes: 12 additions & 0 deletions src/api/services/AuthService.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Annotated

from src.utils.svcs import Service
from src.config.asgi import broker
from src.utils.logger import Logger
from src.api.typing.JWT import JWTSuccess
from src.api.constants.queues import QUEUE_NAMES
from src.api.typing.UserExists import UserExists
from src.api.constants.messages import MESSAGES, DYNAMIC_MESSAGES
from src.api.typing.UserSuccess import UserSuccess
Expand Down Expand Up @@ -60,6 +62,11 @@ async def register(self, req: CreateUserRequest) -> UserExists:

created_user = await UserRepository.add(req)

user_data = {"id": created_user.id, "email": created_user.email}
queue = QUEUE_NAMES["USER_REGISTRATION"]

await broker.publish(message=user_data, queue=queue, persist=True)

await self.otp_service.send_otp(created_user.id)

user = self.utility_service.sanitize_user_object(created_user)
Expand Down Expand Up @@ -120,6 +127,11 @@ async def validate_email(self, req: AuthenticateUserOtp) -> bool:
await UserRepository.update_by_user(
user, {"is_active": True, "is_enabled": True, "is_validated": True}
)

user_data = {"id": user.id, "email": user.email}
queue = QUEUE_NAMES["EMAIL_VALIDATION"]
await broker.publish(message=user_data, queue=queue, persist=True)

return True

async def login(self, req: AuthenticateUserRequest) -> UserSuccess:
Expand Down
64 changes: 63 additions & 1 deletion src/api/services/UtilityService.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import hmac
import hashlib
from uuid import uuid4
from datetime import timedelta
from typing import TypedDict
from datetime import datetime, timedelta

import jwt
import bcrypt
from faker import Faker
from django.utils import timezone
from ninja.errors import AuthenticationError

from src.env import jwt_config
from src.utils.svcs import Service
from src.utils.logger import Logger
from src.api.typing.JWT import JWTData
from src.api.models.postgres import User
from src.api.typing.ExpireUUID import ExpireUUID
Expand All @@ -17,6 +22,14 @@
fake = Faker()


class SignatureData(TypedDict):
title: str
signature: str
timestamp: str
key: str
ttl: int | float


@Service()
class UtilityService:
@staticmethod
Expand Down Expand Up @@ -95,3 +108,52 @@ def generate_uuid() -> ExpireUUID:
lifespan = timedelta(hours=24)
expires_at = current_time + lifespan
return {"uuid": uuid4(), "expires_at": expires_at}

@staticmethod
def generate_signature(key: str, timestamp: str) -> str:
signature = hmac.new(
key=key.encode(), msg=timestamp.encode(), digestmod=hashlib.sha256
).hexdigest()
return signature

@staticmethod
def verify_signature(signature_data: SignatureData, logger: Logger) -> bool:
signature = signature_data["signature"]
timestamp = signature_data["timestamp"]
key = signature_data["key"]
ttl = signature_data["ttl"]
title = signature_data["title"]

valid_signature = UtilityService.generate_signature(key, timestamp)
is_valid = hmac.compare_digest(valid_signature, signature)

if not is_valid:
message = "Invalid signature!"
logger.error(
{
"activity_type": f"Authenticate {title} Request",
"message": message,
"metadata": {"signature": signature},
}
)
raise AuthenticationError(message=message)

initial_time = datetime.fromtimestamp(float(timestamp) / 1000)
valid_window = initial_time + timedelta(minutes=ttl)
if valid_window < datetime.now():
message = "Signature expired!"
logger.error(
{
"activity_type": f"Authenticate {title} Request",
"message": message,
"metadata": {"timestamp": timestamp},
}
)
raise AuthenticationError(message=message)

return True

@staticmethod
def get_timestamp() -> str:
current_time = datetime.now().timestamp() * 1000
return str(current_time)
Loading
Loading