Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ JWT_ISSUER=xxx
# time in minutes
OTP_LIFETIME=10

API_GATEWAY_PUBLIC_KEY=xxxxxxxxxxxxxxxxxxxxxxxx
API_KEY_EXPIRES_AT=2
GATEWAY_PUBLIC_KEY='XXXXXXXXXXXXXXXXXXX'
GATEWAY_KEY_TTL=2

# amqp://username:password@host[:port]/
RABBITMQ_URL=amqp://guest:guest@localhost:5672/
RABBITMQ_URL=amqp://guest:guest@localhost:5672/

QUEUE_SECRECT_KEY='XXXXXXXXXXXXXXXXX'
QUEUE_SECRECT_KEY_TTL= 0.5
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies = [
"pyjwt>=2.10.1",
"faststream[rabbit]>=0.5.39",
"starlette>=0.46.2",
"django-extensions>=4.1",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the django extensions for?
And requests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was testing something with requests. Will remove it
django extension is for listing the endpoints URL, more of a dev dependency

"uvicorn>=0.34.2",
]

Expand Down
12 changes: 12 additions & 0 deletions src/api/constants/queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypedDict


class QueueNames(TypedDict):
USER_REGISTRATION: str
EMAIL_VALIDATION: str


QUEUE_NAMES: QueueNames = {
"USER_REGISTRATION": "create_user",
"EMAIL_VALIDATION": "validate_email",
}
12 changes: 12 additions & 0 deletions src/api/constants/signature_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypedDict


class SignatureSources(TypedDict):
gateway: str
queue: str


SIGNATURE_SOURCES: SignatureSources = {
"gateway": "Gateway",
"queue": "Broker Queue",
}
76 changes: 76 additions & 0 deletions src/api/middlewares/BrokerMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from types import CoroutineType
from typing import Any
from collections.abc import Callable, Awaitable

from faststream import BaseMiddleware
from faststream.broker.message import StreamMessage
from faststream.rabbit.message import RabbitMessage

from src.env import queue
from src.utils.logger import Logger
from src.api.services.UtilityService import UtilityService
from src.api.constants.signature_sources import SIGNATURE_SOURCES


class PublishMiddleware(BaseMiddleware):
"""
Middleware to handle subscription messages.
"""

async def publish_scope(
self,
call_next: Callable[..., Awaitable[Any]],
msg: RabbitMessage,
*args: tuple[Any, ...],
**kwargs: dict[str, Any],
) -> CoroutineType:
timestamp = UtilityService.get_timestamp()
signature = UtilityService.generate_signature(queue["key"], timestamp)

headers = {}

headers["X-BROKER-SIGNATURE"] = signature
headers["X-BROKER-TIMESTAMP"] = timestamp
headers["X-BROKER-KEY"] = queue["key"]

kwargs["headers"] = headers
return await super().publish_scope(call_next, msg, *args, **kwargs)


class SubscribeMiddleware(BaseMiddleware):
async def consume_scope(
self, call_next: Callable[[Any], Awaitable[Any]], msg: StreamMessage
) -> CoroutineType | None:
logger = Logger(__name__)
try:
UtilityService.verify_signature(
logger=logger,
signature_data={
"signature": msg.headers["X-BROKER-SIGNATURE"],
"timestamp": msg.headers["X-BROKER-TIMESTAMP"],
"key": queue["key"],
"ttl": queue["ttl"],
"title": SIGNATURE_SOURCES["gateway"],
},
)

return await super().consume_scope(call_next, msg)
except KeyError as e:
message = f"Missing required header: {e}"
logger.error(
{
"activity_type": "Authenticate GatewaBroker Queue Request",
"message": message,
"metadata": {"headers": msg.headers},
}
)
except Exception as e:
queue_operation = msg.raw_message.routing_key
message = f"`{queue_operation}` operation failed: {e}"
logger.error(
{
"activity_type": "Authenticate GatewaBroker Queue Request",
"message": message,
"metadata": {"headers": msg.headers, "message": msg._decoded_body},
}
)
65 changes: 18 additions & 47 deletions src/api/middlewares/GateWayMiddleware.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import hmac
import hashlib
from datetime import datetime, timedelta

from django.http import HttpRequest
from ninja.errors import AuthenticationError
from ninja.security import APIKeyHeader
from ninja.openapi.schema import OpenAPISchema

from src.env import api_gateway
from src.utils.logger import Logger
from src.api.services.UtilityService import SignatureData, UtilityService
from src.api.constants.signature_sources import SIGNATURE_SOURCES


class GateWayAuth(APIKeyHeader):
Expand All @@ -27,7 +25,7 @@ def authenticate(self, request: HttpRequest, key: str | None) -> str | None:
message = f"Missing required header: {e}"
self.logger.error(
{
"activity_type": "Authenticate User",
"activity_type": "Authenticate Gateway Request",
"message": message,
"metadata": {"headers": request.headers},
}
Expand All @@ -39,22 +37,33 @@ def authenticate(self, request: HttpRequest, key: str | None) -> str | None:
message = "Invalid API key!"
self.logger.error(
{
"activity_type": "Authenticate User",
"activity_type": "Authenticate Gateway Request",
"message": message,
"metadata": {"headers": request.headers},
}
)
raise AuthenticationError(message=message)

self._verify_signature(valid_api_key, api_signature, api_timestamp)
signature_data: SignatureData = {
"signature": api_signature,
"timestamp": api_timestamp,
"key": valid_api_key,
"ttl": api_gateway["ttl"],
"title": SIGNATURE_SOURCES["gateway"],
}

UtilityService.verify_signature(
logger=self.logger, signature_data=signature_data
)

self.logger.debug(
{
"activity_type": "Authenticate User",
"message": "Successfully authenticated user",
"activity_type": "Authenticate Gateway Request",
"message": "Successfully authenticated gateway request",
"metadata": {
"user_id": user_id,
"user_email": user_email,
"headers": request.headers,
},
}
)
Expand All @@ -63,44 +72,6 @@ def authenticate(self, request: HttpRequest, key: str | None) -> str | None:

return api_signature

def _verify_signature(
self, valid_api_key: str, signature: str, timestamp: str
) -> bool:
valid_signature = self.generate_signature(valid_api_key, timestamp)
is_valid = hmac.compare_digest(valid_signature, signature)

if not is_valid:
message = "Invalid signature!"
self.logger.error(
{
"activity_type": "Authenticate User",
"message": message,
"metadata": {"signature": signature},
}
)
raise AuthenticationError(message=message)

initial_time = datetime.fromtimestamp(int(timestamp) / 1000)
valid_window = initial_time + timedelta(minutes=api_gateway["expires_at"])
if valid_window < datetime.now():
message = "Signature expired!"
self.logger.error(
{
"activity_type": "Authenticate User",
"message": message,
"metadata": {"timestamp": timestamp},
}
)
raise AuthenticationError(message=message)

return True

def generate_signature(self, api_key: str, timestamp: str) -> str:
signature = hmac.new(
key=api_key.encode(), msg=timestamp.encode(), digestmod=hashlib.sha256
).hexdigest()
return signature


def get_authentication() -> GateWayAuth:
gateway_auth = GateWayAuth(Logger("Authentication"))
Expand Down
10 changes: 7 additions & 3 deletions src/api/models/payload/requests/CreateUserRequest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from pydantic import EmailStr, BaseModel

from src.api.typing.PasswordValidator import IsStrongPassword


class CreateUserRequest(BaseModel):
id: str
email: EmailStr
password: IsStrongPassword
first_name: str
last_name: str
address: str
phone_number: str
profile_picture: str
pin: str
58 changes: 58 additions & 0 deletions src/api/services/UserService.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from typing import Annotated

from faststream.rabbit import RabbitRouter

from src.utils.svcs import Service
from src.utils.logger import Logger
from src.api.models.postgres import User
from src.api.constants.queues import QUEUE_NAMES
from src.api.constants.messages import MESSAGES, DYNAMIC_MESSAGES
from src.api.typing.UserSuccess import UserSuccess
from src.api.constants.activity_types import ACTIVITY_TYPES
from src.api.repositories.UserRepository import UserRepository
from src.api.models.payload.requests.CreateUserRequest import CreateUserRequest
from src.api.models.payload.requests.UpdateUserRequest import UpdateUserRequest

from .UtilityService import UtilityService

UserRouter = RabbitRouter()


@Service()
class UserService:
Expand Down Expand Up @@ -112,3 +118,55 @@ async def update(self, id: str, req: UpdateUserRequest) -> UserSuccess:
user = self.utility_service.sanitize_user_object(updated_user)

return {"is_success": True, "user": user}

@staticmethod
@UserRouter.subscriber(queue=QUEUE_NAMES["USER_REGISTRATION"])
async def register(message: dict) -> None:
logger = Logger("UserService")
user_data = CreateUserRequest(
id=message["id"],
email=message["email"],
first_name="",
last_name="",
address="",
phone_number="",
profile_picture="",
pin="",
)
new_user = await UserRepository.add(user_data)

logger.info(
{
"activity_type": ACTIVITY_TYPES["USER_REGISTRATION"],
"message": MESSAGES["REGISTRATION"]["USER_REGISTERED"],
"metadata": {"user": {"id": new_user.id, "email": new_user.email}},
}
)

@staticmethod
@UserRouter.subscriber(queue=QUEUE_NAMES["EMAIL_VALIDATION"])
async def validate_user(message: dict) -> None:
logger = Logger("UserService")
user = await UserRepository.find_by_id(message["id"])

if not user:
logger.warn(
{
"activity_type": ACTIVITY_TYPES["EMAIL_VALIDATION"],
"message": DYNAMIC_MESSAGES["COMMON"]["FETCHED_FAILED"]("User"),
"metadata": {"user": {"id": message["id"]}},
}
)
return

await UserRepository.update_by_user(
user, {"is_active": True, "is_enabled": True, "is_validated": True}
)

logger.info(
{
"activity_type": ACTIVITY_TYPES["EMAIL_VALIDATION"],
"message": MESSAGES["REGISTRATION"]["VERIFICATION_SUCCESS"],
"metadata": {"user": {"id": user.id, "email": user.email}},
}
)
Loading