diff --git a/.env.example b/.env.example index e8d1afa8..e559fcaf 100644 --- a/.env.example +++ b/.env.example @@ -25,8 +25,18 @@ SECUSCAN_ALLOW_LOOPBACK_SCANS=true # Optional Plugin Security # SECUSCAN_PLUGIN_SIGNATURE_KEY=replace-with-your-signing-key # SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=false + +# Vault Encryption Key (REQUIRED when using the credential vault) +# Generate with: python3 -c "import secrets,base64; print(base64.urlsafe_b64encode(secrets.token_bytes(32)).decode())" # SECUSCAN_VAULT_KEY=replace-with-a-stable-local-secret +# API Authentication +# On first startup SecuScan generates a random API key and writes it to +# backend/data/.api_key. Add it to all requests as: +# Authorization: Bearer or X-Api-Key: +# The key is printed to the console on first run and reloaded automatically +# on subsequent starts — no manual configuration needed. + # Frontend Overrides # Leave these unset for the default local dev flow. # VITE_API_PROXY_TARGET=http://127.0.0.1:8000 diff --git a/backend/requirements.txt b/backend/requirements.txt index 2ec0ce1f..1873fa24 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -9,3 +9,4 @@ python-multipart>=0.0.9 xhtml2pdf>=0.2.17 aiosqlite>=0.20.0 python-whois>=0.9.4 +cryptography>=42.0.0 diff --git a/backend/secuscan/auth.py b/backend/secuscan/auth.py new file mode 100644 index 00000000..3ec8344e --- /dev/null +++ b/backend/secuscan/auth.py @@ -0,0 +1,88 @@ +"""Startup-generated API key authentication for SecuScan. + +On first run a cryptographically random key is written to backend/data/.api_key +and printed to the console. Every subsequent request must supply it as: + Authorization: Bearer +or + X-Api-Key: +""" + +from __future__ import annotations + +import logging +import secrets +from pathlib import Path +from typing import Optional + +from fastapi import Depends, HTTPException, Request, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer + +logger = logging.getLogger(__name__) + +_bearer_scheme = HTTPBearer(auto_error=False) + +_api_key: Optional[str] = None +_key_file: Optional[Path] = None + + +def init_api_key(data_dir: str) -> str: + """Load or generate the API key and persist it to data_dir/.api_key.""" + global _api_key, _key_file + _key_file = Path(data_dir) / ".api_key" + _key_file.parent.mkdir(parents=True, exist_ok=True) + + if _key_file.exists(): + _api_key = _key_file.read_text().strip() + if _api_key: + logger.info("Loaded existing API key from %s", _key_file) + return _api_key + + _api_key = secrets.token_hex(32) + _key_file.write_text(_api_key) + _key_file.chmod(0o600) + + logger.warning( + "\n" + "╔══════════════════════════════════════════════════════╗\n" + "║ SecuScan API Key (first-run) ║\n" + "║ ║\n" + "║ %s ║\n" + "║ ║\n" + "║ Add this to your requests: ║\n" + "║ Authorization: Bearer ║\n" + "║ Key saved at: backend/data/.api_key ║\n" + "╚══════════════════════════════════════════════════════╝", + _api_key, + ) + return _api_key + + +def _get_api_key() -> str: + if _api_key is None: + raise RuntimeError("API key not initialised — call init_api_key() during startup") + return _api_key + + +async def require_api_key( + request: Request, + credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme), +) -> None: + """FastAPI dependency that enforces API key authentication.""" + supplied: Optional[str] = None + + # Accept Bearer token + if credentials and credentials.scheme.lower() == "bearer": + supplied = credentials.credentials + + # Also accept X-Api-Key header (convenience for curl / scripts) + if not supplied: + supplied = request.headers.get("X-Api-Key") + + expected = _get_api_key() + + if not supplied or not secrets.compare_digest(supplied, expected): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or missing API key. Supply it as 'Authorization: Bearer ' or 'X-Api-Key: '.", + headers={"WWW-Authenticate": "Bearer"}, + ) diff --git a/backend/secuscan/config.py b/backend/secuscan/config.py index e05e573c..f1575fa6 100644 --- a/backend/secuscan/config.py +++ b/backend/secuscan/config.py @@ -95,8 +95,20 @@ def base_url(self) -> str: @property def resolved_vault_key(self) -> bytes: - """Return a deterministic 32-byte key for credential vault encryption.""" - seed = self.vault_key or self.plugin_signature_key or "secuscan-dev-key" + """Return a 32-byte key (base64url-encoded) for AES-256-GCM vault encryption. + + Requires SECUSCAN_VAULT_KEY to be set explicitly — no insecure default is + provided. If the variable is absent the application will raise at startup + rather than silently encrypt secrets with a publicly known key. + """ + seed = self.vault_key or self.plugin_signature_key + if not seed: + raise RuntimeError( + "SECUSCAN_VAULT_KEY is not set. " + "Generate a secure key with: python3 -c \"import secrets, base64; " + "print(base64.urlsafe_b64encode(secrets.token_bytes(32)).decode())\" " + "and add it to your .env file as SECUSCAN_VAULT_KEY=." + ) digest = hashlib.sha256(seed.encode("utf-8")).digest() return base64.urlsafe_b64encode(digest) diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 3b45fbbe..2b509688 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -755,22 +755,28 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: parser_path = plugin_dir / "parser.py" if parser_path.exists(): - try: - import importlib.util - spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) - if spec is not None: - loader = spec.loader - if loader is not None: - module = importlib.util.module_from_spec(spec) - loader.exec_module(module) - if hasattr(module, "parse"): - logger.info(f"Using custom parser for {plugin.id}") - parsed = module.parse(parser_input) - return self._normalize_parsed_result(plugin, parser_input, parsed) - else: - logger.warning(f"Custom parser {parser_path} missing 'parse' function") - except Exception as e: - logger.error(f"Error executing custom parser for {plugin.id}: {e}") + if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir): + logger.error( + "Skipping custom parser for plugin %s due to integrity verification failure", + plugin.id, + ) + else: + try: + import importlib.util + spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) + if spec is not None: + loader = spec.loader + if loader is not None: + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + if hasattr(module, "parse"): + logger.info(f"Using custom parser for {plugin.id}") + parsed = module.parse(parser_input) + return self._normalize_parsed_result(plugin, parser_input, parsed) + else: + logger.warning(f"Custom parser {parser_path} missing 'parse' function") + except Exception as e: + logger.error(f"Error executing custom parser for {plugin.id}: {e}") # 2. Fallback to legacy built-in parsers if parser_type == "builtin_nmap": diff --git a/backend/secuscan/main.py b/backend/secuscan/main.py index 08eb02c2..3428d913 100644 --- a/backend/secuscan/main.py +++ b/backend/secuscan/main.py @@ -12,6 +12,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles +from .auth import init_api_key from .config import settings from .cache import init_cache, cache as global_cache from .database import init_db, db as global_db @@ -38,10 +39,14 @@ async def lifespan(app: FastAPI): """Application lifespan manager""" # Startup logger.info("🚀 Starting SecuScan backend...") - + # Ensure directories exist settings.ensure_directories() logger.info("✓ Directories initialized") + + # Initialise API key (generates on first run, loads on subsequent runs) + init_api_key(settings.data_dir) + logger.info("✓ API key ready") # Initialize database await init_db(settings.database_path) diff --git a/backend/secuscan/plugins.py b/backend/secuscan/plugins.py index d224bd4a..171a1fb1 100644 --- a/backend/secuscan/plugins.py +++ b/backend/secuscan/plugins.py @@ -12,12 +12,15 @@ import hashlib import hmac -from .models import PluginMetadata +from .models import PluginMetadata, PluginFieldType from .config import settings logger = logging.getLogger(__name__) +_PORT_SPEC_PATTERN = re.compile(r"^[\d,\-]+$") + + class PluginManager: """Manages plugin loading and validation""" @@ -173,6 +176,118 @@ def compute_plugin_digest(metadata_file: Path, parser_file: Path) -> str: return hashlib.sha256(f"{metadata_digest}:{parser_digest}".encode("utf-8")).hexdigest() + def verify_parser_at_exec_time(self, plugin: PluginMetadata, plugin_dir: Path) -> bool: + """Re-verify parser file integrity immediately before execution. + + Closes the TOCTOU window between plugin load (where _verify_plugin_integrity + runs) and the moment the parser module is actually executed. Any modification + to parser.py or metadata.json after startup is caught here. + """ + parser_file = plugin_dir / "parser.py" + metadata_file = plugin_dir / "metadata.json" + + if not plugin.checksum: + if settings.enforce_plugin_signatures: + logger.error( + "Refusing to execute parser for plugin %s: " + "no checksum present and enforce_plugin_signatures is enabled", + plugin.id, + ) + return False + logger.warning( + "Executing unverified parser for plugin %s (no checksum configured). " + "Set SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=true to require integrity checks.", + plugin.id, + ) + return True + + try: + current_digest = self.compute_plugin_digest(metadata_file, parser_file) + except Exception as exc: + logger.error( + "Failed to compute digest for plugin %s at execution time: %s", + plugin.id, + exc, + ) + return False + + if not hmac.compare_digest(current_digest, plugin.checksum): + logger.error( + "SECURITY: Parser integrity check failed for plugin %s " + "(digest mismatch — possible tampering). Refusing execution.", + plugin.id, + ) + return False + + return True + + def _validate_inputs_against_schema(self, plugin: PluginMetadata, inputs: Dict[str, Any]) -> None: + """Validate user inputs against the plugin field schema. + + Enforces type constraints and allowed-value sets declared in plugin metadata, + and rejects string values that would inject unexpected arguments into the + constructed CLI command. + + Raises ValueError for any invalid or potentially injected input. + """ + for field in plugin.fields: + value = inputs.get(field.id) + if value is None or value == "": + continue + + str_value = str(value) + + if field.type == PluginFieldType.SELECT and field.options: + allowed = {opt.get("value", "") for opt in field.options} + if str_value not in allowed: + raise ValueError( + f"Field '{field.id}': value {str_value!r} is not in the allowed set " + f"{sorted(v for v in allowed if v)}" + ) + + if field.type == PluginFieldType.INTEGER: + try: + int(value) + except (TypeError, ValueError): + raise ValueError( + f"Field '{field.id}' expects an integer, got {value!r}" + ) + + if field.validation and isinstance(field.validation.get("pattern"), str): + pattern = field.validation["pattern"] + if not re.search(pattern, str_value): + msg = field.validation.get( + "message", + f"Field '{field.id}' value {str_value!r} does not match the required pattern", + ) + raise ValueError(msg) + + if field.type in (PluginFieldType.STRING, PluginFieldType.TEXT): + self._reject_injected_args(field.id, str_value) + + def _reject_injected_args(self, field_id: str, value: str) -> None: + """Reject string values that could inject unintended CLI flags. + + Port fields must only contain digits, commas, and hyphens so that a value + like '--script=evil.nse' cannot be passed as a separate argv element that + tools like nmap would interpret as a flag. + + All other string fields must not begin with a dash, which would be treated + as a flag prefix by virtually every CLI tool. + """ + if field_id in ("ports", "port"): + if value and not _PORT_SPEC_PATTERN.match(value): + raise ValueError( + f"Invalid port specification {value!r}: only digits, commas, " + "and hyphens are permitted (e.g. '80,443' or '1-1000')" + ) + return + + if value.lstrip().startswith("-"): + raise ValueError( + f"Field '{field_id}' value must not begin with '-': {value!r}" + ) + def get_plugin(self, plugin_id: str) -> Optional[PluginMetadata]: """Get plugin by ID""" return self.plugins.get(plugin_id) @@ -332,6 +447,9 @@ def build_command(self, plugin_id: str, inputs: Dict) -> Optional[List[str]]: if not plugin: return None + # Validate raw inputs before normalization transforms values (e.g. wordlist + # alias → resolved path) so that select-field checks see the original value. + self._validate_inputs_against_schema(plugin, inputs) inputs = self._normalize_inputs(plugin, inputs) command = [] diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index f1d53063..e46cbe90 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -2,6 +2,7 @@ API routes for SecuScan backend """ +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Response from fastapi import APIRouter, HTTPException, BackgroundTasks, Response, Request from fastapi.responses import JSONResponse from typing import Any, Optional, List, Dict, Callable @@ -31,13 +32,19 @@ def parse_json_fields(rows: List[Dict], fields: List[str]) -> List[Dict]: def is_filesystem_target(target: str) -> bool: - """Best-effort detection for path-based targets that should bypass host validation.""" - if target.startswith(("/", "./", "../", "~")): + """Detect explicit local filesystem paths that should bypass host validation. + + Only match genuine filesystem roots. Network notation (CIDR, URLs with paths) + must NOT match here — they must pass through validate_target so safe mode is + enforced correctly. The previous broad '/' check allowed bypassing safe mode + via targets like '8.8.8.8/32'. + """ + # Absolute POSIX path, relative path, or home-dir expansion + if target.startswith(("/", "./", "../", "~/")): return True + # Windows absolute path (C:\, D:/, …) if re.match(r"^[A-Za-z]:[\\/]", target): return True - if "/" in target and not target.startswith(("http://", "https://")): - return True return False @@ -62,6 +69,7 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: logger = logging.getLogger(__name__) +from .auth import require_api_key from .cache import get_cache from .models import ( TaskCreateRequest, TaskResponse, TaskResult, @@ -79,7 +87,7 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: from sse_starlette.sse import EventSourceResponse -router = APIRouter(prefix="/api/v1") +router = APIRouter(prefix="/api/v1", dependencies=[Depends(require_api_key)]) async def get_or_set_cached(key: str, builder): @@ -426,9 +434,9 @@ async def download_pdf_report(task_id: str): if task_row["status"] not in ["completed", "failed"]: raise HTTPException(status_code=400, detail="Task is not finished yet") + structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} try: - structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} - pdf_bytes = bytes(reporting.generate_pdf_report(dict(task_row), {"structured": structured_data})) + pdf_bytes = reporting.generate_pdf_report(dict(task_row), {"structured": structured_data}) except Exception: return _report_generation_error_response(task_id, "pdf") diff --git a/backend/secuscan/scanners/port_scanner.py b/backend/secuscan/scanners/port_scanner.py index b70bbe65..fff03720 100644 --- a/backend/secuscan/scanners/port_scanner.py +++ b/backend/secuscan/scanners/port_scanner.py @@ -26,24 +26,18 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: Runs Nmap scan and parses output into structured findings. """ self.update_progress(0.1) - - # Prepare inputs for the Nmap plugin - # Map PortScanner inputs to Nmap plugin fields + + raw_scan_type = inputs.get("scan_type", "T") + scan_type, service_detection = self._resolve_scan_type(raw_scan_type) + plugin_inputs = { "target": target, - "scan_type": inputs.get("scan_type", "-sV"), - "ports": inputs.get("ports", "top100"), - "speed": inputs.get("speed", "T4"), - "safe_mode": inputs.get("safe_mode", True) + "scan_type": scan_type, + "ports": self._resolve_ports(inputs.get("ports", "")), + "service_detection": inputs.get("service_detection", service_detection), + "os_detection": inputs.get("os_detection", False), + "safe_mode": inputs.get("safe_mode", True), } - - # Handle port shortcuts - if plugin_inputs["ports"] == "top100": - plugin_inputs["ports"] = "--top-ports 100" - elif plugin_inputs["ports"] == "top1000": - plugin_inputs["ports"] = "--top-ports 1000" - elif plugin_inputs["ports"] == "all": - plugin_inputs["ports"] = "-p-" plugin_manager = get_plugin_manager() command = plugin_manager.build_command("nmap", plugin_inputs) @@ -67,6 +61,43 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: "status": "completed" if exit_code == 0 else "failed" } + @staticmethod + def _resolve_scan_type(raw: str) -> tuple: + """Normalize raw scan type to the single-letter code expected by the nmap plugin. + + Returns (scan_type_letter, service_detection_implied) where + service_detection_implied is True when the raw value requests version probing. + """ + raw = raw.strip().lstrip("-") + # '-sV' or 'sV' implies TCP connect + service version detection + if raw.lower() in ("sv", "v"): + return "T", True + # Full nmap flag like 'sS', 'sT', 'sU' — strip the leading 's' + if raw.lower().startswith("s") and len(raw) > 1: + letter = raw[1].upper() + elif len(raw) >= 1: + letter = raw[0].upper() + else: + return "T", False + return (letter if letter in ("S", "T", "U") else "T"), False + + @staticmethod + def _resolve_ports(raw: str) -> str: + """Map convenience shorthand labels to valid numeric port specifications. + + Empty string tells the nmap template to use its --top-ports default. + Shorthand values like 'top100' and 'all' are mapped to numeric ranges + accepted by both the nmap CLI and the port-spec validator. + """ + raw = raw.strip() + if raw in ("top100", ""): + return "" # nmap template emits --top-ports 100 + if raw == "top1000": + return "1-1000" + if raw == "all": + return "1-65535" + return raw + async def _execute_command(self, command: List[str]) -> tuple: """Executes the command and returns (output, exit_code)""" import asyncio.subprocess diff --git a/backend/secuscan/vault.py b/backend/secuscan/vault.py index 8e4d8bf6..e28a1208 100644 --- a/backend/secuscan/vault.py +++ b/backend/secuscan/vault.py @@ -1,46 +1,44 @@ -"""Lightweight encrypted credential vault.""" +"""Encrypted credential vault backed by AES-256-GCM.""" from __future__ import annotations import base64 import hashlib -import hmac import os -from itertools import cycle + +from cryptography.hazmat.primitives.ciphers.aead import AESGCM class VaultCrypto: - """Symmetric encryption helper backed by a deterministic keystream. + """AES-256-GCM authenticated encryption for vault secrets. - This is intentionally lightweight for local-first usage where secret-at-rest - protection is needed without adding third-party crypto dependencies. + Each call to encrypt() generates a fresh 12-byte nonce so ciphertexts are + never reused, even for identical plaintexts. The 16-byte GCM auth tag + provides integrity — decryption raises ValueError on any tampering. """ - def __init__(self, key: bytes): - self.key = key + _NONCE_LEN = 12 - def _derive_stream_key(self, nonce: bytes) -> bytes: - return hashlib.sha256(self.key + nonce).digest() + def __init__(self, key: bytes): + # key is the raw Fernet-encoded 32-byte value from config.resolved_vault_key. + # We base64-decode it to get the 32 raw bytes that AESGCM expects. + raw = base64.urlsafe_b64decode(key) + if len(raw) != 32: + raise ValueError("Vault key must be exactly 32 raw bytes (256 bits)") + self._aesgcm = AESGCM(raw) def encrypt(self, plaintext: str) -> str: - raw = plaintext.encode("utf-8") - nonce = os.urandom(16) - stream_key = self._derive_stream_key(nonce) - ciphertext = bytes(b ^ k for b, k in zip(raw, cycle(stream_key))) - signature = hmac.new(self.key, nonce + ciphertext, hashlib.sha256).digest() - blob = nonce + signature + ciphertext + nonce = os.urandom(self._NONCE_LEN) + ciphertext = self._aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None) + blob = nonce + ciphertext return base64.urlsafe_b64encode(blob).decode("ascii") def decrypt(self, payload: str) -> str: blob = base64.urlsafe_b64decode(payload.encode("ascii")) - nonce = blob[:16] - signature = blob[16:48] - ciphertext = blob[48:] - - expected = hmac.new(self.key, nonce + ciphertext, hashlib.sha256).digest() - if not hmac.compare_digest(signature, expected): - raise ValueError("Vault payload integrity verification failed") - - stream_key = self._derive_stream_key(nonce) - raw = bytes(b ^ k for b, k in zip(ciphertext, cycle(stream_key))) + nonce = blob[: self._NONCE_LEN] + ciphertext = blob[self._NONCE_LEN :] + try: + raw = self._aesgcm.decrypt(nonce, ciphertext, None) + except Exception as exc: + raise ValueError("Vault payload integrity verification failed") from exc return raw.decode("utf-8") diff --git a/testing/backend/unit/test_plugin_parser_security.py b/testing/backend/unit/test_plugin_parser_security.py new file mode 100644 index 00000000..60879a9d --- /dev/null +++ b/testing/backend/unit/test_plugin_parser_security.py @@ -0,0 +1,327 @@ +""" +Security tests for plugin parser execution and command argument injection. + +Covers: + - Parser integrity re-verification at execution time (TOCTOU defence) + - Input schema validation that prevents CLI argument injection + - PortScanner scan-type and port normalisation +""" + +import asyncio +import json +import re +from pathlib import Path +from unittest.mock import patch + +import pytest + +from backend.secuscan.config import settings +from backend.secuscan.models import PluginField, PluginFieldType, PluginMetadata +from backend.secuscan.plugins import PluginManager +from backend.secuscan.scanners.port_scanner import PortScanner + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_manager(plugins_dir: str) -> PluginManager: + manager = PluginManager(plugins_dir) + asyncio.run(manager.load_plugins()) + return manager + + +def _make_mock_plugin(extra_fields: list | None = None) -> PluginMetadata: + """Minimal plugin with one plain STRING field for unit-testing schema validation.""" + fields = [ + PluginField( + id="custom_arg", + label="Custom Arg", + type=PluginFieldType.STRING, + required=False, + ) + ] + if extra_fields: + fields.extend(extra_fields) + return PluginMetadata( + id="mock_plugin", + name="Mock Plugin", + version="1.0.0", + description="Unit test fixture", + category="network", + engine={"type": "cli", "binary": "nmap"}, + command_template=["nmap", "{custom_arg}"], + fields=fields, + presets={}, + output={"parser": "text", "format": "text"}, + safety={"level": "safe"}, + ) + + +# --------------------------------------------------------------------------- +# Issue #202 — Parser integrity verification at execution time +# --------------------------------------------------------------------------- + + +class TestParserIntegrityAtExecTime: + """verify_parser_at_exec_time() must block execution of tampered parsers.""" + + def test_valid_checksum_passes(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + plugin = manager.get_plugin("nmap") + assert plugin is not None + plugin_dir = manager.plugins_dir / "nmap" + assert manager.verify_parser_at_exec_time(plugin, plugin_dir) + + def test_tampered_parser_is_rejected(self, tmp_path): + """Modifying parser.py after load must be caught before execution.""" + plugin_dir = tmp_path / "evil_plugin" + plugin_dir.mkdir() + + metadata = { + "id": "evil_plugin", + "name": "Evil Plugin", + "version": "1.0.0", + "description": "Test", + "category": "network", + "engine": {"type": "cli", "binary": "nmap"}, + "command_template": ["nmap", "{target}"], + "fields": [], + "presets": {}, + "output": {"parser": "custom", "format": "text"}, + "safety": {"level": "safe"}, + } + metadata_file = plugin_dir / "metadata.json" + parser_file = plugin_dir / "parser.py" + + # Write both files before computing the digest (metadata.json must exist first) + parser_file.write_text("def parse(output): return {}", encoding="utf-8") + metadata_file.write_text(json.dumps(metadata), encoding="utf-8") + + checksum = PluginManager.compute_plugin_digest(metadata_file, parser_file) + metadata["checksum"] = checksum + metadata_file.write_text(json.dumps(metadata), encoding="utf-8") + + # Load the plugin so it exists in the manager + manager = PluginManager(str(tmp_path)) + asyncio.run(manager.load_plugins()) + plugin = manager.get_plugin("evil_plugin") + assert plugin is not None + + # Tamper with the parser AFTER loading + parser_file.write_text( + "import os; os.system('id')\ndef parse(output): return {}", + encoding="utf-8", + ) + + assert not manager.verify_parser_at_exec_time(plugin, plugin_dir) + + def test_missing_checksum_blocked_when_enforcement_enabled(self, tmp_path, monkeypatch): + """No checksum + enforce_plugin_signatures=True must block execution.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", True) + + plugin_dir = tmp_path / "unsigned_plugin" + plugin_dir.mkdir() + metadata_file = plugin_dir / "metadata.json" + parser_file = plugin_dir / "parser.py" + + metadata = { + "id": "unsigned_plugin", + "name": "Unsigned", + "version": "1.0.0", + "description": "Test", + "category": "network", + "engine": {"type": "cli", "binary": "nmap"}, + "command_template": ["nmap", "{target}"], + "fields": [], + "presets": {}, + "output": {"parser": "custom", "format": "text"}, + "safety": {"level": "safe"}, + } + metadata_file.write_text(json.dumps(metadata), encoding="utf-8") + parser_file.write_text("def parse(output): return {}", encoding="utf-8") + + manager = PluginManager(str(tmp_path)) + # Skip the load-time integrity check for this test + from backend.secuscan.models import PluginMetadata + plugin = PluginMetadata(**metadata) + manager.plugins["unsigned_plugin"] = plugin + + assert not manager.verify_parser_at_exec_time(plugin, plugin_dir) + + def test_missing_checksum_allowed_when_enforcement_disabled(self, tmp_path, monkeypatch): + """No checksum + enforce_plugin_signatures=False must pass with a warning.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + + plugin_dir = tmp_path / "lax_plugin" + plugin_dir.mkdir() + (plugin_dir / "parser.py").write_text("def parse(o): return {}", encoding="utf-8") + + from backend.secuscan.models import PluginMetadata + plugin = PluginMetadata( + id="lax_plugin", + name="Lax Plugin", + version="1.0.0", + description="No checksum", + category="network", + engine={"type": "cli", "binary": "nmap"}, + command_template=["nmap", "{target}"], + fields=[], + presets={}, + output={"parser": "custom", "format": "text"}, + safety={"level": "safe"}, + checksum=None, + ) + + manager = PluginManager(str(tmp_path)) + assert manager.verify_parser_at_exec_time(plugin, plugin_dir) + + def test_digest_computation_failure_rejects_execution(self, tmp_path): + """If digest computation throws, execution must be refused.""" + from backend.secuscan.models import PluginMetadata + plugin = PluginMetadata( + id="broken", + name="Broken", + version="1.0.0", + description="x", + category="network", + engine={"type": "cli", "binary": "nmap"}, + command_template=["nmap", "{target}"], + fields=[], + presets={}, + output={"parser": "custom", "format": "text"}, + safety={"level": "safe"}, + checksum="somechecksum", + ) + + manager = PluginManager(str(tmp_path)) + # plugin_dir doesn't exist, so compute_plugin_digest will raise + missing_dir = tmp_path / "no_such_plugin" + assert not manager.verify_parser_at_exec_time(plugin, missing_dir) + + +# --------------------------------------------------------------------------- +# Issue #201 — Command argument injection via scanner inputs +# --------------------------------------------------------------------------- + + +class TestInputSchemaValidation: + """_validate_inputs_against_schema() must reject injection-prone values.""" + + def test_valid_nmap_inputs_pass(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + # Should not raise + manager.build_command("nmap", {"target": "192.168.1.1", "scan_type": "T", "ports": "80,443"}) + + def test_invalid_select_value_rejected(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + with pytest.raises(ValueError, match="not in the allowed set"): + manager.build_command("nmap", {"target": "192.168.1.1", "scan_type": "X"}) + + def test_port_injection_via_flag_prefix_rejected(self, setup_test_environment): + """ports='--script=evil.nse' would inject a flag into the nmap argv.""" + manager = _make_manager(settings.plugins_dir) + with pytest.raises(ValueError, match="port specification"): + manager.build_command("nmap", {"target": "192.168.1.1", "ports": "--script=evil.nse"}) + + def test_port_injection_via_dash_prefix_rejected(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + with pytest.raises(ValueError, match="port specification"): + manager.build_command("nmap", {"target": "192.168.1.1", "ports": "-p-"}) + + def test_valid_port_range_passes(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + cmd = manager.build_command("nmap", {"target": "192.168.1.1", "ports": "1-1000"}) + assert "1-1000" in cmd + + def test_valid_comma_ports_pass(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + cmd = manager.build_command("nmap", {"target": "192.168.1.1", "ports": "22,80,443"}) + assert "22,80,443" in cmd + + def test_scan_type_with_leading_dash_rejected(self, setup_test_environment): + """scan_type is a select field; free-text values that aren't in the options list are rejected.""" + manager = _make_manager(settings.plugins_dir) + with pytest.raises(ValueError, match="not in the allowed set"): + manager.build_command("nmap", {"target": "192.168.1.1", "scan_type": "-sS --script=evil"}) + + def test_target_field_pattern_enforced(self, setup_test_environment): + """The nmap target field has pattern ^[a-zA-Z0-9.-]+$ that blocks shell metacharacters.""" + manager = _make_manager(settings.plugins_dir) + with pytest.raises(ValueError): + manager.build_command("nmap", {"target": "192.168.1.1; id"}) + + def test_string_field_leading_dash_rejected(self, tmp_path): + """Any STRING field value starting with '-' is rejected to prevent flag injection.""" + manager = PluginManager(str(tmp_path)) + plugin = _make_mock_plugin() + manager.plugins["mock_plugin"] = plugin + with pytest.raises(ValueError, match="must not begin with"): + manager._validate_inputs_against_schema(plugin, {"custom_arg": "-v --evil-payload"}) + + def test_integer_field_type_enforced(self, setup_test_environment): + manager = _make_manager(settings.plugins_dir) + with pytest.raises(ValueError, match="expects an integer"): + manager.build_command( + "http_inspector", + {"url": "http://localhost", "timeout": "not_a_number"}, + ) + + +# --------------------------------------------------------------------------- +# PortScanner input normalisation +# --------------------------------------------------------------------------- + + +class TestPortScannerInputNormalisation: + """PortScanner must produce values that pass the nmap plugin's schema validation.""" + + @pytest.mark.parametrize("raw,expected_letter,expected_svc", [ + ("T", "T", False), + ("S", "S", False), + ("U", "U", False), + ("-sV", "T", True), # service-version detection maps to TCP connect + svc flag + ("sV", "T", True), + ("-sS", "S", False), + ("-sT", "T", False), + ("-sU", "U", False), + ("", "T", False), # empty defaults to TCP connect + ("X", "T", False), # unknown letter defaults safely + ]) + def test_resolve_scan_type(self, raw, expected_letter, expected_svc): + letter, svc = PortScanner._resolve_scan_type(raw) + assert letter == expected_letter + assert svc == expected_svc + + @pytest.mark.parametrize("raw,expected", [ + ("", ""), + ("top100", ""), + ("top1000", "1-1000"), + ("all", "1-65535"), + ("80,443", "80,443"), + ("22", "22"), + ("1-1000", "1-1000"), + ]) + def test_resolve_ports(self, raw, expected): + assert PortScanner._resolve_ports(raw) == expected + + def test_resolved_ports_pass_schema_validation(self, setup_test_environment): + """Values produced by _resolve_ports must all pass _reject_injected_args.""" + manager = _make_manager(settings.plugins_dir) + safe_cases = ["", "80,443", "1-1000", "1-65535", "22"] + for ports_value in safe_cases: + # build_command raises if validation fails; no assertion needed beyond no-raise + manager.build_command( + "nmap", + {"target": "192.168.1.1", "scan_type": "T", "ports": ports_value}, + ) + + def test_old_top_ports_shorthand_would_have_injected(self): + """Regression: the old '--top-ports 100' value fails the port validator.""" + from backend.secuscan.plugins import _PORT_SPEC_PATTERN + assert not _PORT_SPEC_PATTERN.match("--top-ports 100") + + def test_old_dash_p_shorthand_would_have_injected(self): + """Regression: the old '-p-' (all-ports flag) fails the port validator.""" + from backend.secuscan.plugins import _PORT_SPEC_PATTERN + assert not _PORT_SPEC_PATTERN.match("-p-")