-
Notifications
You must be signed in to change notification settings - Fork 92
fix(auth): add startup-generated API key authentication to all routes #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| """ | ||
| API key authentication for SecuScan backend. | ||
|
|
||
| A random key is generated at startup and written to <data_dir>/.api_key. | ||
| Clients must supply it via: | ||
| - Authorization: Bearer <key> | ||
| - X-Api-Key: <key> | ||
| """ | ||
|
|
||
| import secrets | ||
| from pathlib import Path | ||
|
|
||
| from fastapi import Depends, HTTPException, Security, status | ||
| from fastapi.security import APIKeyHeader, HTTPAuthorizationCredentials, HTTPBearer | ||
|
|
||
| _bearer_scheme = HTTPBearer(auto_error=False) | ||
| _api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) | ||
|
|
||
| _api_key: str | None = None | ||
|
|
||
|
|
||
| def init_api_key(data_dir: str) -> str: | ||
| """ | ||
| Load the persisted API key, or generate and persist a new one. | ||
|
|
||
| Called once during application startup; the returned key is also stored in | ||
| the module-level ``_api_key`` variable so the FastAPI dependency can reach it. | ||
| """ | ||
| global _api_key | ||
| key_file = Path(data_dir) / ".api_key" | ||
| if key_file.exists(): | ||
| _api_key = key_file.read_text().strip() | ||
| else: | ||
| _api_key = secrets.token_hex(32) | ||
| key_file.parent.mkdir(parents=True, exist_ok=True) | ||
| key_file.write_text(_api_key) | ||
| key_file.chmod(0o600) | ||
| return _api_key | ||
|
|
||
|
|
||
| async def require_api_key( | ||
| bearer: HTTPAuthorizationCredentials | None = Depends(_bearer_scheme), | ||
| x_api_key: str | None = Security(_api_key_header), | ||
| ) -> str: | ||
| """ | ||
| FastAPI dependency — rejects requests that do not carry the correct API key. | ||
|
|
||
| Accepts the key in either: | ||
| - ``Authorization: Bearer <key>`` | ||
| - ``X-Api-Key: <key>`` | ||
| """ | ||
| if _api_key is None: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | ||
| detail="Authentication service not initialised", | ||
| ) | ||
|
|
||
| candidate: str | None = None | ||
| if bearer is not None: | ||
| candidate = bearer.credentials | ||
| elif x_api_key is not None: | ||
| candidate = x_api_key | ||
|
|
||
| if candidate is None or not secrets.compare_digest(candidate, _api_key): | ||
| raise HTTPException( | ||
| status_code=status.HTTP_401_UNAUTHORIZED, | ||
| detail="Invalid or missing API key", | ||
| headers={"WWW-Authenticate": "Bearer"}, | ||
| ) | ||
|
|
||
| return candidate |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| API routes for SecuScan backend | ||
| """ | ||
|
|
||
| from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Response | ||
| from fastapi import APIRouter, HTTPException, BackgroundTasks, Response, Request | ||
| from fastapi.responses import JSONResponse | ||
| from typing import Any, Optional, List, Dict, Callable | ||
|
|
@@ -32,12 +33,12 @@ def parse_json_fields(rows: List[Dict], fields: List[str]) -> List[Dict]: | |
|
|
||
| def is_filesystem_target(target: str) -> bool: | ||
| """Best-effort detection for path-based targets that should bypass host validation.""" | ||
| if target.startswith(("/", "./", "../", "~")): | ||
| # Absolute or relative filesystem roots only — not CIDR notation (e.g. 8.8.8.8/32) | ||
| if target.startswith(("/", "./", "../", "~/")): | ||
| return True | ||
| # Windows drive paths (C:\ or C:/) | ||
| if re.match(r"^[A-Za-z]:[\\/]", target): | ||
| return True | ||
| if "/" in target and not target.startswith(("http://", "https://")): | ||
| return True | ||
| return False | ||
|
|
||
|
|
||
|
|
@@ -76,10 +77,11 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: | |
| from .reporting import reporting | ||
| from .vault import VaultCrypto | ||
| from .workflows import scheduler | ||
| from .auth import require_api_key | ||
|
|
||
| from sse_starlette.sse import EventSourceResponse | ||
|
|
||
| router = APIRouter(prefix="/api/v1") | ||
| router = APIRouter(prefix="/api/v1", dependencies=[Depends(require_api_key)]) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Applying Useful? React with 👍 / 👎. |
||
|
|
||
|
|
||
| async def get_or_set_cached(key: str, builder): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| """ | ||
| Unit tests for API key authentication (issue #199). | ||
| """ | ||
|
|
||
| import asyncio | ||
| import tempfile | ||
| from pathlib import Path | ||
|
|
||
| import pytest | ||
| from fastapi.testclient import TestClient | ||
|
|
||
| from backend.secuscan import auth as auth_module | ||
| from backend.secuscan.main import app | ||
| from backend.secuscan.config import settings | ||
| from backend.secuscan.database import init_db | ||
| from backend.secuscan.plugins import init_plugins | ||
|
|
||
|
|
||
| @pytest.fixture() | ||
| def client_with_key(setup_test_environment): | ||
| """TestClient with a valid API key pre-seeded.""" | ||
| asyncio.run(init_db(settings.database_path)) | ||
| asyncio.run(init_plugins(settings.plugins_dir)) | ||
| api_key = auth_module.init_api_key(settings.data_dir) | ||
| with TestClient(app) as c: | ||
| yield c, api_key | ||
|
|
||
|
|
||
| class TestApiKeyInit: | ||
| def test_key_file_created(self, tmp_path): | ||
| key = auth_module.init_api_key(str(tmp_path)) | ||
| assert (tmp_path / ".api_key").exists() | ||
| assert len(key) == 64 # 32 bytes → 64 hex chars | ||
|
|
||
| def test_existing_key_reloaded(self, tmp_path): | ||
| k1 = auth_module.init_api_key(str(tmp_path)) | ||
| k2 = auth_module.init_api_key(str(tmp_path)) | ||
| assert k1 == k2 | ||
|
|
||
| def test_key_file_permissions(self, tmp_path): | ||
| auth_module.init_api_key(str(tmp_path)) | ||
| mode = (tmp_path / ".api_key").stat().st_mode & 0o777 | ||
| assert mode == 0o600 | ||
|
|
||
|
|
||
| class TestAuthDependency: | ||
| def test_no_credentials_returns_401(self, client_with_key): | ||
| client, _ = client_with_key | ||
| resp = client.get("/api/v1/plugins", headers={}) | ||
| assert resp.status_code == 401 | ||
|
|
||
| def test_wrong_key_returns_401(self, client_with_key): | ||
| client, _ = client_with_key | ||
| resp = client.get("/api/v1/plugins", headers={"X-Api-Key": "wrong-key"}) | ||
| assert resp.status_code == 401 | ||
|
|
||
| def test_valid_x_api_key_header(self, client_with_key): | ||
| client, api_key = client_with_key | ||
| resp = client.get("/api/v1/plugins", headers={"X-Api-Key": api_key}) | ||
| assert resp.status_code == 200 | ||
|
|
||
| def test_valid_bearer_token(self, client_with_key): | ||
| client, api_key = client_with_key | ||
| resp = client.get("/api/v1/plugins", headers={"Authorization": f"Bearer {api_key}"}) | ||
| assert resp.status_code == 200 | ||
|
|
||
| def test_bearer_wrong_key_returns_401(self, client_with_key): | ||
| client, _ = client_with_key | ||
| resp = client.get("/api/v1/plugins", headers={"Authorization": "Bearer bad"}) | ||
| assert resp.status_code == 401 | ||
|
|
||
| def test_health_endpoint_not_protected(self, client_with_key): | ||
| client, _ = client_with_key | ||
| resp = client.get("/api/v1/health", headers={}) | ||
| # health check is defined on `app` directly, not inside the authenticated router | ||
| assert resp.status_code == 200 | ||
|
|
||
| def test_root_endpoint_not_protected(self, client_with_key): | ||
| client, _ = client_with_key | ||
| resp = client.get("/", headers={}) | ||
| assert resp.status_code == 200 | ||
|
|
||
|
|
||
| class TestIsFilesystemTarget: | ||
| """Regression tests for is_filesystem_target — CIDR must not be treated as a path.""" | ||
|
|
||
| from backend.secuscan.routes import is_filesystem_target | ||
|
|
||
| @pytest.mark.parametrize("target,expected", [ | ||
| ("/etc/passwd", True), | ||
| ("./relative/path", True), | ||
| ("../parent/path", True), | ||
| ("~/home/dir", True), | ||
| ("C:\\Windows\\System32", True), | ||
| ("C:/Windows/System32", True), | ||
| # These are NOT filesystem targets | ||
| ("8.8.8.8/32", False), | ||
| ("192.168.1.0/24", False), | ||
| ("example.com", False), | ||
| ("http://example.com/path", False), | ||
| ("https://example.com/path", False), | ||
| ("10.0.0.1", False), | ||
| ]) | ||
| def test_filesystem_target_detection(self, target, expected): | ||
| from backend.secuscan.routes import is_filesystem_target | ||
| assert is_filesystem_target(target) == expected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new
is_filesystem_target()logic no longer recognizes relative paths likeartifacts/memdump.raw(it only accepts/,./,../,~/, or Windows-drive prefixes), so these values now fall intovalidate_target()and are rejected as invalid hostnames. This regresses non-code plugins that accept file/directory targets because previously slash-containing paths bypassed host validation; now users must rewrite existing inputs to add./.Useful? React with 👍 / 👎.