Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,18 @@ SECUSCAN_ALLOW_LOOPBACK_SCANS=true
# Optional Plugin Security
# SECUSCAN_PLUGIN_SIGNATURE_KEY=replace-with-your-signing-key
# SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=false

# Vault Encryption Key (REQUIRED when using the credential vault)
# Generate with: python3 -c "import secrets,base64; print(base64.urlsafe_b64encode(secrets.token_bytes(32)).decode())"
# SECUSCAN_VAULT_KEY=replace-with-a-stable-local-secret

# API Authentication
# On first startup SecuScan generates a random API key and writes it to
# backend/data/.api_key. Add it to all requests as:
# Authorization: Bearer <key> or X-Api-Key: <key>
# The key is printed to the console on first run and reloaded automatically
# on subsequent starts — no manual configuration needed.

# Frontend Overrides
# Leave these unset for the default local dev flow.
# VITE_API_PROXY_TARGET=http://127.0.0.1:8000
Expand Down
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ python-multipart>=0.0.9
xhtml2pdf>=0.2.17
aiosqlite>=0.20.0
python-whois>=0.9.4
cryptography>=42.0.0
88 changes: 88 additions & 0 deletions backend/secuscan/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Startup-generated API key authentication for SecuScan.

On first run a cryptographically random key is written to backend/data/.api_key
and printed to the console. Every subsequent request must supply it as:
Authorization: Bearer <key>
or
X-Api-Key: <key>
"""

from __future__ import annotations

import logging
import secrets
from pathlib import Path
from typing import Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

logger = logging.getLogger(__name__)

_bearer_scheme = HTTPBearer(auto_error=False)

_api_key: Optional[str] = None
_key_file: Optional[Path] = None


def init_api_key(data_dir: str) -> str:
"""Load or generate the API key and persist it to data_dir/.api_key."""
global _api_key, _key_file
_key_file = Path(data_dir) / ".api_key"
_key_file.parent.mkdir(parents=True, exist_ok=True)

if _key_file.exists():
_api_key = _key_file.read_text().strip()
if _api_key:
logger.info("Loaded existing API key from %s", _key_file)
return _api_key

_api_key = secrets.token_hex(32)
_key_file.write_text(_api_key)
_key_file.chmod(0o600)

logger.warning(
"\n"
"╔══════════════════════════════════════════════════════╗\n"
"║ SecuScan API Key (first-run) ║\n"
"║ ║\n"
"║ %s ║\n"
"║ ║\n"
"║ Add this to your requests: ║\n"
"║ Authorization: Bearer <key> ║\n"
"║ Key saved at: backend/data/.api_key ║\n"
"╚══════════════════════════════════════════════════════╝",
_api_key,
)
return _api_key


def _get_api_key() -> str:
if _api_key is None:
raise RuntimeError("API key not initialised — call init_api_key() during startup")
return _api_key


async def require_api_key(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme),
) -> None:
"""FastAPI dependency that enforces API key authentication."""
supplied: Optional[str] = None

# Accept Bearer token
if credentials and credentials.scheme.lower() == "bearer":
supplied = credentials.credentials

# Also accept X-Api-Key header (convenience for curl / scripts)
if not supplied:
supplied = request.headers.get("X-Api-Key")

expected = _get_api_key()

if not supplied or not secrets.compare_digest(supplied, expected):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing API key. Supply it as 'Authorization: Bearer <key>' or 'X-Api-Key: <key>'.",
headers={"WWW-Authenticate": "Bearer"},
)
16 changes: 14 additions & 2 deletions backend/secuscan/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,20 @@ def base_url(self) -> str:

@property
def resolved_vault_key(self) -> bytes:
"""Return a deterministic 32-byte key for credential vault encryption."""
seed = self.vault_key or self.plugin_signature_key or "secuscan-dev-key"
"""Return a 32-byte key (base64url-encoded) for AES-256-GCM vault encryption.

Requires SECUSCAN_VAULT_KEY to be set explicitly — no insecure default is
provided. If the variable is absent the application will raise at startup
rather than silently encrypt secrets with a publicly known key.
"""
seed = self.vault_key or self.plugin_signature_key
if not seed:
raise RuntimeError(
"SECUSCAN_VAULT_KEY is not set. "
"Generate a secure key with: python3 -c \"import secrets, base64; "
"print(base64.urlsafe_b64encode(secrets.token_bytes(32)).decode())\" "
"and add it to your .env file as SECUSCAN_VAULT_KEY=<value>."
)
digest = hashlib.sha256(seed.encode("utf-8")).digest()
return base64.urlsafe_b64encode(digest)

Expand Down
38 changes: 22 additions & 16 deletions backend/secuscan/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,22 +755,28 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]:
parser_path = plugin_dir / "parser.py"

if parser_path.exists():
try:
import importlib.util
spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path)
if spec is not None:
loader = spec.loader
if loader is not None:
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
if hasattr(module, "parse"):
logger.info(f"Using custom parser for {plugin.id}")
parsed = module.parse(parser_input)
return self._normalize_parsed_result(plugin, parser_input, parsed)
else:
logger.warning(f"Custom parser {parser_path} missing 'parse' function")
except Exception as e:
logger.error(f"Error executing custom parser for {plugin.id}: {e}")
if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir):
logger.error(
"Skipping custom parser for plugin %s due to integrity verification failure",
plugin.id,
)
else:
try:
import importlib.util
spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path)
if spec is not None:
loader = spec.loader
if loader is not None:
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
if hasattr(module, "parse"):
logger.info(f"Using custom parser for {plugin.id}")
parsed = module.parse(parser_input)
return self._normalize_parsed_result(plugin, parser_input, parsed)
else:
logger.warning(f"Custom parser {parser_path} missing 'parse' function")
except Exception as e:
logger.error(f"Error executing custom parser for {plugin.id}: {e}")

# 2. Fallback to legacy built-in parsers
if parser_type == "builtin_nmap":
Expand Down
7 changes: 6 additions & 1 deletion backend/secuscan/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from .auth import init_api_key
from .config import settings
from .cache import init_cache, cache as global_cache
from .database import init_db, db as global_db
Expand All @@ -38,10 +39,14 @@ async def lifespan(app: FastAPI):
"""Application lifespan manager"""
# Startup
logger.info("🚀 Starting SecuScan backend...")

# Ensure directories exist
settings.ensure_directories()
logger.info("✓ Directories initialized")

# Initialise API key (generates on first run, loads on subsequent runs)
init_api_key(settings.data_dir)
logger.info("✓ API key ready")

# Initialize database
await init_db(settings.database_path)
Expand Down
120 changes: 119 additions & 1 deletion backend/secuscan/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import hashlib
import hmac

from .models import PluginMetadata
from .models import PluginMetadata, PluginFieldType
from .config import settings

logger = logging.getLogger(__name__)


_PORT_SPEC_PATTERN = re.compile(r"^[\d,\-]+$")


class PluginManager:
"""Manages plugin loading and validation"""

Expand Down Expand Up @@ -173,6 +176,118 @@ def compute_plugin_digest(metadata_file: Path, parser_file: Path) -> str:

return hashlib.sha256(f"{metadata_digest}:{parser_digest}".encode("utf-8")).hexdigest()

def verify_parser_at_exec_time(self, plugin: PluginMetadata, plugin_dir: Path) -> bool:
"""Re-verify parser file integrity immediately before execution.

Closes the TOCTOU window between plugin load (where _verify_plugin_integrity
runs) and the moment the parser module is actually executed. Any modification
to parser.py or metadata.json after startup is caught here.
"""
parser_file = plugin_dir / "parser.py"
metadata_file = plugin_dir / "metadata.json"

if not plugin.checksum:
if settings.enforce_plugin_signatures:
logger.error(
"Refusing to execute parser for plugin %s: "
"no checksum present and enforce_plugin_signatures is enabled",
plugin.id,
)
return False
logger.warning(
"Executing unverified parser for plugin %s (no checksum configured). "
"Set SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=true to require integrity checks.",
plugin.id,
)
return True

try:
current_digest = self.compute_plugin_digest(metadata_file, parser_file)
except Exception as exc:
logger.error(
"Failed to compute digest for plugin %s at execution time: %s",
plugin.id,
exc,
)
return False

if not hmac.compare_digest(current_digest, plugin.checksum):
logger.error(
"SECURITY: Parser integrity check failed for plugin %s "
"(digest mismatch — possible tampering). Refusing execution.",
plugin.id,
)
return False

return True

def _validate_inputs_against_schema(self, plugin: PluginMetadata, inputs: Dict[str, Any]) -> None:
"""Validate user inputs against the plugin field schema.

Enforces type constraints and allowed-value sets declared in plugin metadata,
and rejects string values that would inject unexpected arguments into the
constructed CLI command.

Raises ValueError for any invalid or potentially injected input.
"""
for field in plugin.fields:
value = inputs.get(field.id)
if value is None or value == "":
continue

str_value = str(value)

if field.type == PluginFieldType.SELECT and field.options:
allowed = {opt.get("value", "") for opt in field.options}
if str_value not in allowed:
raise ValueError(
f"Field '{field.id}': value {str_value!r} is not in the allowed set "
f"{sorted(v for v in allowed if v)}"
)

if field.type == PluginFieldType.INTEGER:
try:
int(value)
except (TypeError, ValueError):
raise ValueError(
f"Field '{field.id}' expects an integer, got {value!r}"
)

if field.validation and isinstance(field.validation.get("pattern"), str):
pattern = field.validation["pattern"]
if not re.search(pattern, str_value):
msg = field.validation.get(
"message",
f"Field '{field.id}' value {str_value!r} does not match the required pattern",
)
raise ValueError(msg)

if field.type in (PluginFieldType.STRING, PluginFieldType.TEXT):
self._reject_injected_args(field.id, str_value)

def _reject_injected_args(self, field_id: str, value: str) -> None:
"""Reject string values that could inject unintended CLI flags.

Port fields must only contain digits, commas, and hyphens so that a value
like '--script=evil.nse' cannot be passed as a separate argv element that
tools like nmap would interpret as a flag.

All other string fields must not begin with a dash, which would be treated
as a flag prefix by virtually every CLI tool.
"""
if field_id in ("ports", "port"):
if value and not _PORT_SPEC_PATTERN.match(value):
raise ValueError(
f"Invalid port specification {value!r}: only digits, commas, "
"and hyphens are permitted (e.g. '80,443' or '1-1000')"
)
return

if value.lstrip().startswith("-"):
raise ValueError(
f"Field '{field_id}' value must not begin with '-': {value!r}"
)

def get_plugin(self, plugin_id: str) -> Optional[PluginMetadata]:
"""Get plugin by ID"""
return self.plugins.get(plugin_id)
Expand Down Expand Up @@ -332,6 +447,9 @@ def build_command(self, plugin_id: str, inputs: Dict) -> Optional[List[str]]:
if not plugin:
return None

# Validate raw inputs before normalization transforms values (e.g. wordlist
# alias → resolved path) so that select-field checks see the original value.
self._validate_inputs_against_schema(plugin, inputs)
inputs = self._normalize_inputs(plugin, inputs)
command = []

Expand Down
Loading
Loading