Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
SECRET_KEY=your_secret_key
DEBUG=True
ENVIRONMENT=xxxx
ALLOWED_HOSTS=*

LOG_LEVEL=debug
Expand All @@ -26,4 +27,7 @@ JWT_SECRET=xxx
JWT_ISSUER=xxx

# time in minutes
OTP_LIFETIME=10
OTP_LIFETIME=10

API_GATEWAY_PUBLIC_KEY=xxxxxxxxxxxxxxxxxxxxxxxx
API_KEY_EXPIRES_AT=2
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"pydantic[email]>=2.10.6",
"django-ninja>=1.3.0",
"pyjwt>=2.10.1",
"typing>=3.10.0.0",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is typing not a builtin package?

]

[dependency-groups]
Expand Down
119 changes: 119 additions & 0 deletions src/api/middlewares/GateWayMiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import hmac
import hashlib
from datetime import datetime, timedelta

from django.http import HttpRequest
from ninja.errors import AuthenticationError
from ninja.security import APIKeyHeader
from ninja.openapi.schema import OpenAPISchema

from src.env import api_gateway
from src.utils.logger import Logger


class GateWayAuth(APIKeyHeader):
def __init__(self, logger: Logger) -> None:
self.logger = logger
super().__init__()

def authenticate(self, request: HttpRequest, key: str | None) -> str | None:
try:
api_key = request.headers["X-API-GATEWAY-KEY"]
api_timestamp = request.headers["X-API-GATEWAY-TIMESTAMP"]
api_signature = request.headers["X-API-GATEWAY-SIGNATURE"]
user_id = request.headers["X-USER-ID"]
user_email = request.headers["X-USER-EMAIL"]
except KeyError as e:
message = f"Missing required header: {e}"
self.logger.error(
{
"activity_type": "Authenticate User",
"message": message,
"metadata": {"headers": request.headers},
}
)
raise AuthenticationError(message=message)

valid_api_key = api_gateway["key"]
if api_key != valid_api_key:
message = "Invalid API key!"
self.logger.error(
{
"activity_type": "Authenticate User",
"message": message,
"metadata": {"headers": request.headers},
}
)
raise AuthenticationError(message=message)

self._verify_signature(valid_api_key, api_signature, api_timestamp)

self.logger.debug(
{
"activity_type": "Authenticate User",
"message": "Successfully authenticated user",
"metadata": {
"user_id": user_id,
"user_email": user_email,
},
}
)
setattr(request, "auth_email", user_email)
setattr(request, "auth_id", user_id)

return api_signature

def _verify_signature(
self, valid_api_key: str, signature: str, timestamp: str
) -> bool:
valid_signature = self.generate_signature(valid_api_key, timestamp)
is_valid = hmac.compare_digest(valid_signature, signature)

if not is_valid:
message = "Invalid signature!"
self.logger.error(
{
"activity_type": "Authenticate User",
"message": message,
"metadata": {"signature": signature},
}
)
raise AuthenticationError(message=message)

initial_time = datetime.fromtimestamp(int(timestamp) / 1000)
valid_window = initial_time + timedelta(minutes=api_gateway["expires_at"])
if valid_window < datetime.now():
message = "Signature expired!"
self.logger.error(
{
"activity_type": "Authenticate User",
"message": message,
"metadata": {"timestamp": timestamp},
}
)
raise AuthenticationError(message=message)

return True

def generate_signature(self, api_key: str, timestamp: str) -> str:
signature = hmac.new(
key=api_key.encode(), msg=timestamp.encode(), digestmod=hashlib.sha256
).hexdigest()
return signature


def get_authentication() -> GateWayAuth:
gateway_auth = GateWayAuth(Logger("Authentication"))
return gateway_auth


def add_global_headers(schema: OpenAPISchema) -> OpenAPISchema:
for path in schema["paths"]:
for method in schema["paths"][path]:
operation = schema["paths"][path][method]
if operation.get("security"):
operation["security"] = schema["security"]
return schema


authentication = get_authentication()
66 changes: 56 additions & 10 deletions src/api/routes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,84 @@
from ninja import NinjaAPI
from django.http import HttpRequest
from ninja.openapi.schema import OpenAPISchema

from src.env import app
from src.api.middlewares.AppMiddleware import authentication
from src.api.middlewares.GateWayMiddleware import authentication, add_global_headers

api: NinjaAPI = NinjaAPI(
version=app["version"],
title=app["display_name"],
description=app["description"],
auth=authentication,
)

original_get_openapi_schema = api.get_openapi_schema


def custom_openapi_schema(path_params: dict | None = None) -> OpenAPISchema:
schema = original_get_openapi_schema()

schema["components"]["securitySchemes"] = {
"Gateway Key": {
"type": "apiKey",
"in": "header",
"name": "X-API-GATEWAY-KEY",
},
"API Timestamp": {
"type": "apiKey",
"in": "header",
"name": "X-API-GATEWAY-TIMESTAMP",
},
"API Signature": {
"type": "apiKey",
"in": "header",
"name": "X-API-GATEWAY-SIGNATURE",
},
"User ID": {
"type": "apiKey",
"in": "header",
"name": "X-USER-ID",
},
"User Email": {
"type": "apiKey",
"in": "header",
"name": "X-USER-EMAIL",
},
}

schema["security"] = [
{
"Gateway Key": [],
"API Timestamp": [],
"API Signature": [],
"User ID": [],
"User Email": [],
}
]

schema = add_global_headers(schema)
return schema


setattr(api, "get_openapi_schema", custom_openapi_schema)

from src.api.utils import error_handlers # noqa: E402, F401


@api.get("/")
@api.get("/", auth=None)
async def home(request: HttpRequest) -> dict:
return {"message": "Hello, World!"}


api.add_router(
"/users", "src.api.routes.User.router", auth=authentication, tags=["User"]
)
api.add_router(
"/kyc", "src.api.routes.UserKYC.router", auth=authentication, tags=["User KYC"]
)
api.add_router("/users", "src.api.routes.User.router", tags=["User"])
api.add_router("/kyc", "src.api.routes.UserKYC.router", tags=["User KYC"])
api.add_router(
"/next-of-kin",
"src.api.routes.UserNOK.router",
auth=authentication,
tags=["User NOK"],
)
api.add_router(
"/withdrawal-accounts",
"src.api.routes.WithdrawalAccount.router",
auth=authentication,
tags=["User"],
)
7 changes: 6 additions & 1 deletion src/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ class OTP(TypedDict):
"issuer": get_env_str("JWT_ISSUER"),
}

api_gateway = {
"key": get_env_str("API_GATEWAY_PUBLIC_KEY"),
"expires_at": get_env_int("API_KEY_EXPIRES_AT"),
}

otp: OTP = {"lifetime": get_env_int("OTP_LIFETIME")}

__all__ = ["app", "cache", "db", "env", "jwt_config", "log", "otp"]
__all__ = ["api_gateway", "app", "cache", "db", "env", "jwt_config", "log", "otp"]
13 changes: 12 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.