diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1cd3e251..86874186 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,42 @@ permissions: contents: read jobs: + detect-changes: + runs-on: ubuntu-latest + outputs: + run_backend: ${{ steps.filter.outputs.run_backend }} + run_frontend: ${{ steps.filter.outputs.run_frontend }} + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Fetch base branch for diff + if: github.event_name == 'pull_request' + run: git fetch origin "${{ github.base_ref }}" --depth=1 + - name: Determine test selection + id: filter + run: python3 scripts/select_tests.py + + formatting-hygiene: + needs: detect-changes + if: github.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Check formatting hygiene on changed files + run: | + git fetch origin "${{ github.base_ref }}" --depth=1 + git diff --check "origin/${{ github.base_ref }}"...HEAD + backend-lint: + needs: detect-changes + if: needs.detect-changes.outputs.run_backend == 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -25,6 +60,12 @@ jobs: run: ruff check backend testing/backend backend-tests: + needs: [detect-changes, backend-lint, formatting-hygiene] + if: | + always() && + needs.detect-changes.outputs.run_backend == 'true' && + (needs.backend-lint.result == 'success' || needs.backend-lint.result == 'skipped') && + (needs.formatting-hygiene.result == 'success' || needs.formatting-hygiene.result == 'skipped') runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -41,6 +82,11 @@ jobs: run: pytest testing/backend -q frontend-checks: + needs: [detect-changes, formatting-hygiene] + if: | + always() && + needs.detect-changes.outputs.run_frontend == 'true' && + (needs.formatting-hygiene.result == 'success' || needs.formatting-hygiene.result == 'skipped') runs-on: ubuntu-latest defaults: run: @@ -66,15 +112,3 @@ jobs: run: npm run test - name: Build frontend run: npm run build - - formatting-hygiene: - if: github.event_name == 'pull_request' - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - name: Check formatting hygiene on changed files - run: | - git fetch origin "${{ github.base_ref }}" --depth=1 - git diff --check "origin/${{ github.base_ref }}"...HEAD diff --git a/.gitignore b/.gitignore index 04904306..35950339 100644 --- a/.gitignore +++ b/.gitignore @@ -102,4 +102,4 @@ backend/__pycache__/ Thumbs.db backend/data/reports/* -!backend/data/reports/.gitkeep \ No newline at end of file +!backend/data/reports/.gitkeep diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9bebba48..4e5a084d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -404,4 +404,4 @@ Never commit these auto-generated paths: If CI fails, run: ```bash git rm --cached -echo 'frontend/dist/' >> .gitignore \ No newline at end of file +echo 'frontend/dist/' >> .gitignore diff --git a/README.md b/README.md index 7d520516..7e5b1be0 100644 --- a/README.md +++ b/README.md @@ -210,4 +210,4 @@ This project is released under the [MIT License](LICENSE). - `LICENSE` is the canonical legal text for this repository. - Contributions merged into this repository are distributed under the same MIT License unless explicitly stated otherwise. -- Third-party tools, libraries, and external scanners referenced by SecuScan may have their own licenses and usage terms. Check upstream projects before redistributing bundled integrations. \ No newline at end of file +- Third-party tools, libraries, and external scanners referenced by SecuScan may have their own licenses and usage terms. Check upstream projects before redistributing bundled integrations. diff --git a/backend/Dockerfile b/backend/Dockerfile index 82cbdbe9..cd1237e6 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -10,4 +10,3 @@ COPY plugins ./plugins EXPOSE 8081 CMD ["uvicorn", "secuscan.api:app", "--host", "127.0.0.1", "--port", "8081"] - diff --git a/backend/secuscan/cache.py b/backend/secuscan/cache.py index a517b867..342da862 100644 --- a/backend/secuscan/cache.py +++ b/backend/secuscan/cache.py @@ -30,13 +30,13 @@ async def get_json(self, key: str) -> Optional[Any]: """Retrieve and parse JSON from memory, respecting TTL.""" now = time.time() expiry = self._expires.get(key) - + if expiry and now > expiry: # Clean up expired item self._data.pop(key, None) self._expires.pop(key, None) return None - + return self._data.get(key) async def set_json(self, key: str, value: Any, ttl: Optional[int] = None): diff --git a/backend/secuscan/cli.py b/backend/secuscan/cli.py index 1a41be85..236f1ded 100644 --- a/backend/secuscan/cli.py +++ b/backend/secuscan/cli.py @@ -21,7 +21,10 @@ from backend.secuscan.plugins import init_plugins, get_plugin_manager from backend.secuscan.reporting import reporting -async def run_scan(target: str, plugin_id: str, output_format: str, output_file: Optional[str] = None): + +async def run_scan( + target: str, plugin_id: str, output_format: str, output_file: Optional[str] = None +): """Initialize components and execute a scan task.""" # Ensure directories exist @@ -37,7 +40,11 @@ async def run_scan(target: str, plugin_id: str, output_format: str, output_file: # If target is "." and no plugin specified, default to a sensible one for code if target == "." and plugin_id == "nmap": # Check if we should use secret_scanner or code_analyzer instead - plugin_id = "secret_scanner" if plugin_manager.get_plugin("secret_scanner") else "code_analyzer" + plugin_id = ( + "secret_scanner" + if plugin_manager.get_plugin("secret_scanner") + else "code_analyzer" + ) print(f"[*] Detected directory target '.', defaulting to plugin: {plugin_id}") plugin = plugin_manager.get_plugin(plugin_id) @@ -88,7 +95,7 @@ async def monitor_output(): db = await get_db() task_row = await db.fetchone( "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", - (task_id,) + (task_id,), ) if not task_row: @@ -102,7 +109,9 @@ async def monitor_output(): print(f"\n[*] Scan completed successfully.") # Generate report - structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} + structured_data = ( + json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} + ) result_payload = {"structured": structured_data} report_content: str = "" @@ -132,15 +141,25 @@ async def monitor_output(): return 0 + def main(): - parser = argparse.ArgumentParser(description="SecuScan CLI - Local-First Pentesting Toolkit") + parser = argparse.ArgumentParser( + description="SecuScan CLI - Local-First Pentesting Toolkit" + ) subparsers = parser.add_subparsers(dest="command", help="Command to run") # Scan command scan_parser = subparsers.add_parser("scan", help="Run a security scan") scan_parser.add_argument("target", help="Target to scan (IP, Domain, or Path)") - scan_parser.add_argument("--plugin", default="nmap", help="Plugin ID to use (default: nmap)") - scan_parser.add_argument("--format", choices=["sarif", "json", "csv", "html", "console"], default="console", help="Output format") + scan_parser.add_argument( + "--plugin", default="nmap", help="Plugin ID to use (default: nmap)" + ) + scan_parser.add_argument( + "--format", + choices=["sarif", "json", "csv", "html", "console"], + default="console", + help="Output format", + ) scan_parser.add_argument("--output", "-o", help="Output file path") # List plugins command @@ -149,7 +168,9 @@ def main(): args = parser.parse_args() if args.command == "scan": - sys.exit(asyncio.run(run_scan(args.target, args.plugin, args.format, args.output))) + sys.exit( + asyncio.run(run_scan(args.target, args.plugin, args.format, args.output)) + ) elif args.command == "plugins": # Synchronous shortcut for listing async def list_plugins(): @@ -159,9 +180,11 @@ async def list_plugins(): print("-" * 65) for p_id, p in pm.plugins.items(): print(f"{p_id:<20} {p.name:<30} {p.category:<15}") + asyncio.run(list_plugins()) else: parser.print_help() + if __name__ == "__main__": main() diff --git a/backend/secuscan/config.py b/backend/secuscan/config.py index e05e573c..1ff8b419 100644 --- a/backend/secuscan/config.py +++ b/backend/secuscan/config.py @@ -14,26 +14,26 @@ class Settings(BaseSettings): """Application settings loaded from environment variables""" - + # Server Configuration bind_address: str = "127.0.0.1" bind_port: int = 8000 debug: bool = True - + # Primary data store database_path: str = str(PROJECT_ROOT / "data" / "secuscan.db") # Cache store (In-memory used when redis_url is None or Docker is disabled) redis_url: Optional[str] = None cache_ttl_seconds: int = 30 - + # Storage data_dir: str = str(PROJECT_ROOT / "data") raw_output_dir: str = str(PROJECT_ROOT / "data" / "raw") reports_dir: str = str(PROJECT_ROOT / "data" / "reports") plugins_dir: str = str(PROJECT_ROOT.parent / "plugins") wordlists_dir: str = str(PROJECT_ROOT / "wordlists") - + # Security safe_mode_default: bool = True require_consent: bool = True @@ -49,18 +49,30 @@ class Settings(BaseSettings): "http://localhost:4173", "http://127.0.0.1:4173", ] - cors_allowed_methods: List[str] = ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"] - cors_allowed_headers: List[str] = ["Content-Type", "Authorization", "Accept", "Origin"] + cors_allowed_methods: List[str] = [ + "GET", + "POST", + "PUT", + "PATCH", + "DELETE", + "OPTIONS", + ] + cors_allowed_headers: List[str] = [ + "Content-Type", + "Authorization", + "Accept", + "Origin", + ] cors_allow_credentials: bool = True plugin_signature_key: Optional[str] = None enforce_plugin_signatures: bool = False vault_key: Optional[str] = None - + # Rate Limiting max_concurrent_tasks: int = 3 max_tasks_per_hour: int = 50 max_requests_per_minute: int = 100 - + # Sandbox docker_enabled: bool = False sandbox_timeout: int = 600 # seconds @@ -68,26 +80,31 @@ class Settings(BaseSettings): sandbox_memory_mb: int = 512 # Task-start payload limits (tunable via env vars) - task_start_max_body_bytes: int = 64_000 # 64 KB total JSON body - task_start_max_field_length: int = 1_000 # max chars per string input value - task_start_max_array_length: int = 50 # max items in any list/multiselect input + task_start_max_body_bytes: int = 64_000 # 64 KB total JSON body + task_start_max_field_length: int = 1_000 # max chars per string input value + task_start_max_array_length: int = 50 # max items in any list/multiselect input # Logging log_level: str = "INFO" log_file: str = str(PROJECT_ROOT / "logs" / "secuscan.log") - + class Config: env_prefix = "SECUSCAN_" case_sensitive = False - @field_validator("cors_allowed_origins", "cors_allowed_methods", "cors_allowed_headers", mode="before") + @field_validator( + "cors_allowed_origins", + "cors_allowed_methods", + "cors_allowed_headers", + mode="before", + ) @classmethod def parse_csv_or_list(cls, value: Any) -> Any: """Allow comma-separated env values in addition to JSON arrays.""" if isinstance(value, str): return [item.strip() for item in value.split(",") if item.strip()] return value - + @property def base_url(self) -> str: """Full base URL for the API""" @@ -99,7 +116,7 @@ def resolved_vault_key(self) -> bytes: seed = self.vault_key or self.plugin_signature_key or "secuscan-dev-key" digest = hashlib.sha256(seed.encode("utf-8")).digest() return base64.urlsafe_b64encode(digest) - + def ensure_directories(self) -> None: """Create necessary directories if they don't exist""" for directory in [ @@ -109,11 +126,11 @@ def ensure_directories(self) -> None: Path(self.log_file).parent, ]: Path(directory).mkdir(parents=True, exist_ok=True) - + # Create gitkeep files (Path(self.raw_output_dir) / ".gitkeep").touch() (Path(self.reports_dir) / ".gitkeep").touch() # Global settings instance -settings = Settings() \ No newline at end of file +settings = Settings() diff --git a/backend/secuscan/database.py b/backend/secuscan/database.py index 8ff8775e..2ae51917 100644 --- a/backend/secuscan/database.py +++ b/backend/secuscan/database.py @@ -26,14 +26,16 @@ def __init__(self, db_path: str): def connection(self) -> aiosqlite.Connection: """Get the active database connection, raising an error if it's not connected.""" if self._connection is None: - raise RuntimeError("Database not connected. Did you forget to await connect()?") + raise RuntimeError( + "Database not connected. Did you forget to await connect()?" + ) return self._connection async def connect(self): """Establish database connection and ensure schema exists.""" # Ensure data directory exists Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) - + conn = await aiosqlite.connect(self.db_path) self._connection = conn conn.row_factory = aiosqlite.Row @@ -182,7 +184,7 @@ async def _create_schema(self): # Migration logic: ensure latest columns exist in 'tasks' table tasks_columns = await self.fetchall("PRAGMA table_info(tasks)") existing_cols = {col["name"] for col in tasks_columns} - + needed_cols = { "exit_code": "INTEGER", "structured_json": "TEXT", @@ -194,13 +196,15 @@ async def _create_schema(self): "memory_peak_mb": "REAL", "inputs_json": "TEXT NOT NULL DEFAULT '{}'", "preset": "TEXT", - "safe_mode": "BOOLEAN NOT NULL DEFAULT 1" + "safe_mode": "BOOLEAN NOT NULL DEFAULT 1", } for col_name, col_type in needed_cols.items(): if col_name not in existing_cols: try: - await self.execute(f"ALTER TABLE tasks ADD COLUMN {col_name} {col_type}") + await self.execute( + f"ALTER TABLE tasks ADD COLUMN {col_name} {col_type}" + ) print(f"Added missing column {col_name} to tasks table.") except Exception as e: print(f"Failed to add column {col_name}: {e}") diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 3b45fbbe..970ac7e8 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -30,7 +30,7 @@ MODULAR_SCANNERS = { "port_scanner": PortScanner, "web_scanner": WebScanner, - "recon_scanner": ReconScanner + "recon_scanner": ReconScanner, } logger = logging.getLogger(__name__) @@ -45,6 +45,8 @@ def extract_target(inputs: Dict[str, Any]) -> str: or inputs.get("domain") or "" ) + + class TaskExecutor: """Executes security scanning tasks in isolated environments""" @@ -74,39 +76,39 @@ async def _broadcast(self, task_id: str, event_type: str, data: Any): event = {"type": event_type, "data": data} for q in self._listeners[task_id]: await q.put(event) - + async def create_task( self, plugin_id: str, inputs: Dict[str, Any], preset: Optional[str] = None, - consent_granted: bool = False + consent_granted: bool = False, ) -> str: """ Create a new scan task. - + Args: plugin_id: Plugin identifier inputs: User input values preset: Optional preset name consent_granted: Whether user granted consent - + Returns: Task ID """ task_id = str(uuid.uuid4()) plugin_manager = get_plugin_manager() plugin = plugin_manager.get_plugin(plugin_id) - + if not plugin: raise ValueError(f"Plugin not found: {plugin_id}") - + # Apply preset if provided if preset and preset in plugin.presets: preset_values = plugin.presets[preset] # Merge preset with user inputs (user inputs take precedence) inputs = {**preset_values, **inputs} - + # Store task in database db = await get_db() await db.execute( @@ -125,21 +127,25 @@ async def create_task( preset, TaskStatus.QUEUED.value, consent_granted, - inputs.get("safe_mode", True) - ) + inputs.get("safe_mode", True), + ), ) - + # Log audit event await db.log_audit( "task_created", f"Task created for {plugin.name}", - context={"task_id": task_id, "plugin_id": plugin_id, "target": inputs.get("target")}, + context={ + "task_id": task_id, + "plugin_id": plugin_id, + "target": inputs.get("target"), + }, task_id=task_id, - plugin_id=plugin_id + plugin_id=plugin_id, ) - + return task_id - + async def mark_task_failed(self, task_id: str, reason: str) -> None: """ Mark a task as failed without running it. @@ -165,7 +171,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None: 0, reason, task_id, - ) + ), ) await db.log_audit( "task_failed", @@ -178,7 +184,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None: async def execute_task(self, task_id: str): """ Execute a task asynchronously. - + Args: task_id: Task identifier """ @@ -189,14 +195,14 @@ async def execute_task(self, task_id: str): # Update status to running await db.execute( "UPDATE tasks SET status = ?, started_at = ? WHERE id = ?", - (TaskStatus.RUNNING.value, datetime.now().isoformat(), task_id) + (TaskStatus.RUNNING.value, datetime.now().isoformat(), task_id), ) await self._invalidate_cached_views() # Get task details task_row = await db.fetchone( "SELECT plugin_id, inputs_json, safe_mode FROM tasks WHERE id = ?", - (task_id,) + (task_id,), ) if not task_row: @@ -211,18 +217,22 @@ async def execute_task(self, task_id: str): if plugin_id in MODULAR_SCANNERS: scanner_class = MODULAR_SCANNERS[plugin_id] scanner = scanner_class(task_id, db) - + logger.info(f"Executing modular scanner {plugin_id} for task {task_id}") await self._broadcast(task_id, "status", TaskStatus.RUNNING.value) - + start_time = time.time() # Run the scanner result = await scanner.run(target, inputs) duration = time.time() - start_time - + # Update task with results - final_status = TaskStatus.COMPLETED.value if result.get("status") != "failed" else TaskStatus.FAILED.value - + final_status = ( + TaskStatus.COMPLETED.value + if result.get("status") != "failed" + else TaskStatus.FAILED.value + ) + await db.execute( """ UPDATE tasks SET @@ -239,8 +249,8 @@ async def execute_task(self, task_id: str): duration, json.dumps(result), result.get("error_message"), - task_id - ) + task_id, + ), ) # Upsert findings and report using the scanner's result @@ -251,7 +261,7 @@ async def execute_task(self, task_id: str): plugin_id=plugin_id, target=target, status=final_status, - result=result + result=result, ) else: @@ -262,7 +272,7 @@ async def execute_task(self, task_id: str): raise ValueError(f"Plugin not found: {plugin_id}") # Pending records for assets removed - + command = plugin_manager.build_command(plugin_id, inputs) if not command: @@ -300,7 +310,7 @@ async def execute_task(self, task_id: str): # Save raw output raw_path = Path(settings.raw_output_dir) / f"{task_id}.txt" output = redact(output) - with open(raw_path, 'w') as f: + with open(raw_path, "w") as f: f.write(output) # Some CLI tools use non-zero exit codes for "no result" states while still @@ -331,8 +341,8 @@ async def execute_task(self, task_id: str): str(raw_path), " ".join(command), error_message, - task_id - ) + task_id, + ), ) # Upsert findings and report @@ -343,7 +353,7 @@ async def execute_task(self, task_id: str): plugin_id=plugin_id, target=target, status=final_status, - output=output + output=output, ) await self._broadcast(task_id, "status", final_status) @@ -353,9 +363,9 @@ async def execute_task(self, task_id: str): await db.log_audit( "task_completed", f"Task completed in {duration:.2f}s", - context={"task_id": task_id, "exit_code": locals().get('exit_code', 0)}, + context={"task_id": task_id, "exit_code": locals().get("exit_code", 0)}, task_id=task_id, - plugin_id=plugin_id + plugin_id=plugin_id, ) logger.info(f"Task {task_id} completed in {duration:.2f}s") @@ -366,7 +376,7 @@ async def execute_task(self, task_id: str): # Task.cancelled() returns False while the finally block is still # executing, so this is the only reliable place to write the # cancellation status to the DB. - duration = (time.time() - start_time) if 'start_time' in locals() else 0 + duration = (time.time() - start_time) if "start_time" in locals() else 0 await db.execute( """ UPDATE tasks SET @@ -381,7 +391,7 @@ async def execute_task(self, task_id: str): duration, task_id, TaskStatus.RUNNING.value, - ) + ), ) await self._broadcast(task_id, "status", TaskStatus.CANCELLED.value) await self._invalidate_cached_views() @@ -391,7 +401,7 @@ async def execute_task(self, task_id: str): logger.error(f"Task {task_id} failed: {e}", exc_info=True) # Update task as failed - duration = (time.time() - start_time) if 'start_time' in locals() else 0 + duration = (time.time() - start_time) if "start_time" in locals() else 0 await db.execute( """ UPDATE tasks SET @@ -406,8 +416,8 @@ async def execute_task(self, task_id: str): datetime.now().isoformat(), duration, str(e), - task_id - ) + task_id, + ), ) await self._broadcast(task_id, "status", TaskStatus.FAILED.value) @@ -418,19 +428,16 @@ async def execute_task(self, task_id: str): f"Task failed: {str(e)}", severity="error", context={"task_id": task_id, "error": str(e)}, - task_id=task_id + task_id=task_id, ) finally: # Always clean up: remove from the in-memory registry and # release the concurrency slot regardless of how the task ended. self.running_tasks.pop(task_id, None) await concurrent_limiter.release(task_id) - + async def _execute_command( - self, - command: list, - task_id: str, - timeout: int = 600 + self, command: list, task_id: str, timeout: int = 600 ) -> tuple: """ Execute command in subprocess and stream output. @@ -445,9 +452,7 @@ async def _execute_command( """ try: process = await asyncio.create_subprocess_exec( - *command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT + *command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) output_lines = [] @@ -456,18 +461,21 @@ async def read_stream(): stdout = process.stdout if stdout is None: return - + while not stdout.at_eof(): line = await stdout.readline() if line: - decoded_line = line.decode('utf-8', errors='replace') + decoded_line = line.decode("utf-8", errors="replace") output_lines.append(decoded_line) await self._broadcast(task_id, "output", decoded_line) try: await asyncio.wait_for(read_stream(), timeout=timeout) await process.wait() - return "".join(output_lines), process.returncode if process.returncode is not None else -1 + return ( + "".join(output_lines), + process.returncode if process.returncode is not None else -1, + ) except asyncio.TimeoutError: process.kill() @@ -476,12 +484,16 @@ async def read_stream(): except asyncio.CancelledError: # Handle task cancellation by killing the subprocess - logger.warning(f"Task {task_id} cancelled. Killing process {process.pid}") + logger.warning( + f"Task {task_id} cancelled. Killing process {process.pid}" + ) try: process.kill() await process.wait() except Exception as e: - logger.error(f"Error killing process for cancelled task {task_id}: {e}") + logger.error( + f"Error killing process for cancelled task {task_id}: {e}" + ) raise except Exception as e: @@ -500,14 +512,20 @@ def _resolve_execution_timeout(self, inputs: Dict[str, Any]) -> int: return timeout return settings.sandbox_timeout - def _classify_command_result(self, plugin, output: str, exit_code: int) -> tuple[str, Optional[str]]: + def _classify_command_result( + self, plugin, output: str, exit_code: int + ) -> tuple[str, Optional[str]]: """Map raw process exit codes into task status with plugin-specific tolerances.""" normalized_output = output.lower() - if "unknown option:" in normalized_output or "flag provided but not defined:" in normalized_output: + if ( + "unknown option:" in normalized_output + or "flag provided but not defined:" in normalized_output + ): return ( TaskStatus.FAILED.value, - output or "Tool rejected one or more generated CLI options. Check the final command and raw output for details.", + output + or "Tool rejected one or more generated CLI options. Check the final command and raw output for details.", ) if exit_code == 0: @@ -559,9 +577,11 @@ async def cancel_task(self, task_id: str) -> bool: if settings.docker_enabled: try: killer = await asyncio.create_subprocess_exec( - "docker", "kill", f"secuscan_task_{task_id}", + "docker", + "kill", + f"secuscan_task_{task_id}", stdout=subprocess.PIPE, - stderr=subprocess.PIPE + stderr=subprocess.PIPE, ) await killer.communicate() except Exception as e: @@ -570,30 +590,26 @@ async def cancel_task(self, task_id: str) -> bool: db = await get_db() await db.execute( "UPDATE tasks SET status = ?, completed_at = ? WHERE id = ?", - (TaskStatus.CANCELLED.value, datetime.now().isoformat(), task_id) + (TaskStatus.CANCELLED.value, datetime.now().isoformat(), task_id), ) await self._broadcast(task_id, "status", TaskStatus.CANCELLED.value) await self._invalidate_cached_views() - await db.log_audit( - "task_cancelled", - "Task cancelled by user", - task_id=task_id - ) + await db.log_audit("task_cancelled", "Task cancelled by user", task_id=task_id) return True - + async def get_task_status(self, task_id: str) -> Optional[Dict]: """Get task status and progress""" db = await get_db() task_row = await db.fetchone( """ - SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at, + SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at, duration_seconds, exit_code, error_message, preset, inputs_json FROM tasks WHERE id = ? """, - (task_id,) + (task_id,), ) if not task_row: return None @@ -604,7 +620,7 @@ async def get_task_status(self, task_id: str) -> Optional[Dict]: if task_row["status"] == TaskStatus.QUEUED.value: queued_rows = await db.fetchall( "SELECT id FROM tasks WHERE status = ? ORDER BY created_at ASC", - (TaskStatus.QUEUED.value,) + (TaskStatus.QUEUED.value,), ) ids = [r["id"] for r in queued_rows] pending_count = len(ids) @@ -628,15 +644,24 @@ async def get_task_status(self, task_id: str) -> Optional[Dict]: "pending_count": pending_count, } - async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: str, target: str, status: str, output: str = ""): + async def _upsert_findings_and_report( + self, + db, + task_id: str, + plugin, + plugin_id: str, + target: str, + status: str, + output: str = "", + ): """Persist derived findings and report records into SQLite.""" parsed = self._parse_results(plugin, output) findings_data = parsed.get("findings", []) - + # Update task with structured results await db.execute( "UPDATE tasks SET structured_json = ? WHERE id = ?", - (json.dumps(parsed), task_id) + (json.dumps(parsed), task_id), ) # Insert findings @@ -689,10 +714,19 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: ), ) - async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]): + async def _upsert_findings_and_report_from_scanner( + self, + db, + task_id: str, + scanner: Any, + plugin_id: str, + target: str, + status: str, + result: Dict[str, Any], + ): """Persist modular scanner results into findings, and reports.""" findings_data = result.get("findings", []) - + # Insert findings for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") @@ -719,7 +753,7 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann finding.get("cvss"), finding.get("cve"), json.dumps(finding.get("metadata", {})), - ) + ), ) # Create/Update report @@ -740,7 +774,7 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann "professional" if status == TaskStatus.COMPLETED.value else "failed", "ready" if status == TaskStatus.COMPLETED.value else "failed", len(findings_data), - 2, # Professional reports are typically multi-page + 2, # Professional reports are typically multi-page ), ) @@ -748,16 +782,19 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: """Route to appropriate parser based on plugin metadata.""" parser_type = plugin.output.get("parser") parser_input = self._resolve_parser_input(plugin, output) - + # 1. Check for custom parser.py in plugin directory (Recommended) plugin_manager = get_plugin_manager() plugin_dir = plugin_manager.plugins_dir / plugin.id parser_path = plugin_dir / "parser.py" - + if parser_path.exists(): try: import importlib.util - spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) + + spec = importlib.util.spec_from_file_location( + f"parser_{plugin.id}", parser_path + ) if spec is not None: loader = spec.loader if loader is not None: @@ -766,19 +803,29 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: if hasattr(module, "parse"): logger.info(f"Using custom parser for {plugin.id}") parsed = module.parse(parser_input) - return self._normalize_parsed_result(plugin, parser_input, parsed) + return self._normalize_parsed_result( + plugin, parser_input, parsed + ) else: - logger.warning(f"Custom parser {parser_path} missing 'parse' function") + logger.warning( + f"Custom parser {parser_path} missing 'parse' function" + ) except Exception as e: logger.error(f"Error executing custom parser for {plugin.id}: {e}") # 2. Fallback to legacy built-in parsers if parser_type == "builtin_nmap": - return self._normalize_parsed_result(plugin, parser_input, self._parse_nmap_output(parser_input)) + return self._normalize_parsed_result( + plugin, parser_input, self._parse_nmap_output(parser_input) + ) elif parser_type == "builtin_http": - return self._normalize_parsed_result(plugin, parser_input, self._parse_http_output(parser_input)) - - return self._normalize_parsed_result(plugin, parser_input, {"findings": [], "raw": parser_input}) + return self._normalize_parsed_result( + plugin, parser_input, self._parse_http_output(parser_input) + ) + + return self._normalize_parsed_result( + plugin, parser_input, {"findings": [], "raw": parser_input} + ) def _resolve_parser_input(self, plugin, output: str) -> str: """Prefer report-file content when configured, fallback to command output.""" @@ -790,11 +837,15 @@ def _resolve_parser_input(self, plugin, output: str) -> str: logger.info("Using parser report file for %s: %s", plugin.id, path) return path.read_text(encoding="utf-8", errors="replace") except Exception as exc: - logger.warning("Failed to read parser report file %s: %s", path, exc) + logger.warning( + "Failed to read parser report file %s: %s", path, exc + ) return output - def _normalize_parsed_result(self, plugin, parser_input: str, parsed: Any) -> Dict[str, Any]: + def _normalize_parsed_result( + self, plugin, parser_input: str, parsed: Any + ) -> Dict[str, Any]: """ Normalize parser output shape so downstream report/asset logic always receives: { findings: List[Finding], ... }. @@ -824,7 +875,10 @@ def _normalize_parsed_result(self, plugin, parser_input: str, parsed: Any) -> Di ] # Fallback for JSON/JSONL plugin outputs where parser returns empty or unexpected data. - if not findings and str(plugin.output.get("format", "")).lower() in {"json", "jsonl"}: + if not findings and str(plugin.output.get("format", "")).lower() in { + "json", + "jsonl", + }: findings = self._parse_json_fallback_findings(plugin, parser_input) normalized["findings"] = findings @@ -851,7 +905,11 @@ def _normalize_finding(self, plugin, finding: Dict[str, Any]) -> Dict[str, Any]: } normalized_severity = severity_map.get(severity, "info") - category = finding.get("category") or finding.get("type") or str(plugin.category).title() + category = ( + finding.get("category") + or finding.get("type") + or str(plugin.category).title() + ) title = finding.get("title") or finding.get("name") or "Security Finding" description = finding.get("description") or finding.get("message") or str(title) @@ -868,7 +926,9 @@ def _normalize_finding(self, plugin, finding: Dict[str, Any]) -> Dict[str, Any]: "metadata": metadata, } - def _parse_json_fallback_findings(self, plugin, parser_input: str) -> List[Dict[str, Any]]: + def _parse_json_fallback_findings( + self, plugin, parser_input: str + ) -> List[Dict[str, Any]]: """Best-effort conversion of JSON payloads into finding entries.""" try: data = json.loads(parser_input) @@ -880,7 +940,9 @@ def _parse_json_fallback_findings(self, plugin, parser_input: str) -> List[Dict[ if isinstance(data, list): for idx, item in enumerate(data, start=1): if isinstance(item, dict): - findings.append(self._json_item_to_finding(plugin, item, f"Item {idx}")) + findings.append( + self._json_item_to_finding(plugin, item, f"Item {idx}") + ) else: findings.append( self._normalize_finding( @@ -901,7 +963,11 @@ def _parse_json_fallback_findings(self, plugin, parser_input: str) -> List[Dict[ if isinstance(data.get(list_key), list): for idx, item in enumerate(data[list_key], start=1): if isinstance(item, dict): - findings.append(self._json_item_to_finding(plugin, item, f"{list_key} #{idx}")) + findings.append( + self._json_item_to_finding( + plugin, item, f"{list_key} #{idx}" + ) + ) if findings: return findings @@ -909,7 +975,9 @@ def _parse_json_fallback_findings(self, plugin, parser_input: str) -> List[Dict[ return findings - def _json_item_to_finding(self, plugin, item: Dict[str, Any], default_title: str) -> Dict[str, Any]: + def _json_item_to_finding( + self, plugin, item: Dict[str, Any], default_title: str + ) -> Dict[str, Any]: title = ( item.get("title") or item.get("name") @@ -917,7 +985,12 @@ def _json_item_to_finding(self, plugin, item: Dict[str, Any], default_title: str or item.get("message") or default_title ) - description = item.get("description") or item.get("detail") or item.get("message") or str(item) + description = ( + item.get("description") + or item.get("detail") + or item.get("message") + or str(item) + ) severity = item.get("severity", "info") category = item.get("category", str(plugin.category).title()) return self._normalize_finding( @@ -936,7 +1009,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: findings = [] ports = [] services = [] - + # Regex for open ports: 80/tcp open http port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)") for match in port_pattern.finditer(output): @@ -944,19 +1017,25 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: port_val = int(port_str) ports.append(port_val) services.append(service) - findings.append({ - "title": f"Open Port: {port_str}/{proto} ({service})", - "category": "Network Service", - "severity": "low", - "description": f"Port {port_str} is open and running {service} service.", - "remediation": "Close unnecessary ports and use a firewall to restrict access.", - "metadata": {"port": port_str, "protocol": proto, "service": service} - }) - + findings.append( + { + "title": f"Open Port: {port_str}/{proto} ({service})", + "category": "Network Service", + "severity": "low", + "description": f"Port {port_str} is open and running {service} service.", + "remediation": "Close unnecessary ports and use a firewall to restrict access.", + "metadata": { + "port": port_str, + "protocol": proto, + "service": service, + }, + } + ) + return { "open_ports": sorted(list(set(ports))), "services": sorted(list(set(services))), - "findings": findings + "findings": findings, } def _parse_http_output(self, output: str) -> Dict[str, Any]: @@ -967,31 +1046,32 @@ def _parse_http_output(self, output: str) -> Dict[str, Any]: if server_match := re.search(r"(?i)Server:\s*(.+)", output): server = server_match[1].strip() techs.append(server) - findings.append({ - "title": f"Web Server Disclosed: {server}", - "category": "Information Disclosure", - "severity": "low", - "description": f"The web server discloses its version: {server}", - "remediation": "Disable the Server header in web server configuration.", - "metadata": {"server": server} - }) + findings.append( + { + "title": f"Web Server Disclosed: {server}", + "category": "Information Disclosure", + "severity": "low", + "description": f"The web server discloses its version: {server}", + "remediation": "Disable the Server header in web server configuration.", + "metadata": {"server": server}, + } + ) if powered_match := re.search(r"(?i)X-Powered-By:\s*(.+)", output): powered = powered_match[1].strip() techs.append(powered) - findings.append({ - "title": f"X-Powered-By Disclosed: {powered}", - "category": "Information Disclosure", - "severity": "low", - "description": f"The application discloses its technology stack: {powered}", - "remediation": "Disable the X-Powered-By header.", - "metadata": {"tech": powered} - }) + findings.append( + { + "title": f"X-Powered-By Disclosed: {powered}", + "category": "Information Disclosure", + "severity": "low", + "description": f"The application discloses its technology stack: {powered}", + "remediation": "Disable the X-Powered-By header.", + "metadata": {"tech": powered}, + } + ) - return { - "technologies": sorted(list(set(techs))), - "findings": findings - } + return {"technologies": sorted(list(set(techs))), "findings": findings} async def _invalidate_cached_views(self): """Clear cached aggregate views after write operations.""" diff --git a/backend/secuscan/main.py b/backend/secuscan/main.py index 08eb02c2..d37e5dee 100644 --- a/backend/secuscan/main.py +++ b/backend/secuscan/main.py @@ -26,8 +26,10 @@ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(sys.stdout), - logging.FileHandler(settings.log_file) if Path(settings.log_file).parent.exists() else logging.NullHandler() - ] + logging.FileHandler(settings.log_file) + if Path(settings.log_file).parent.exists() + else logging.NullHandler(), + ], ) logger = logging.getLogger(__name__) @@ -38,29 +40,29 @@ async def lifespan(app: FastAPI): """Application lifespan manager""" # Startup logger.info("🚀 Starting SecuScan backend...") - + # Ensure directories exist settings.ensure_directories() logger.info("✓ Directories initialized") - + # Initialize database await init_db(settings.database_path) logger.info("✓ SQLite connected") await init_cache() logger.info("✓ In-memory cache initialized") - + # Load plugins await init_plugins(settings.plugins_dir) logger.info("✓ Plugins loaded") await scheduler.start() logger.info("✓ Workflow scheduler started") - + logger.info("✓ Ready to serve on %s:%d", settings.bind_address, settings.bind_port) - + yield - + # Shutdown logger.info("🛑 Shutting down SecuScan backend...") if global_db: @@ -79,22 +81,28 @@ async def lifespan(app: FastAPI): docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json", - lifespan=lifespan + lifespan=lifespan, ) + @app.get("/api/docs", include_in_schema=False) async def redirect_api_docs(): from fastapi.responses import RedirectResponse + return RedirectResponse(url="/docs") + @app.get("/api/redoc", include_in_schema=False) async def redirect_api_redoc(): from fastapi.responses import RedirectResponse + return RedirectResponse(url="/redoc") + @app.get("/api/openapi.json", include_in_schema=False) async def redirect_api_openapi(): from fastapi.responses import RedirectResponse + return RedirectResponse(url="/openapi.json") @@ -124,7 +132,7 @@ async def health_check(): """Health check endpoint""" import platform import sys - + return { "status": "operational", "version": "0.1.0-alpha", @@ -132,7 +140,7 @@ async def health_check(): "platform": platform.system(), "python_version": sys.version.split()[0], "docker_available": shutil.which("docker") is not None, - } + }, } @@ -145,15 +153,16 @@ async def root(): "version": "0.1.0-alpha", "status": "under development", "api_docs": f"{settings.base_url}/api/docs" if settings.debug else None, - "legal_notice": "For authorized testing only. Unauthorized scanning may be illegal." + "legal_notice": "For authorized testing only. Unauthorized scanning may be illegal.", } def main(): """Main entry point""" import uvicorn - - logger.info(""" + + logger.info( + """ ╔═══════════════════════════════════════════════════════╗ ║ ║ ║ SecuScan v0.1.0-alpha ║ @@ -162,14 +171,15 @@ def main(): ║ âš ī¸ For authorized testing only ║ ║ ║ ╚═══════════════════════════════════════════════════════╝ - """) - + """ + ) + uvicorn.run( "backend.secuscan.main:app", host=settings.bind_address, port=settings.bind_port, reload=settings.debug, - log_level=settings.log_level.lower() + log_level=settings.log_level.lower(), ) diff --git a/backend/secuscan/models.py b/backend/secuscan/models.py index 264363e5..15d22414 100644 --- a/backend/secuscan/models.py +++ b/backend/secuscan/models.py @@ -10,6 +10,7 @@ class SafetyLevel(str, Enum): """Plugin safety level classification""" + SAFE = "safe" INTRUSIVE = "intrusive" EXPLOIT = "exploit" @@ -17,6 +18,7 @@ class SafetyLevel(str, Enum): class TaskStatus(str, Enum): """Task execution status""" + QUEUED = "queued" RUNNING = "running" COMPLETED = "completed" @@ -26,6 +28,7 @@ class TaskStatus(str, Enum): class PluginFieldType(str, Enum): """Plugin field input types""" + STRING = "string" TEXT = "text" INTEGER = "integer" @@ -38,6 +41,7 @@ class PluginFieldType(str, Enum): class PluginField(BaseModel): """Plugin input field definition""" + id: str label: str type: PluginFieldType @@ -51,6 +55,7 @@ class PluginField(BaseModel): class PluginMetadata(BaseModel): """Plugin metadata schema""" + id: str name: str version: str @@ -60,12 +65,12 @@ class PluginMetadata(BaseModel): author: Optional[Dict[str, str]] = None license: Optional[str] = "MIT" icon: Optional[str] = "🔧" - + engine: Dict[str, str] command_template: List[str] fields: List[PluginField] presets: Dict[str, Dict[str, Any]] - + output: Dict[str, Any] safety: Dict[str, Any] learning: Optional[Dict[str, Any]] = None @@ -78,6 +83,7 @@ class PluginMetadata(BaseModel): class TaskCreateRequest(BaseModel): """Request to create a new task""" + plugin_id: str preset: Optional[str] = None inputs: Dict[str, Any] @@ -86,6 +92,7 @@ class TaskCreateRequest(BaseModel): class TaskResponse(BaseModel): """Task information response""" + task_id: str plugin_id: str tool: str @@ -103,6 +110,7 @@ class TaskResponse(BaseModel): class Finding(BaseModel): """Structured security finding""" + id: Optional[str] = None title: str category: str @@ -119,6 +127,7 @@ class Finding(BaseModel): class TaskResult(BaseModel): """Task execution result""" + task_id: str plugin_id: str tool: str @@ -126,14 +135,14 @@ class TaskResult(BaseModel): timestamp: datetime duration_seconds: Optional[float] status: TaskStatus - + summary: List[str] = [] severity_counts: Dict[str, int] = Field(default_factory=dict) findings: List[Finding] = Field(default_factory=list) structured: Dict[str, Any] = Field(default_factory=dict) raw_output_path: Optional[str] = None raw_output_excerpt: Optional[str] = None - + errors: List[Dict[str, Any]] = [] error_message: Optional[str] = None exit_code: Optional[int] = None @@ -142,6 +151,7 @@ class TaskResult(BaseModel): class HealthResponse(BaseModel): """Health check response""" + status: str version: str uptime_seconds: Optional[int] = None @@ -151,12 +161,14 @@ class HealthResponse(BaseModel): class PluginListResponse(BaseModel): """List of available plugins""" + plugins: List[Dict[str, Any]] total: int class ErrorResponse(BaseModel): """Error response""" + error: str message: str field: Optional[str] = None diff --git a/backend/secuscan/plugin_validator.py b/backend/secuscan/plugin_validator.py index 7c3ac455..12f552d1 100644 --- a/backend/secuscan/plugin_validator.py +++ b/backend/secuscan/plugin_validator.py @@ -20,7 +20,16 @@ VALID_ENGINE_TYPES = {"cli", "python", "docker"} VALID_SAFETY_LEVELS = {"safe", "intrusive", "exploit"} -VALID_FIELD_TYPES = {"string","integer","text", "number", "boolean", "select", "multiselect", "textarea"} +VALID_FIELD_TYPES = { + "string", + "integer", + "text", + "number", + "boolean", + "select", + "multiselect", + "textarea", +} VALID_PARSER_TYPES = {"json", "text", "custom", "none"} REQUIRED_TOP_LEVEL_FIELDS = [ @@ -268,7 +277,10 @@ def _check_validation_block(self, data: dict, result: ValidationResult) -> None: def _check_checksum(self, data: dict, result: ValidationResult) -> None: checksum = data.get("checksum") if not checksum: - result.add("checksum", "Checksum is missing — run: python scripts/refresh_plugin_checksum.py --plugin ") + result.add( + "checksum", + "Checksum is missing — run: python scripts/refresh_plugin_checksum.py --plugin ", + ) return if not isinstance(checksum, str) or len(checksum) != 64: @@ -337,4 +349,4 @@ def validate_all_plugins(plugins_dir: Path) -> list: def validate_one_plugin(plugin_dir: Path) -> ValidationResult: """Validate a single plugin directory.""" - return PluginMetadataValidator(plugin_dir).validate() \ No newline at end of file + return PluginMetadataValidator(plugin_dir).validate() diff --git a/backend/secuscan/plugins.py b/backend/secuscan/plugins.py index d224bd4a..615e5ee7 100644 --- a/backend/secuscan/plugins.py +++ b/backend/secuscan/plugins.py @@ -56,7 +56,9 @@ async def load_plugins(self) -> int: if await self._validate_plugin(plugin_meta, plugin_dir): self.plugins[plugin_meta.id] = plugin_meta loaded += 1 - logger.info(f"✓ Loaded plugin: {plugin_meta.name} v{plugin_meta.version}") + logger.info( + f"✓ Loaded plugin: {plugin_meta.name} v{plugin_meta.version}" + ) else: logger.error(f"✗ Failed to validate plugin: {plugin_meta.id}") @@ -68,7 +70,7 @@ async def load_plugins(self) -> int: async def _load_plugin_metadata(self, metadata_file: Path) -> PluginMetadata: """Load and parse plugin metadata JSON""" - with open(metadata_file, 'r') as f: + with open(metadata_file, "r") as f: data = json.load(f) return PluginMetadata(**data) @@ -117,15 +119,24 @@ async def _validate_plugin(self, plugin: PluginMetadata, plugin_dir: Path) -> bo return True - def _verify_plugin_integrity(self, plugin: PluginMetadata, plugin_dir: Path) -> bool: + def _verify_plugin_integrity( + self, plugin: PluginMetadata, plugin_dir: Path + ) -> bool: """Verify plugin checksum/signature when available.""" metadata_file = plugin_dir / "metadata.json" parser_file = plugin_dir / "parser.py" has_checksum = bool(plugin.checksum) has_signature = bool(plugin.signature) - if not has_checksum and not has_signature and settings.enforce_plugin_signatures: - logger.error("Plugin %s missing checksum/signature while enforcement is enabled", plugin.id) + if ( + not has_checksum + and not has_signature + and settings.enforce_plugin_signatures + ): + logger.error( + "Plugin %s missing checksum/signature while enforcement is enabled", + plugin.id, + ) return False try: @@ -141,9 +152,15 @@ def _verify_plugin_integrity(self, plugin: PluginMetadata, plugin_dir: Path) -> if has_signature: if not settings.plugin_signature_key: if settings.enforce_plugin_signatures: - logger.error("SECUSCAN_PLUGIN_SIGNATURE_KEY required for verifying %s", plugin.id) + logger.error( + "SECUSCAN_PLUGIN_SIGNATURE_KEY required for verifying %s", + plugin.id, + ) return False - logger.warning("Skipping signature verification for %s: key not configured", plugin.id) + logger.warning( + "Skipping signature verification for %s: key not configured", + plugin.id, + ) else: expected_sig = hmac.new( settings.plugin_signature_key.encode("utf-8"), @@ -171,7 +188,9 @@ def compute_plugin_digest(metadata_file: Path, parser_file: Path) -> str: parser_bytes_normalized = parser_bytes.replace(b"\r\n", b"\n") parser_digest = hashlib.sha256(parser_bytes_normalized).hexdigest() - return hashlib.sha256(f"{metadata_digest}:{parser_digest}".encode("utf-8")).hexdigest() + return hashlib.sha256( + f"{metadata_digest}:{parser_digest}".encode("utf-8") + ).hexdigest() def get_plugin(self, plugin_id: str) -> Optional[PluginMetadata]: """Get plugin by ID""" @@ -191,12 +210,16 @@ def list_plugins(self) -> List[Dict]: "safety_level": plugin.safety.get("level"), "enabled": True, "icon": plugin.icon, - "requires_consent": bool(plugin.safety.get("requires_consent", False)), + "requires_consent": bool( + plugin.safety.get("requires_consent", False) + ), "consent_message": plugin.safety.get("consent_message"), "availability": { "runnable": len(missing_binaries) == 0, "missing_binaries": missing_binaries, - "status": "available" if len(missing_binaries) == 0 else "unavailable", + "status": "available" + if len(missing_binaries) == 0 + else "unavailable", "guidance": ( None if len(missing_binaries) == 0 @@ -237,7 +260,7 @@ def get_plugin_schema(self, plugin_id: str) -> Optional[Dict]: "description": plugin.description, "fields": [f.model_dump() for f in plugin.fields], "presets": plugin.presets, - "safety": plugin.safety + "safety": plugin.safety, } else: return None @@ -258,12 +281,16 @@ def _interpolate(self, token: str, inputs: Dict) -> Optional[str]: if value is None or value == "": return None - placeholder = "{" + var_name + (f":{default_value}" if default_value else "") + "}" + placeholder = ( + "{" + var_name + (f":{default_value}" if default_value else "") + "}" + ) rendered = rendered.replace(placeholder, str(value)) return rendered - def _with_field_defaults(self, plugin: PluginMetadata, inputs: Dict[str, Any]) -> Dict[str, Any]: + def _with_field_defaults( + self, plugin: PluginMetadata, inputs: Dict[str, Any] + ) -> Dict[str, Any]: """Fill omitted inputs from plugin field defaults.""" normalized = dict(inputs) for field in plugin.fields: @@ -301,7 +328,9 @@ def _resolve_wordlist_path(self, value: str) -> str: elif "discovery/web-content/common.txt" in lowered: fallback_candidates.insert(0, wordlists_dir / "common.txt") elif "discovery/dns/subdomains-top1million-110000.txt" in lowered: - fallback_candidates.insert(0, wordlists_dir / "subdomains-top1million-110000.txt") + fallback_candidates.insert( + 0, wordlists_dir / "subdomains-top1million-110000.txt" + ) for fallback in fallback_candidates: if fallback.exists(): @@ -309,7 +338,9 @@ def _resolve_wordlist_path(self, value: str) -> str: return value - def _normalize_inputs(self, plugin: PluginMetadata, inputs: Dict[str, Any]) -> Dict[str, Any]: + def _normalize_inputs( + self, plugin: PluginMetadata, inputs: Dict[str, Any] + ) -> Dict[str, Any]: """Normalize plugin inputs before command rendering.""" normalized = self._with_field_defaults(plugin, inputs) wordlist_value = normalized.get("wordlist") @@ -348,7 +379,7 @@ def build_command(self, plugin_id: str, inputs: Dict) -> Optional[List[str]]: try: else_idx = parts.index("else") then_parts = parts[3:else_idx] - else_parts = parts[else_idx+1:] + else_parts = parts[else_idx + 1 :] except ValueError: then_parts = parts[3:] else_parts = [] diff --git a/backend/secuscan/ratelimit.py b/backend/secuscan/ratelimit.py index 464e4ae4..5d721421 100644 --- a/backend/secuscan/ratelimit.py +++ b/backend/secuscan/ratelimit.py @@ -10,45 +10,45 @@ class RateLimiter: """Rate limiter for controlling task execution frequency""" - + def __init__(self): self.task_history: Dict[str, List[datetime]] = defaultdict(list) self.lock = asyncio.Lock() - + async def can_execute( - self, - plugin_id: str, - max_per_hour: int = 50 + self, plugin_id: str, max_per_hour: int = 50 ) -> Tuple[bool, str]: """ Check if a task can be executed based on rate limits. - + Args: plugin_id: Plugin identifier max_per_hour: Maximum tasks per hour for this plugin - + Returns: Tuple of (allowed, error_message) """ async with self.lock: now = datetime.now() hour_ago = now - timedelta(hours=1) - + # Clean old entries self.task_history[plugin_id] = [ - ts for ts in self.task_history[plugin_id] - if ts > hour_ago + ts for ts in self.task_history[plugin_id] if ts > hour_ago ] - + recent_count = len(self.task_history[plugin_id]) - + if recent_count >= max_per_hour: - return False, f"Rate limit exceeded: {recent_count}/{max_per_hour} per hour" - + return ( + False, + f"Rate limit exceeded: {recent_count}/{max_per_hour} per hour", + ) + # Record this execution self.task_history[plugin_id].append(now) return True, "" - + async def reset(self, plugin_id: str = None): """Reset rate limits for a plugin or all plugins""" async with self.lock: @@ -60,35 +60,38 @@ async def reset(self, plugin_id: str = None): class ConcurrentTaskLimiter: """Limits concurrent task execution""" - + def __init__(self, max_concurrent: int = 3): self.max_concurrent = max_concurrent self.running_tasks: List[str] = [] self.lock = asyncio.Lock() - + async def acquire(self, task_id: str) -> Tuple[bool, str]: """ Try to acquire a slot for task execution. - + Args: task_id: Task identifier - + Returns: Tuple of (acquired, error_message) """ async with self.lock: if len(self.running_tasks) >= self.max_concurrent: - return False, f"Maximum concurrent tasks ({self.max_concurrent}) reached" - + return ( + False, + f"Maximum concurrent tasks ({self.max_concurrent}) reached", + ) + self.running_tasks.append(task_id) return True, "" - + async def release(self, task_id: str): """Release a task slot""" async with self.lock: if task_id in self.running_tasks: self.running_tasks.remove(task_id) - + async def get_available_slots(self) -> int: """Get number of available execution slots""" async with self.lock: diff --git a/backend/secuscan/redaction.py b/backend/secuscan/redaction.py index 35ed1091..7c583418 100644 --- a/backend/secuscan/redaction.py +++ b/backend/secuscan/redaction.py @@ -104,8 +104,7 @@ ( "password", re.compile( - r"((?:password|passwd|pass|pwd)\s*[=:\"'\s]{1,4})" - r"([^\s\"'&;,]{6,})", + r"((?:password|passwd|pass|pwd)\s*[=:\"'\s]{1,4})" r"([^\s\"'&;,]{6,})", re.IGNORECASE, ), ), @@ -126,8 +125,7 @@ ( "vcs_token", re.compile( - r"(glpat-[A-Za-z0-9_\-]{20,}" - r"|gh[pousr]_[A-Za-z0-9]{36,})", + r"(glpat-[A-Za-z0-9_\-]{20,}" r"|gh[pousr]_[A-Za-z0-9]{36,})", re.IGNORECASE, ), ), @@ -145,8 +143,7 @@ ( "hex_secret", re.compile( - r"((?:token|secret|key|hash|salt)\s*[=:\"'\s]{1,4})" - r"([0-9a-fA-F]{32,})", + r"((?:token|secret|key|hash|salt)\s*[=:\"'\s]{1,4})" r"([0-9a-fA-F]{32,})", re.IGNORECASE, ), ), @@ -215,9 +212,7 @@ def _redact_value(value: Any) -> Any: return value -def _apply_pattern( - name: str, pattern: re.Pattern[str], text: str -) -> tuple[str, int]: +def _apply_pattern(name: str, pattern: re.Pattern[str], text: str) -> tuple[str, int]: """ Apply a single compiled pattern to *text*. @@ -275,4 +270,4 @@ def _replace_fallback(m: re.Match[str]) -> str: # pragma: no cover count += 1 return REDACTED - return pattern.sub(_replace_fallback, text), count \ No newline at end of file + return pattern.sub(_replace_fallback, text), count diff --git a/backend/secuscan/reporting.py b/backend/secuscan/reporting.py index fb2e8987..4782faf4 100644 --- a/backend/secuscan/reporting.py +++ b/backend/secuscan/reporting.py @@ -31,11 +31,13 @@ class ReportGenerator: @staticmethod def _hex_to_rgb(value: str) -> tuple[int, int, int]: value = value.strip("#") - return tuple(int(value[index:index + 2], 16) for index in (0, 2, 4)) + return tuple(int(value[index : index + 2], 16) for index in (0, 2, 4)) @staticmethod @lru_cache(maxsize=32) - def _icon_data_uri(name: str, background: str = "1e3a5f", foreground: str = "ffffff") -> str: + def _icon_data_uri( + name: str, background: str = "1e3a5f", foreground: str = "ffffff" + ) -> str: """Return a tiny embedded PNG icon that works in both HTML and xhtml2pdf.""" bg = ReportGenerator._hex_to_rgb(background) fg = ReportGenerator._hex_to_rgb(foreground) @@ -43,7 +45,11 @@ def _icon_data_uri(name: str, background: str = "1e3a5f", foreground: str = "fff draw = ImageDraw.Draw(image) if name == "shield": - draw.line([(24, 8), (36, 13), (34, 28), (24, 39), (14, 28), (12, 13), (24, 8)], fill=fg, width=3) + draw.line( + [(24, 8), (36, 13), (34, 28), (24, 39), (14, 28), (12, 13), (24, 8)], + fill=fg, + width=3, + ) draw.line([(19, 24), (23, 28), (30, 19)], fill=fg, width=3) elif name == "findings": draw.rectangle((12, 11, 36, 37), outline=fg, width=3) @@ -113,14 +119,22 @@ def _normalize_finding(cls, finding: Any) -> Dict[str, Any]: "category": cls._clean_text(finding.get("category")) or "General", "severity": cls._clean_text(finding.get("severity") or "info").upper(), "target": cls._clean_text(finding.get("target")), - "description": redact(cls._clean_text(finding.get("description")) or "No description was provided."), + "description": redact( + cls._clean_text(finding.get("description")) + or "No description was provided." + ), "remediation": redact(cls._clean_text(finding.get("remediation"))), "proof": redact(cls._clean_text(finding.get("proof"))), "cve": cls._clean_text(finding.get("cve")), "cwe": cls._clean_text(finding.get("cwe")), "cvss": finding.get("cvss"), "discovered_at": cls._clean_text(finding.get("discovered_at")), - "metadata": redact_dict({cls._clean_text(key): cls._clean_text(val) for key, val in metadata.items()}), + "metadata": redact_dict( + { + cls._clean_text(key): cls._clean_text(val) + for key, val in metadata.items() + } + ), } if normalized["severity"] not in cls.SEVERITY_COLORS: normalized["severity"] = "INFO" @@ -155,7 +169,9 @@ def _format_input_value(cls, value: Any) -> str: if value is False: return "OFF" if isinstance(value, list): - return ", ".join(cls._clean_text(item) for item in value if cls._clean_text(item)) + return ", ".join( + cls._clean_text(item) for item in value if cls._clean_text(item) + ) if isinstance(value, dict): return json.dumps(value, sort_keys=True) return cls._clean_text(value) @@ -163,8 +179,14 @@ def _format_input_value(cls, value: Any) -> str: @classmethod def _build_scan_parameters(cls, task: Dict[str, Any]) -> List[Dict[str, str]]: parameters = [ - {"label": "Target", "value": cls._clean_text(task.get("target")) or "Unknown"}, - {"label": "Plugin", "value": cls._clean_text(task.get("plugin_id")) or "Unknown"}, + { + "label": "Target", + "value": cls._clean_text(task.get("target")) or "Unknown", + }, + { + "label": "Plugin", + "value": cls._clean_text(task.get("plugin_id")) or "Unknown", + }, ] preset = cls._clean_text(task.get("preset")) @@ -192,11 +214,15 @@ def _build_summary_lines( task: Dict[str, Any], ) -> List[str]: total_findings = len(findings) - critical_high = severity_counts.get("CRITICAL", 0) + severity_counts.get("HIGH", 0) + critical_high = severity_counts.get("CRITICAL", 0) + severity_counts.get( + "HIGH", 0 + ) summary: List[str] = [] if total_findings == 0: - summary.append("No structured findings were recorded for this assessment run.") + summary.append( + "No structured findings were recorded for this assessment run." + ) elif critical_high > 0: summary.append( f"The assessment identified {total_findings} findings, including " @@ -207,44 +233,64 @@ def _build_summary_lines( f"The assessment identified {total_findings} findings with no critical or high severity items." ) - tool_name = cls._clean_text(task.get("tool_name")) or cls._clean_text(task.get("plugin_id")) or "scan engine" + tool_name = ( + cls._clean_text(task.get("tool_name")) + or cls._clean_text(task.get("plugin_id")) + or "scan engine" + ) summary.append(f"Scan execution was performed with {tool_name}.") open_ports = structured.get("open_ports") if isinstance(open_ports, list) and open_ports: - summary.append(f"Observed {len(open_ports)} exposed network ports during this run.") + summary.append( + f"Observed {len(open_ports)} exposed network ports during this run." + ) technologies = structured.get("technologies") if isinstance(technologies, list) and technologies: - summary.append(f"Detected {len(technologies)} technology fingerprints in the target surface.") + summary.append( + f"Detected {len(technologies)} technology fingerprints in the target surface." + ) rows = structured.get("rows") if isinstance(rows, list) and rows: - summary.append(f"Structured output included {len(rows)} tabular result rows for analyst review.") + summary.append( + f"Structured output included {len(rows)} tabular result rows for analyst review." + ) return summary @classmethod - def _build_report_payload(cls, task: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]: + def _build_report_payload( + cls, task: Dict[str, Any], result: Dict[str, Any] + ) -> Dict[str, Any]: structured = result.get("structured") if not isinstance(structured, dict): structured = result if isinstance(result, dict) else {} raw_findings = result.get("findings") if not isinstance(raw_findings, list): - raw_findings = structured.get("findings", []) if isinstance(structured, dict) else [] + raw_findings = ( + structured.get("findings", []) if isinstance(structured, dict) else [] + ) findings = [cls._normalize_finding(item) for item in raw_findings] severity_counts = {severity: 0 for severity in cls.SEVERITY_ORDER} for finding in findings: - severity_counts[finding["severity"]] = severity_counts.get(finding["severity"], 0) + 1 + severity_counts[finding["severity"]] = ( + severity_counts.get(finding["severity"], 0) + 1 + ) raw_summary = result.get("summary") if isinstance(raw_summary, list) and raw_summary: - summary = [cls._clean_text(item) for item in raw_summary if cls._clean_text(item)] + summary = [ + cls._clean_text(item) for item in raw_summary if cls._clean_text(item) + ] else: - summary = cls._build_summary_lines(findings, severity_counts, structured, task) + summary = cls._build_summary_lines( + findings, severity_counts, structured, task + ) rows = structured.get("rows") if not isinstance(rows, list): @@ -256,7 +302,9 @@ def _build_report_payload(cls, task: Dict[str, Any], result: Dict[str, Any]) -> return { "task_id": cls._clean_text(task.get("id")), - "tool_name": cls._clean_text(task.get("tool_name")) or cls._clean_text(task.get("plugin_id")) or "Unknown tool", + "tool_name": cls._clean_text(task.get("tool_name")) + or cls._clean_text(task.get("plugin_id")) + or "Unknown tool", "target": cls._clean_text(task.get("target")) or "Unknown target", "status": cls._clean_text(task.get("status")) or "unknown", "created_at": cls._clean_text(task.get("created_at")), @@ -278,13 +326,17 @@ def _format_timestamp(value: str) -> str: return "Unknown" for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"): try: - return datetime.strptime(value.replace("Z", ""), fmt).strftime("%b %d, %Y %H:%M") + return datetime.strptime(value.replace("Z", ""), fmt).strftime( + "%b %d, %Y %H:%M" + ) except ValueError: continue return value @classmethod - def _generate_pdf_html_report(cls, task: Dict[str, Any], result: Dict[str, Any]) -> str: + def _generate_pdf_html_report( + cls, task: Dict[str, Any], result: Dict[str, Any] + ) -> str: """Generate conservative HTML/CSS that xhtml2pdf can paginate reliably.""" payload = cls._build_report_payload(task, result) findings = payload["findings"] @@ -988,7 +1040,7 @@ def generate_sarif_report(cls, task: Dict[str, Any], result: Dict[str, Any]) -> "HIGH": "error", "MEDIUM": "warning", "LOW": "note", - "INFO": "note" + "INFO": "note", } rules = [] @@ -1030,22 +1082,26 @@ def generate_sarif_report(cls, task: Dict[str, Any], result: Dict[str, Any]) -> if rule_id not in rule_indices: rule_indices[rule_id] = len(rules) - rules.append({ - "id": rule_id, - "name": finding.get("title", "Security Finding"), - "shortDescription": { - "text": finding.get("title", "Security Finding") - }, - "fullDescription": { - "text": finding.get("description", "No detailed description available.") - }, - "help": { - "text": finding.get("remediation", "No remediation provided.") - }, - "properties": { - "precision": "high" + rules.append( + { + "id": rule_id, + "name": finding.get("title", "Security Finding"), + "shortDescription": { + "text": finding.get("title", "Security Finding") + }, + "fullDescription": { + "text": finding.get( + "description", "No detailed description available." + ) + }, + "help": { + "text": finding.get( + "remediation", "No remediation provided." + ) + }, + "properties": {"precision": "high"}, } - }) + ) sarif_result = { "ruleId": rule_id, @@ -1054,7 +1110,7 @@ def generate_sarif_report(cls, task: Dict[str, Any], result: Dict[str, Any]) -> "text": finding.get("description", "Security finding detected") }, "level": severity_map.get(finding["severity"], "note"), - "locations": [] + "locations": [], } # Attempt to extract location if available @@ -1063,19 +1119,15 @@ def generate_sarif_report(cls, task: Dict[str, Any], result: Dict[str, Any]) -> if target: is_url = "://" in target or target.startswith(("http://", "https://")) - location = { - "physicalLocation": { - "artifactLocation": { - "uri": target - } - } - } + location = {"physicalLocation": {"artifactLocation": {"uri": target}}} # If target has a line number like file.py:123 and is NOT a web URL if not is_url and ":" in target: parts = target.split(":") if parts[-1].isdigit(): - location["physicalLocation"]["artifactLocation"]["uri"] = ":".join(parts[:-1]) + location["physicalLocation"]["artifactLocation"][ + "uri" + ] = ":".join(parts[:-1]) location["physicalLocation"]["region"] = { "startLine": int(parts[-1]) } @@ -1094,12 +1146,12 @@ def generate_sarif_report(cls, task: Dict[str, Any], result: Dict[str, Any]) -> "name": tool_name, "version": "1.0.0", "informationUri": "https://github.com/utksh1/SecuScan", - "rules": rules + "rules": rules, } }, - "results": results + "results": results, } - ] + ], } return json.dumps(sarif_output, indent=2) diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index f1d53063..3eaf7ad2 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -15,6 +15,7 @@ from pathlib import Path from urllib.parse import urlparse + def parse_json_fields(rows: List[Dict], fields: List[str]) -> List[Dict]: """Helper to parse stringified JSON fields from SQLite.""" parsed = [] @@ -47,7 +48,9 @@ def _slugify_filename_part(value: str, fallback: str) -> str: def build_report_filename(task: Dict[str, Any], extension: str) -> str: - tool = _slugify_filename_part(str(task.get("tool_name") or task.get("plugin_id") or "scan"), "scan") + tool = _slugify_filename_part( + str(task.get("tool_name") or task.get("plugin_id") or "scan"), "scan" + ) raw_target = str(task.get("target") or "") parsed = urlparse(raw_target if "://" in raw_target else f"//{raw_target}") @@ -60,12 +63,16 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: return f"secuscan_{tool}_{target}_{date_part}.{extension}" + logger = logging.getLogger(__name__) from .cache import get_cache from .models import ( - TaskCreateRequest, TaskResponse, TaskResult, - PluginListResponse, ErrorResponse + TaskCreateRequest, + TaskResponse, + TaskResult, + PluginListResponse, + ErrorResponse, ) from .config import settings from .database import get_db @@ -102,7 +109,9 @@ async def invalidate_view_cache(): def _report_generation_error_response(task_id: str, report_format: str) -> JSONResponse: - logger.exception("Report generation failed for task_id=%s format=%s", task_id, report_format) + logger.exception( + "Report generation failed for task_id=%s format=%s", task_id, report_format + ) return JSONResponse( status_code=500, content={ @@ -132,10 +141,8 @@ async def list_plugins(): plugin_manager = await get_plugin_manager_for_request() plugins = plugin_manager.list_plugins() - return PluginListResponse( - plugins=plugins, - total=len(plugins) - ) + return PluginListResponse(plugins=plugins, total=len(plugins)) + @router.get("/plugins/summary") async def get_plugins_summary(): @@ -152,9 +159,7 @@ async def get_plugins_summary(): for plugin in plugins: category = getattr(plugin, "category", "unknown") - category_counts[category] = ( - category_counts.get(category, 0) + 1 - ) + category_counts[category] = category_counts.get(category, 0) + 1 availability = plugin.get("availability", {}) runnable = availability.get("runnable", False) @@ -167,9 +172,10 @@ async def get_plugins_summary(): "total_plugins": total_plugins, "runnable_count": runnable_count, "unavailable_count": unavailable_count, - "category_counts": dict(sorted(category_counts.items())) + "category_counts": dict(sorted(category_counts.items())), } + @router.get("/plugin/{plugin_id}/schema") async def get_plugin_schema(plugin_id: str): """Get plugin schema for UI generation""" @@ -210,7 +216,7 @@ async def start_task( logger.warning(f"Task start failed: Consent not granted. Request: {request}") raise HTTPException( status_code=400, - detail="Consent required. You must acknowledge the legal notice." + detail="Consent required. You must acknowledge the legal notice.", ) # Get plugin @@ -219,24 +225,32 @@ async def start_task( if not plugin: logger.warning(f"Task start failed: Plugin not found: {request.plugin_id}") - raise HTTPException(status_code=404, detail=f"Plugin not found: {request.plugin_id}") + raise HTTPException( + status_code=404, detail=f"Plugin not found: {request.plugin_id}" + ) if target := request.inputs.get("target"): safe_mode = request.inputs.get("safe_mode", settings.safe_mode_default) target_str = str(target) - should_validate_target = plugin.category != "code" and not is_filesystem_target(target_str) + should_validate_target = plugin.category != "code" and not is_filesystem_target( + target_str + ) if should_validate_target: is_valid, error_msg = validate_target(target_str, safe_mode) if not is_valid: - logger.warning(f"Task start failed: Target validation failed for '{target}': {error_msg}") + logger.warning( + f"Task start failed: Target validation failed for '{target}': {error_msg}" + ) raise HTTPException(status_code=400, detail=error_msg) # Check rate limits can_execute, error_msg = await rate_limiter.can_execute( request.plugin_id, - plugin.safety.get("rate_limit", {}).get("max_per_hour", settings.max_tasks_per_hour) + plugin.safety.get("rate_limit", {}).get( + "max_per_hour", settings.max_tasks_per_hour + ), ) if not can_execute: @@ -245,10 +259,7 @@ async def start_task( # Create task record first so we have a real task_id for the limiter try: task_id = await executor.create_task( - request.plugin_id, - request.inputs, - request.preset, - request.consent_granted + request.plugin_id, request.inputs, request.preset, request.consent_granted ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e @@ -259,7 +270,9 @@ async def start_task( can_acquire, error_msg = await concurrent_limiter.acquire(task_id) if not can_acquire: # Roll back: mark the DB row failed so it isn't left orphaned - await executor.mark_task_failed(task_id, reason="Concurrency limit reached; task was not started") + await executor.mark_task_failed( + task_id, reason="Concurrency limit reached; task was not started" + ) raise HTTPException(status_code=503, detail=error_msg) # Slot is held — schedule execution. @@ -271,9 +284,10 @@ async def start_task( "task_id": task_id, "status": "queued", "created_at": "now", - "stream_url": f"/api/v1/task/{task_id}/stream" + "stream_url": f"/api/v1/task/{task_id}/stream", } + @router.get("/task/{task_id}/status") async def get_task_status(task_id: str): """Get task status""" @@ -284,6 +298,7 @@ async def get_task_status(task_id: str): return status + @router.get("/task/{task_id}/stream") async def stream_task_output(task_id: str): """Stream task output via Server-Sent Events (SSE)""" @@ -295,21 +310,20 @@ async def stream_task_output(task_id: str): async def event_generator(): # First, send the initial status - yield { - "event": "status", - "data": json.dumps({"status": status["status"]}) - } + yield {"event": "status", "data": json.dumps({"status": status["status"]})} # If it's already completed/failed, we just return the raw output if any and close if status["status"] in ["completed", "failed", "cancelled"]: try: db = await get_db() - task_row = await db.fetchone("SELECT raw_output_path FROM tasks WHERE id = ?", (task_id,)) + task_row = await db.fetchone( + "SELECT raw_output_path FROM tasks WHERE id = ?", (task_id,) + ) if task_row and task_row["raw_output_path"]: with open(task_row["raw_output_path"], "r") as f: yield { "event": "output", - "data": json.dumps({"chunk": f.read()}) + "data": json.dumps({"chunk": f.read()}), } except Exception: pass @@ -325,14 +339,14 @@ async def event_generator(): if event["type"] == "status": yield { "event": "status", - "data": json.dumps({"status": event["data"]}) + "data": json.dumps({"status": event["data"]}), } if event["data"] in ["completed", "failed", "cancelled"]: break elif event["type"] == "output": yield { "event": "output", - "data": json.dumps({"chunk": event["data"]}) + "data": json.dumps({"chunk": event["data"]}), } except asyncio.CancelledError: pass @@ -341,13 +355,14 @@ async def event_generator(): return EventSourceResponse(event_generator()) + @router.get("/task/{task_id}/report/csv") async def download_csv_report(task_id: str): """Download task results as a CSV report.""" db = await get_db() task_row = await db.fetchone( "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", - (task_id,) + (task_id,), ) if not task_row: @@ -357,15 +372,25 @@ async def download_csv_report(task_id: str): raise HTTPException(status_code=400, detail="Task is not finished yet") try: - structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} - csv_data = reporting.generate_csv_report(dict(task_row), {"structured": structured_data}) + structured_data = ( + json.loads(task_row["structured_json"]) + if task_row["structured_json"] + else {} + ) + csv_data = reporting.generate_csv_report( + dict(task_row), {"structured": structured_data} + ) except Exception: return _report_generation_error_response(task_id, "csv") await db.log_audit( "report_downloaded", f"CSV report downloaded for task {task_id}", - context={"format": "csv", "task_id": task_id, "plugin_id": task_row["plugin_id"]}, + context={ + "format": "csv", + "task_id": task_id, + "plugin_id": task_row["plugin_id"], + }, task_id=task_id, plugin_id=task_row["plugin_id"], ) @@ -373,16 +398,19 @@ async def download_csv_report(task_id: str): return Response( content=csv_data, media_type="text/csv", - headers={"Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "csv")}"'} + headers={ + "Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "csv")}"' + }, ) + @router.get("/task/{task_id}/report/html") async def download_html_report(task_id: str): """Download task results as an HTML report.""" db = await get_db() task_row = await db.fetchone( "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", - (task_id,) + (task_id,), ) if not task_row: @@ -392,15 +420,25 @@ async def download_html_report(task_id: str): raise HTTPException(status_code=400, detail="Task is not finished yet") try: - structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} - html_content = reporting.generate_html_report(dict(task_row), {"structured": structured_data}) + structured_data = ( + json.loads(task_row["structured_json"]) + if task_row["structured_json"] + else {} + ) + html_content = reporting.generate_html_report( + dict(task_row), {"structured": structured_data} + ) except Exception: return _report_generation_error_response(task_id, "html") await db.log_audit( "report_downloaded", f"HTML report downloaded for task {task_id}", - context={"format": "html", "task_id": task_id, "plugin_id": task_row["plugin_id"]}, + context={ + "format": "html", + "task_id": task_id, + "plugin_id": task_row["plugin_id"], + }, task_id=task_id, plugin_id=task_row["plugin_id"], ) @@ -408,16 +446,19 @@ async def download_html_report(task_id: str): return Response( content=html_content, media_type="text/html", - headers={"Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "html")}"'} + headers={ + "Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "html")}"' + }, ) + @router.get("/task/{task_id}/report/pdf") async def download_pdf_report(task_id: str): """Download task results as a PDF report.""" db = await get_db() task_row = await db.fetchone( "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", - (task_id,) + (task_id,), ) if not task_row: @@ -427,15 +468,27 @@ async def download_pdf_report(task_id: str): raise HTTPException(status_code=400, detail="Task is not finished yet") try: - structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} - pdf_bytes = bytes(reporting.generate_pdf_report(dict(task_row), {"structured": structured_data})) + structured_data = ( + json.loads(task_row["structured_json"]) + if task_row["structured_json"] + else {} + ) + pdf_bytes = bytes( + reporting.generate_pdf_report( + dict(task_row), {"structured": structured_data} + ) + ) except Exception: return _report_generation_error_response(task_id, "pdf") await db.log_audit( "report_downloaded", f"PDF report downloaded for task {task_id}", - context={"format": "pdf", "task_id": task_id, "plugin_id": task_row["plugin_id"]}, + context={ + "format": "pdf", + "task_id": task_id, + "plugin_id": task_row["plugin_id"], + }, task_id=task_id, plugin_id=task_row["plugin_id"], ) @@ -443,7 +496,9 @@ async def download_pdf_report(task_id: str): return Response( content=pdf_bytes, media_type="application/pdf", - headers={"Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "pdf")}"'} + headers={ + "Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "pdf")}"' + }, ) @@ -453,7 +508,7 @@ async def download_sarif_report(task_id: str): db = await get_db() task_row = await db.fetchone( "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", - (task_id,) + (task_id,), ) if not task_row: @@ -463,15 +518,25 @@ async def download_sarif_report(task_id: str): raise HTTPException(status_code=400, detail="Task is not finished yet") try: - structured_data = json.loads(task_row["structured_json"]) if task_row["structured_json"] else {} - sarif_data = reporting.generate_sarif_report(dict(task_row), {"structured": structured_data}) + structured_data = ( + json.loads(task_row["structured_json"]) + if task_row["structured_json"] + else {} + ) + sarif_data = reporting.generate_sarif_report( + dict(task_row), {"structured": structured_data} + ) except Exception: return _report_generation_error_response(task_id, "sarif") await db.log_audit( "report_downloaded", f"SARIF report downloaded for task {task_id}", - context={"format": "sarif", "task_id": task_id, "plugin_id": task_row["plugin_id"]}, + context={ + "format": "sarif", + "task_id": task_id, + "plugin_id": task_row["plugin_id"], + }, task_id=task_id, plugin_id=task_row["plugin_id"], ) @@ -479,7 +544,9 @@ async def download_sarif_report(task_id: str): return Response( content=sarif_data, media_type="application/sarif+json", - headers={"Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "sarif")}"'} + headers={ + "Content-Disposition": f'attachment; filename="{build_report_filename(dict(task_row), "sarif")}"' + }, ) @@ -495,7 +562,7 @@ async def get_task_result(task_id: str): raw_output_path, command_used, error_message, exit_code FROM tasks WHERE id = ? """, - (task_id,) + (task_id,), ) if not task_row: @@ -514,32 +581,51 @@ async def get_task_result(task_id: str): severity = str(finding.get("severity", "info")).lower() severity_counts[severity] = severity_counts.get(severity, 0) + 1 - structured_summary = structured.get("summary") if isinstance(structured, dict) else None - summary: List[str] = [ - str(item) for item in structured_summary - if isinstance(item, (str, int, float)) and str(item).strip() - ] if isinstance(structured_summary, list) else [] + structured_summary = ( + structured.get("summary") if isinstance(structured, dict) else None + ) + summary: List[str] = ( + [ + str(item) + for item in structured_summary + if isinstance(item, (str, int, float)) and str(item).strip() + ] + if isinstance(structured_summary, list) + else [] + ) total_findings = len(findings) if not summary and total_findings > 0: - critical_high = severity_counts.get("critical", 0) + severity_counts.get("high", 0) + critical_high = severity_counts.get("critical", 0) + severity_counts.get( + "high", 0 + ) if critical_high > 0: - summary.append(f"Assessment identified {total_findings} security risks, including {critical_high} high-priority items requiring remediation.") + summary.append( + f"Assessment identified {total_findings} security risks, including {critical_high} high-priority items requiring remediation." + ) else: - summary.append(f"Assessment identified {total_findings} minor observations; no critical or high-severity threats were found.") + summary.append( + f"Assessment identified {total_findings} minor observations; no critical or high-severity threats were found." + ) elif not summary: - summary.append("Security analysis revealed no significant vulnerabilities or exposed risks.") + summary.append( + "Security analysis revealed no significant vulnerabilities or exposed risks." + ) if ports := structured.get("open_ports"): - summary.append(f"Perimeter analysis confirmed {len(ports)} active network entry points.") + summary.append( + f"Perimeter analysis confirmed {len(ports)} active network entry points." + ) if techs := structured.get("technologies"): - summary.append(f"Fingerprinting identified {len(techs)} unique technologies powering the target infrastructure.") + summary.append( + f"Fingerprinting identified {len(techs)} unique technologies powering the target infrastructure." + ) # Read raw output (limit to 100k for performance, but usually enough) raw_output = None if task_row["raw_output_path"]: try: - with open(task_row["raw_output_path"], 'r') as f: + with open(task_row["raw_output_path"], "r") as f: raw_output = f.read(100000) except Exception: pass @@ -562,10 +648,12 @@ async def get_task_result(task_id: str): "raw_output_excerpt": raw_output, "raw_output": raw_output, "command_used": task_row["command_used"], - "errors": [{"message": task_row["error_message"]}] if task_row["error_message"] else [], + "errors": [{"message": task_row["error_message"]}] + if task_row["error_message"] + else [], "error_message": task_row["error_message"], "exit_code": task_row["exit_code"], - "metadata": {} + "metadata": {}, } @@ -577,11 +665,7 @@ async def cancel_task(task_id: str): if not cancelled: raise HTTPException(status_code=404, detail="Task not found or not running") - return { - "task_id": task_id, - "status": "cancelled", - "cancelled_at": "now" - } + return {"task_id": task_id, "status": "cancelled", "cancelled_at": "now"} @router.get("/dashboard/summary") @@ -592,7 +676,9 @@ async def build(): db = await get_db() # Get data - raw_findings = await db.fetchall("SELECT * FROM findings ORDER BY discovered_at DESC") + raw_findings = await db.fetchall( + "SELECT * FROM findings ORDER BY discovered_at DESC" + ) findings = parse_json_fields(raw_findings, ["metadata_json"]) task_stats = await db.fetchone( @@ -605,16 +691,21 @@ async def build(): """ ) - critical_findings: int = sum(bool(item.get("severity") == "critical") - for item in findings) - high_findings: int = sum(bool(item.get("severity") == "high") - for item in findings) - medium_findings: int = sum(bool(item.get("severity") == "medium") - for item in findings) - low_findings: int = sum(bool(item.get("severity") == "low") - for item in findings) - info_findings: int = sum(bool(item.get("severity") == "info") - for item in findings) + critical_findings: int = sum( + bool(item.get("severity") == "critical") for item in findings + ) + high_findings: int = sum( + bool(item.get("severity") == "high") for item in findings + ) + medium_findings: int = sum( + bool(item.get("severity") == "medium") for item in findings + ) + low_findings: int = sum( + bool(item.get("severity") == "low") for item in findings + ) + info_findings: int = sum( + bool(item.get("severity") == "info") for item in findings + ) recent_findings: List[Dict] = findings[:5] @@ -628,22 +719,28 @@ async def build(): "last_scan_time": findings[0].get("discovered_at") if findings else None, "recent_findings": recent_findings, "scan_activity": { - "total": int(task_stats["total"]) if task_stats and task_stats.get("total") is not None else 0, - "completed": int(task_stats["completed"]) if task_stats and task_stats.get("completed") is not None else 0, - "running": int(task_stats["running"]) if task_stats and task_stats.get("running") is not None else 0, + "total": int(task_stats["total"]) + if task_stats and task_stats.get("total") is not None + else 0, + "completed": int(task_stats["completed"]) + if task_stats and task_stats.get("completed") is not None + else 0, + "running": int(task_stats["running"]) + if task_stats and task_stats.get("running") is not None + else 0, }, "running_tasks": parse_json_fields( await db.fetchall( "SELECT id, plugin_id, tool_name, target, status, created_at FROM tasks WHERE status = 'running' ORDER BY created_at DESC LIMIT 5" ), - [] + [], ), "recent_tasks": parse_json_fields( await db.fetchall( "SELECT id, plugin_id, tool_name, target, status, created_at, duration_seconds FROM tasks ORDER BY created_at DESC LIMIT 5" ), - [] - ) + [], + ), } return await get_or_set_cached("summary:dashboard", build) @@ -678,7 +775,7 @@ async def list_tasks( page: int = 1, per_page: int = 25, plugin_id: Optional[str] = None, - status: Optional[str] = None + status: Optional[str] = None, ): """List all tasks with pagination""" db = await get_db() @@ -708,11 +805,19 @@ async def list_tasks( if where_clauses: count_query += " WHERE " + " AND ".join(where_clauses) - count_result = await db.fetchone(count_query, tuple(params[:-2]) if where_clauses else ()) - total: int = int(count_result["total"]) if count_result and count_result.get("total") is not None else 0 + count_result = await db.fetchone( + count_query, tuple(params[:-2]) if where_clauses else () + ) + total: int = ( + int(count_result["total"]) + if count_result and count_result.get("total") is not None + else 0 + ) # Parse JSON fields and format for frontend - tasks_list = parse_json_fields(tasks, ["structured_json", "config_json", "metadata_json", "inputs_json"]) + tasks_list = parse_json_fields( + tasks, ["structured_json", "config_json", "metadata_json", "inputs_json"] + ) for t in tasks_list: if "id" in t: t["task_id"] = t.pop("id") @@ -737,6 +842,7 @@ def build_page_url(page_num): params_list.append(f"status={status}") # Join with & and return return f"/api/v1/tasks?{'&'.join(params_list)}" + return { "tasks": tasks_list, "pagination": { @@ -744,9 +850,9 @@ def build_page_url(page_num): "per_page": per_page, "total_pages": total_pages, "total_items": total, - "next": build_page_url(next_page), # ← NEW - "previous": build_page_url(prev_page) # ← NEW - } + "next": build_page_url(next_page), # ← NEW + "previous": build_page_url(prev_page), # ← NEW + }, } @@ -756,12 +862,21 @@ async def delete_task_records(task_ids: List[str]): # Get raw output paths for file cleanup placeholders = ",".join(["?"] * len(task_ids)) - task_rows = await db.fetchall(f"SELECT raw_output_path FROM tasks WHERE id IN ({placeholders})", tuple(task_ids)) + task_rows = await db.fetchall( + f"SELECT raw_output_path FROM tasks WHERE id IN ({placeholders})", + tuple(task_ids), + ) # Delete associated data - await db.execute(f"DELETE FROM findings WHERE task_id IN ({placeholders})", tuple(task_ids)) - await db.execute(f"DELETE FROM reports WHERE task_id IN ({placeholders})", tuple(task_ids)) - await db.execute(f"DELETE FROM audit_log WHERE task_id IN ({placeholders})", tuple(task_ids)) + await db.execute( + f"DELETE FROM findings WHERE task_id IN ({placeholders})", tuple(task_ids) + ) + await db.execute( + f"DELETE FROM reports WHERE task_id IN ({placeholders})", tuple(task_ids) + ) + await db.execute( + f"DELETE FROM audit_log WHERE task_id IN ({placeholders})", tuple(task_ids) + ) await db.execute(f"DELETE FROM tasks WHERE id IN ({placeholders})", tuple(task_ids)) # Cleanup files on disk @@ -772,7 +887,10 @@ async def delete_task_records(task_ids: List[str]): if path.exists(): path.unlink() except Exception as e: - logger.error(f"Failed to delete raw output file {row['raw_output_path']}: {e}") + logger.error( + f"Failed to delete raw output file {row['raw_output_path']}: {e}" + ) + @router.delete("/task/{task_id}") async def delete_task(task_id: str): @@ -782,15 +900,14 @@ async def delete_task(task_id: str): # Check if task is running status = await executor.get_task_status(task_id) if status and status.get("status") == "running": - raise HTTPException(status_code=400, detail="Cannot delete a running task. Abort it first.") + raise HTTPException( + status_code=400, detail="Cannot delete a running task. Abort it first." + ) await delete_task_records([task_id]) await invalidate_view_cache() - return { - "task_id": task_id, - "deleted": True - } + return {"task_id": task_id, "deleted": True} @router.delete("/tasks/bulk") @@ -800,17 +917,19 @@ async def bulk_delete_tasks(task_ids: List[str]): # Check if any tasks are running placeholders = ",".join(["?"] * len(task_ids)) - running_tasks = await db.fetchone(f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND status = 'running' LIMIT 1", tuple(task_ids)) + running_tasks = await db.fetchone( + f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND status = 'running' LIMIT 1", + tuple(task_ids), + ) if running_tasks: - raise HTTPException(status_code=400, detail="Cannot delete running tasks. Abort them first.") + raise HTTPException( + status_code=400, detail="Cannot delete running tasks. Abort them first." + ) await delete_task_records(task_ids) await invalidate_view_cache() - return { - "deleted_count": len(task_ids), - "success": True - } + return {"deleted_count": len(task_ids), "success": True} @router.delete("/tasks/clear") @@ -819,9 +938,13 @@ async def clear_all_tasks(): db = await get_db() # Prevent clearing if any tasks are running - running_tasks = await db.fetchone("SELECT id FROM tasks WHERE status = 'running' LIMIT 1") + running_tasks = await db.fetchone( + "SELECT id FROM tasks WHERE status = 'running' LIMIT 1" + ) if running_tasks: - raise HTTPException(status_code=400, detail="Cannot clear history while tasks are running.") + raise HTTPException( + status_code=400, detail="Cannot clear history while tasks are running." + ) # Get all task IDs to cleanup files all_tasks = await db.fetchall("SELECT id FROM tasks") @@ -849,7 +972,7 @@ async def clear_all_tasks(): return { "cleared": True, - "message": "All scan history and associated data has been purged." + "message": "All scan history and associated data has been purged.", } @@ -860,21 +983,21 @@ async def get_settings(): "network": { "bind_address": settings.bind_address, "port": settings.bind_port, - "allow_remote": False + "allow_remote": False, }, "sandbox": { "engine": "docker" if settings.docker_enabled else "subprocess", "default_timeout": settings.sandbox_timeout, "resource_limits": { "cpu_quota": settings.sandbox_cpu_quota, - "memory_mb": settings.sandbox_memory_mb - } + "memory_mb": settings.sandbox_memory_mb, + }, }, "safety": { "require_consent": settings.require_consent, "safe_mode_default": settings.safe_mode_default, - "allowed_networks": settings.allowed_networks - } + "allowed_networks": settings.allowed_networks, + }, } @@ -898,7 +1021,9 @@ async def upsert_vault_secret(name: str, payload: Dict[str, str]): encrypted = crypto.encrypt(value) secret_id = str(uuid.uuid4()) - existing = await db.fetchone("SELECT id FROM credential_vault WHERE name = ?", (name,)) + existing = await db.fetchone( + "SELECT id FROM credential_vault WHERE name = ?", (name,) + ) if existing: await db.execute( "UPDATE credential_vault SET encrypted_value = ?, updated_at = datetime('now') WHERE name = ?", @@ -915,7 +1040,9 @@ async def upsert_vault_secret(name: str, payload: Dict[str, str]): @router.get("/vault/{name}") async def get_vault_secret(name: str): db = await get_db() - row = await db.fetchone("SELECT encrypted_value FROM credential_vault WHERE name = ?", (name,)) + row = await db.fetchone( + "SELECT encrypted_value FROM credential_vault WHERE name = ?", (name,) + ) if not row: raise HTTPException(status_code=404, detail="Secret not found") crypto = VaultCrypto(settings.resolved_vault_key) @@ -944,7 +1071,9 @@ async def create_workflow(payload: Dict[str, Any]): steps = payload.get("steps", []) if not isinstance(steps, list) or not steps: - raise HTTPException(status_code=400, detail="Workflow requires at least one step") + raise HTTPException( + status_code=400, detail="Workflow requires at least one step" + ) workflow_id = str(uuid.uuid4()) schedule_seconds = payload.get("schedule_seconds") @@ -969,7 +1098,9 @@ async def create_workflow(payload: Dict[str, Any]): @router.post("/workflows/{workflow_id}/run") async def run_workflow_once(workflow_id: str): db = await get_db() - row = await db.fetchone("SELECT steps_json FROM workflows WHERE id = ?", (workflow_id,)) + row = await db.fetchone( + "SELECT steps_json FROM workflows WHERE id = ?", (workflow_id,) + ) if not row: raise HTTPException(status_code=404, detail="Workflow not found") steps = json.loads(row["steps_json"] or "[]") @@ -983,7 +1114,10 @@ async def run_workflow_once(workflow_id: str): ) asyncio.create_task(executor.execute_task(task_id)) created_task_ids.append(task_id) - await db.execute("UPDATE workflows SET last_run_at = datetime('now') WHERE id = ?", (workflow_id,)) + await db.execute( + "UPDATE workflows SET last_run_at = datetime('now') WHERE id = ?", + (workflow_id,), + ) return {"workflow_id": workflow_id, "queued_tasks": created_task_ids} @@ -1014,7 +1148,9 @@ async def update_workflow(workflow_id: str, payload: Dict[str, Any]): raise HTTPException(status_code=400, detail="No update fields provided") params.append(workflow_id) - await db.execute(f"UPDATE workflows SET {', '.join(updates)} WHERE id = ?", tuple(params)) + await db.execute( + f"UPDATE workflows SET {', '.join(updates)} WHERE id = ?", tuple(params) + ) return {"workflow_id": workflow_id, "updated": True} @@ -1043,7 +1179,7 @@ async def get_finding_details(finding_id: str): JOIN tasks t ON f.task_id = t.id WHERE f.id = ? """, - (finding_id,) + (finding_id,), ) if not finding_row: @@ -1071,7 +1207,7 @@ async def get_finding_details(finding_id: str): "cvss": finding_row["cvss"], "cve": finding_row["cve"], "discovered_at": finding_row["discovered_at"], - "metadata": metadata + "metadata": metadata, } @@ -1081,8 +1217,12 @@ async def get_attack_surface(): db = await get_db() # We aggregate unique targets from tasks and findings - tasks = await db.fetchall("SELECT DISTINCT target, tool_name, created_at FROM tasks ORDER BY created_at DESC") - findings = await db.fetchall("SELECT DISTINCT target, category, severity, discovered_at FROM findings ORDER BY discovered_at DESC") + tasks = await db.fetchall( + "SELECT DISTINCT target, tool_name, created_at FROM tasks ORDER BY created_at DESC" + ) + findings = await db.fetchall( + "SELECT DISTINCT target, category, severity, discovered_at FROM findings ORDER BY discovered_at DESC" + ) entries = [] seen_targets = set() @@ -1091,30 +1231,34 @@ async def get_attack_surface(): for f in findings: target = f["target"] if target not in seen_targets: - entries.append({ - "id": str(uuid.uuid4()), - "category": f["category"], - "item": target, - "details": f"Active exposure identified in {f['category']}", - "risk": f["severity"], - "source": "Audit Scan", - "last_seen": f["discovered_at"] - }) + entries.append( + { + "id": str(uuid.uuid4()), + "category": f["category"], + "item": target, + "details": f"Active exposure identified in {f['category']}", + "risk": f["severity"], + "source": "Audit Scan", + "last_seen": f["discovered_at"], + } + ) seen_targets.add(target) # Add other scanned targets for t in tasks: target = t["target"] if target not in seen_targets: - entries.append({ - "id": str(uuid.uuid4()), - "category": "Infrastructure", - "item": target, - "details": f"Monitored via {t['tool_name']}", - "risk": "info", - "source": "Recon", - "last_seen": t["created_at"] - }) + entries.append( + { + "id": str(uuid.uuid4()), + "category": "Infrastructure", + "item": target, + "details": f"Monitored via {t['tool_name']}", + "risk": "info", + "source": "Recon", + "last_seen": t["created_at"], + } + ) seen_targets.add(target) return {"entries": entries} @@ -1125,6 +1269,8 @@ async def get_assets(): """Return a list of tracked assets.""" db = await get_db() # For now, we use unique targets as assets - rows = await db.fetchall("SELECT DISTINCT target FROM tasks UNION SELECT DISTINCT target FROM findings") + rows = await db.fetchall( + "SELECT DISTINCT target FROM tasks UNION SELECT DISTINCT target FROM findings" + ) assets = [{"id": str(uuid.uuid4()), "name": row["target"]} for row in rows] - return {"assets": assets} \ No newline at end of file + return {"assets": assets} diff --git a/backend/secuscan/scanners/base.py b/backend/secuscan/scanners/base.py index 121fab22..0cf896ee 100644 --- a/backend/secuscan/scanners/base.py +++ b/backend/secuscan/scanners/base.py @@ -5,6 +5,7 @@ logger = logging.getLogger(__name__) + class BaseScanner(ABC): """ Abstract base class for modular security scanners. @@ -33,7 +34,7 @@ def category(self) -> str: async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: """ Execute the scanning logic. - + Returns: Dictionary containing findings, summary, and other structured data. """ @@ -58,6 +59,6 @@ def normalize_severity(self, severity: str) -> str: "low": "low", "info": "info", "informational": "info", - "note": "info" + "note": "info", } return mapping.get(s, "info") diff --git a/backend/secuscan/scanners/port_scanner.py b/backend/secuscan/scanners/port_scanner.py index b70bbe65..753c094e 100644 --- a/backend/secuscan/scanners/port_scanner.py +++ b/backend/secuscan/scanners/port_scanner.py @@ -7,6 +7,7 @@ from ..config import settings from datetime import datetime + class PortScanner(BaseScanner): """ Orchestrates Nmap scanning with refined result parsing. @@ -26,7 +27,7 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: Runs Nmap scan and parses output into structured findings. """ self.update_progress(0.1) - + # Prepare inputs for the Nmap plugin # Map PortScanner inputs to Nmap plugin fields plugin_inputs = { @@ -34,9 +35,9 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: "scan_type": inputs.get("scan_type", "-sV"), "ports": inputs.get("ports", "top100"), "speed": inputs.get("speed", "T4"), - "safe_mode": inputs.get("safe_mode", True) + "safe_mode": inputs.get("safe_mode", True), } - + # Handle port shortcuts if plugin_inputs["ports"] == "top100": plugin_inputs["ports"] = "--top-ports 100" @@ -47,7 +48,7 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: plugin_manager = get_plugin_manager() command = plugin_manager.build_command("nmap", plugin_inputs) - + if not command: raise ValueError("Failed to build nmap command") @@ -55,29 +56,31 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: self.update_progress(0.2) output, exit_code = await self._execute_command(command) self.update_progress(0.8) - + # Parse findings = self._parse_nmap_output(output, target) - + self.update_progress(1.0) return { "findings": findings, - "summary": [f"Scanned {target} for open ports.", f"Discovered {len(findings)} open ports."], + "summary": [ + f"Scanned {target} for open ports.", + f"Discovered {len(findings)} open ports.", + ], "open_ports": [f["metadata"]["port"] for f in findings], - "status": "completed" if exit_code == 0 else "failed" + "status": "completed" if exit_code == 0 else "failed", } async def _execute_command(self, command: List[str]) -> tuple: """Executes the command and returns (output, exit_code)""" import asyncio.subprocess + process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT + *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) try: stdout, _ = await process.communicate() - return stdout.decode('utf-8', errors='replace'), process.returncode + return stdout.decode("utf-8", errors="replace"), process.returncode except asyncio.CancelledError: try: process.kill() @@ -90,28 +93,30 @@ def _parse_nmap_output(self, output: str, target: str) -> List[Dict[str, Any]]: findings = [] # Regex for open ports: 80/tcp open http port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)\s*(.*)") - + for match in port_pattern.finditer(output): port_str, proto, service, version = match.groups() - + title = f"Open Port: {port_str}/{proto} ({service})" description = f"Port {port_str} is open and running {service} service." if version.strip(): description += f" Version detected: {version.strip()}" - - findings.append({ - "title": title, - "category": "Network Service", - "severity": self.normalize_severity("low"), - "target": target, - "description": description, - "remediation": "Close unnecessary ports and use a firewall to restrict access.", - "metadata": { - "port": port_str, - "protocol": proto, - "service": service, - "version": version.strip() + + findings.append( + { + "title": title, + "category": "Network Service", + "severity": self.normalize_severity("low"), + "target": target, + "description": description, + "remediation": "Close unnecessary ports and use a firewall to restrict access.", + "metadata": { + "port": port_str, + "protocol": proto, + "service": service, + "version": version.strip(), + }, } - }) - + ) + return findings diff --git a/backend/secuscan/scanners/recon_scanner.py b/backend/secuscan/scanners/recon_scanner.py index 1fb16c0c..91fe6d4d 100644 --- a/backend/secuscan/scanners/recon_scanner.py +++ b/backend/secuscan/scanners/recon_scanner.py @@ -10,6 +10,7 @@ logger = logging.getLogger(__name__) + class ReconScanner(BaseScanner): """ Orchestrates multiple reconnaissance tools (Subfinder, WHOIS, DNS). @@ -31,7 +32,7 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: findings = [] summary = [] rows = [] - + # 1. Subdomain Discovery (if applicable) if "." in target and not target.replace(".", "").isdigit(): self.update_progress(0.1) @@ -41,11 +42,13 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: if sub_findings: for f in sub_findings: if f.get("metadata"): - rows.append({ - "tool": "SUBDOMAIN", - "subdomain": f["metadata"].get("subdomain"), - "details": f.get("description") - }) + rows.append( + { + "tool": "SUBDOMAIN", + "subdomain": f["metadata"].get("subdomain"), + "details": f.get("description"), + } + ) summary.append(f"Discovered {len(sub_findings)} subdomains.") except Exception as e: logger.error(f"Subdomain discovery failed: {e}") @@ -61,12 +64,18 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: for f in whois_findings: if f.get("metadata"): meta = f["metadata"] - rows.append({ - "tool": "WHOIS", - "registrar": meta.get("registrar") or meta.get("registrar_name", "N/A"), - "organization": meta.get("org") or meta.get("organization", "N/A"), - "expiry": str(meta.get("expiration_date", "N/A")).split(' ')[0] - }) + rows.append( + { + "tool": "WHOIS", + "registrar": meta.get("registrar") + or meta.get("registrar_name", "N/A"), + "organization": meta.get("org") + or meta.get("organization", "N/A"), + "expiry": str(meta.get("expiration_date", "N/A")).split( + " " + )[0], + } + ) summary.append("Retrieved WHOIS registration records.") self.update_progress(0.7) except Exception as e: @@ -80,12 +89,14 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: for f in dns_findings: if f.get("metadata"): meta = f["metadata"] - rows.append({ - "tool": "DNS", - "record": meta.get("record_type", "N/A"), - "value": meta.get("value", "N/A"), - "details": f.get("description", "N/A") - }) + rows.append( + { + "tool": "DNS", + "record": meta.get("record_type", "N/A"), + "value": meta.get("value", "N/A"), + "details": f.get("description", "N/A"), + } + ) summary.append(f"Discovered {len(dns_findings)} DNS records.") self.update_progress(1.0) except Exception as e: @@ -95,101 +106,114 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: "findings": findings, "rows": rows, "summary": summary, - "status": "completed" + "status": "completed", } async def _run_subfinder(self, target: str) -> List[Dict[str, Any]]: pm = get_plugin_manager() cmd = pm.build_command("subdomain_discovery", {"target": target}) - if not cmd: return [] - + if not cmd: + return [] + output, _ = await self._execute_command(cmd) findings = [] for line in output.splitlines(): if line.strip() and "." in line: - findings.append({ - "title": f"Subdomain Discovered: {line.strip()}", - "category": "Asset Discovery", - "severity": "info", - "target": target, - "description": f"Found subdomain for {target}: {line.strip()}", - "metadata": {"subdomain": line.strip()} - }) + findings.append( + { + "title": f"Subdomain Discovered: {line.strip()}", + "category": "Asset Discovery", + "severity": "info", + "target": target, + "description": f"Found subdomain for {target}: {line.strip()}", + "metadata": {"subdomain": line.strip()}, + } + ) return findings async def _run_whois(self, target: str) -> List[Dict[str, Any]]: pm = get_plugin_manager() cmd = pm.build_command("whois_lookup", {"target": target}) - if not cmd: return [] - + if not cmd: + return [] + output, _ = await self._execute_command(cmd) - + try: data = json.loads(output) registrar = data.get("registrar") or data.get("registrar_name", "Unknown") expiry = data.get("expiration_date") if isinstance(expiry, list): expiry = expiry[0] - - return [{ - "title": "WHOIS Registration Data", - "category": "Domain Intelligence", - "severity": "info", - "target": target, - "description": f"Registrar: {registrar}\nExpiry: {expiry if expiry else 'Unknown'}", - "metadata": data - }] + + return [ + { + "title": "WHOIS Registration Data", + "category": "Domain Intelligence", + "severity": "info", + "target": target, + "description": f"Registrar: {registrar}\nExpiry: {expiry if expiry else 'Unknown'}", + "metadata": data, + } + ] except Exception: # Fallback to regex if JSON parsing fails (e.g. legacy output) registrar = re.search(r"Registrar:\s*(.*)", output, re.IGNORECASE) expiry = re.search(r"Registry Expiry Date:\s*(.*)", output, re.IGNORECASE) - - return [{ - "title": "WHOIS Registration Data", - "category": "Domain Intelligence", - "severity": "info", - "target": target, - "description": f"Registrar: {registrar.group(1).strip() if registrar else 'Unknown'}\n" - f"Expiry: {expiry.group(1).strip() if expiry else 'Unknown'}", - "metadata": {"raw_whois": output[:1000]} - }] + + return [ + { + "title": "WHOIS Registration Data", + "category": "Domain Intelligence", + "severity": "info", + "target": target, + "description": f"Registrar: {registrar.group(1).strip() if registrar else 'Unknown'}\n" + f"Expiry: {expiry.group(1).strip() if expiry else 'Unknown'}", + "metadata": {"raw_whois": output[:1000]}, + } + ] async def _run_dns_enum(self, target: str) -> List[Dict[str, Any]]: pm = get_plugin_manager() cmd = pm.build_command("dns_enum", {"target": target}) - if not cmd: return [] - + if not cmd: + return [] + output, _ = await self._execute_command(cmd) findings = [] # Look for A, MX, NS records patterns = { "A Record": r"(?i)A\s+([\d\.]+)", "MX Record": r"(?i)MX\s+(.*)", - "NS Record": r"(?i)NS\s+(.*)" + "NS Record": r"(?i)NS\s+(.*)", } - + for name, pattern in patterns.items(): for match in re.finditer(pattern, output): - findings.append({ - "title": f"DNS {name}: {match.group(1).strip()}", - "category": "DNS Configuration", - "severity": "info", - "target": target, - "description": f"Discovered {name} pointing to {match.group(1).strip()}", - "metadata": {"record_type": name, "value": match.group(1).strip()} - }) + findings.append( + { + "title": f"DNS {name}: {match.group(1).strip()}", + "category": "DNS Configuration", + "severity": "info", + "target": target, + "description": f"Discovered {name} pointing to {match.group(1).strip()}", + "metadata": { + "record_type": name, + "value": match.group(1).strip(), + }, + } + ) return findings async def _execute_command(self, command: List[str]) -> tuple: import asyncio.subprocess + process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT + *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) try: stdout, _ = await process.communicate() - return stdout.decode('utf-8', errors='replace'), process.returncode + return stdout.decode("utf-8", errors="replace"), process.returncode except asyncio.CancelledError: try: process.kill() diff --git a/backend/secuscan/scanners/web_scanner.py b/backend/secuscan/scanners/web_scanner.py index 51335f0a..739c66f0 100644 --- a/backend/secuscan/scanners/web_scanner.py +++ b/backend/secuscan/scanners/web_scanner.py @@ -7,6 +7,7 @@ from ..config import settings from datetime import datetime + class WebScanner(BaseScanner): """ Orchestrates DAST scanning (Nikto, Nuclei, FFUF). @@ -28,7 +29,7 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: intensity = inputs.get("scan_intensity", "light") findings = [] summary = [f"Performing {intensity} web scan on {target}"] - + # 1. HTTP Inspection (Technology Fingerprinting) self.update_progress(0.1) tech_findings = await self._run_http_inspector(target) @@ -40,7 +41,9 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: self.update_progress(0.3) nuclei_findings = await self._run_nuclei(target) findings.extend(nuclei_findings) - summary.append(f"Discovered {len(nuclei_findings)} vulnerabilities via template scanning.") + summary.append( + f"Discovered {len(nuclei_findings)} vulnerabilities via template scanning." + ) self.update_progress(0.5) # 3. Nikto (Comprehensive web server scan) - Deep only @@ -60,102 +63,112 @@ async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: self.update_progress(1.0) self.update_progress(1.0) - return { - "findings": findings, - "summary": summary, - "status": "completed" - } + return {"findings": findings, "summary": summary, "status": "completed"} async def _run_http_inspector(self, target: str) -> List[Dict[str, Any]]: pm = get_plugin_manager() cmd = pm.build_command("http_inspector", {"target": target}) - if not cmd: return [] + if not cmd: + return [] output, _ = await self._execute_command(cmd) - + findings = [] if match := re.search(r"(?i)Server:\s*(.*)", output): - findings.append({ - "title": f"Web Server Disclosed: {match.group(1).strip()}", - "category": "Information Disclosure", - "severity": "info", - "target": target, - "description": f"The web server discloses its version: {match.group(1).strip()}", - "metadata": {"server": match.group(1).strip()} - }) + findings.append( + { + "title": f"Web Server Disclosed: {match.group(1).strip()}", + "category": "Information Disclosure", + "severity": "info", + "target": target, + "description": f"The web server discloses its version: {match.group(1).strip()}", + "metadata": {"server": match.group(1).strip()}, + } + ) return findings async def _run_nuclei(self, target: str) -> List[Dict[str, Any]]: pm = get_plugin_manager() # Ensure we use JSON output for easier parsing if available cmd = pm.build_command("nuclei", {"target": target, "silent": True}) - if not cmd: return [] - + if not cmd: + return [] + output, _ = await self._execute_command(cmd) findings = [] # Nuclei result pattern: [template-id] [severity] [url] [message] for line in output.splitlines(): if match := re.match(r"\[(.*?)\] \[(.*?)\] \[(.*?)\] (.*)", line): tid, sev, url, msg = match.groups() - findings.append({ - "title": f"Nuclei: {msg}", - "category": "Vulnerability", - "severity": self.normalize_severity(sev), - "target": target, - "description": f"Template {tid} detected a {sev} issue on {url}.", - "metadata": {"template": tid, "url": url} - }) + findings.append( + { + "title": f"Nuclei: {msg}", + "category": "Vulnerability", + "severity": self.normalize_severity(sev), + "target": target, + "description": f"Template {tid} detected a {sev} issue on {url}.", + "metadata": {"template": tid, "url": url}, + } + ) return findings async def _run_nikto(self, target: str) -> List[Dict[str, Any]]: pm = get_plugin_manager() cmd = pm.build_command("nikto", {"target": target}) - if not cmd: return [] + if not cmd: + return [] output, _ = await self._execute_command(cmd) - + findings = [] for line in output.splitlines(): if "+ " in line: - findings.append({ - "title": "Nikto Observation", - "category": "Web Vulnerability", - "severity": "medium", # Nikto doesn't categorize well without -Format json - "target": target, - "description": line.replace("+ ", "").strip(), - "metadata": {"source": "nikto"} - }) + findings.append( + { + "title": "Nikto Observation", + "category": "Web Vulnerability", + "severity": "medium", # Nikto doesn't categorize well without -Format json + "target": target, + "description": line.replace("+ ", "").strip(), + "metadata": {"source": "nikto"}, + } + ) return findings async def _run_ffuf(self, target: str) -> List[Dict[str, Any]]: # FFUF is usually quiet or complex, we'll implement it as a finding of 'Interesting Paths' pm = get_plugin_manager() cmd = pm.build_command("dir_discovery", {"target": target}) - if not cmd: return [] + if not cmd: + return [] output, _ = await self._execute_command(cmd) - + findings = [] # Extract 200/301 results - for match in re.finditer(r"\[Status: (\d+), Size: \d+, Words: \d+, Lines: \d+, Duration: \d+ms\]\s*\|\s*URL: (.*)", output): + for match in re.finditer( + r"\[Status: (\d+), Size: \d+, Words: \d+, Lines: \d+, Duration: \d+ms\]\s*\|\s*URL: (.*)", + output, + ): status, url = match.groups() - findings.append({ - "title": f"Discovered Path: {url} (Status {status})", - "category": "Asset Discovery", - "severity": "info", - "target": target, - "description": f"Accessible path found during fuzzing: {url}", - "metadata": {"status": status} - }) + findings.append( + { + "title": f"Discovered Path: {url} (Status {status})", + "category": "Asset Discovery", + "severity": "info", + "target": target, + "description": f"Accessible path found during fuzzing: {url}", + "metadata": {"status": status}, + } + ) return findings async def _execute_command(self, command: List[str]) -> tuple: import asyncio.subprocess + process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT + *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) try: stdout, _ = await process.communicate() - return stdout.decode('utf-8', errors='replace'), process.returncode + return stdout.decode("utf-8", errors="replace"), process.returncode except asyncio.CancelledError: try: process.kill() diff --git a/backend/secuscan/validation.py b/backend/secuscan/validation.py index 495edfa3..d407eb36 100644 --- a/backend/secuscan/validation.py +++ b/backend/secuscan/validation.py @@ -12,9 +12,9 @@ # Blocked network ranges BLOCKED_NETWORKS = [ - ipaddress.ip_network("0.0.0.0/8"), # Broadcast + ipaddress.ip_network("0.0.0.0/8"), # Broadcast ipaddress.ip_network("169.254.0.0/16"), # Link-local - ipaddress.ip_network("224.0.0.0/4"), # Multicast + ipaddress.ip_network("224.0.0.0/4"), # Multicast ] # Allowed private IP ranges @@ -32,11 +32,11 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: """ Validate scan target address (IP, Hostname, URL, or CIDR). - + Args: target: IP address, hostname, or network range to validate safe_mode: Whether to enforce safe mode restrictions - + Returns: Tuple of (is_valid, error_message) """ @@ -47,7 +47,7 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: # Try parsing as IP network (handles single IP and CIDR) try: net = ipaddress.ip_network(target, strict=False) - + # Check blocked networks (Broadcast, Link-local, Multicast) if any(net.overlaps(blocked) for blocked in BLOCKED_NETWORKS): return False, "Target overlaps with blocked network range" @@ -59,11 +59,17 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: # Safe mode: only allow private IPs if safe_mode: is_private = any( - (net.version == allowed.version and (net.subnet_of(allowed) or net.overlaps(allowed))) + ( + net.version == allowed.version + and (net.subnet_of(allowed) or net.overlaps(allowed)) + ) for allowed in ALLOWED_PRIVATE ) if not is_private: - return False, "Public IPs/networks not allowed in safe mode (SecuScan Guardrail)" + return ( + False, + "Public IPs/networks not allowed in safe mode (SecuScan Guardrail)", + ) return True, "" @@ -83,7 +89,10 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: hostname_to_validate = host_part.split(":", 1)[0] # Validate hostname format (RFC 1123) - if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$', hostname_to_validate): + if not re.match( + r"^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$", + hostname_to_validate, + ): return False, "Invalid hostname format" # Check blocked TLDs in safe mode @@ -98,34 +107,34 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: def validate_port(port: int) -> Tuple[bool, str]: """ Validate port number. - + Args: port: Port number to validate - + Returns: Tuple of (is_valid, error_message) """ if port < 1 or port > 65535: return False, "Port must be between 1 and 65535" - + return True, "" def validate_port_range(port_range: str) -> Tuple[bool, str]: """ Validate port range specification. - + Args: port_range: Port range string (e.g., "80,443" or "1-1000") - + Returns: Tuple of (is_valid, error_message) """ # Handle comma-separated ports (supports mixed specs like "80,443-8080") - if ',' in port_range: - for port_str in port_range.split(','): + if "," in port_range: + for port_str in port_range.split(","): port_str = port_str.strip() - if '-' in port_str: + if "-" in port_str: # Delegate sub-ranges like "443-8080" to the range parser below is_valid, msg = validate_port_range(port_str) if not is_valid: @@ -141,9 +150,9 @@ def validate_port_range(port_range: str) -> Tuple[bool, str]: return True, "" # Handle port ranges - if '-' in port_range: + if "-" in port_range: try: - start, end = map(int, port_range.split('-')) + start, end = map(int, port_range.split("-")) if start > end: return False, "Port range start must be less than end" @@ -167,21 +176,22 @@ def validate_port_range(port_range: str) -> Tuple[bool, str]: def validate_url(url: str) -> Tuple[bool, str]: """ Validate URL format. - + Args: url: URL to validate - + Returns: Tuple of (is_valid, error_message) """ # Basic URL validation url_pattern = re.compile( - r'^https?://' # http:// or https:// - r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain - r'localhost|' # localhost - r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # IP - r'(?::\d+)?' # optional port - r'(?:/?|[/?]\S+)$', re.IGNORECASE + r"^https?://" # http:// or https:// + r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|" # domain + r"localhost|" # localhost + r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # IP + r"(?::\d+)?" # optional port + r"(?:/?|[/?]\S+)$", + re.IGNORECASE, ) return (True, "") if url_pattern.match(url) else (False, "Invalid URL format") @@ -190,33 +200,34 @@ def validate_url(url: str) -> Tuple[bool, str]: def sanitize_input(value: str) -> str: """ Sanitize user input to prevent command injection. - + Args: value: Input value to sanitize - + Returns: Sanitized value """ # Remove shell metacharacters - dangerous_chars = [';', '|', '&', '$', '`', '(', ')', '<', '>', '\n', '\r'] + dangerous_chars = [";", "|", "&", "$", "`", "(", ")", "<", ">", "\n", "\r"] for char in dangerous_chars: - value = value.replace(char, '') - + value = value.replace(char, "") + return value.strip() def is_safe_path(path: str, base_dir: str) -> bool: """ Check if a path is safe (no directory traversal). - + Args: path: Path to check base_dir: Base directory to restrict to - + Returns: True if path is safe """ import os + try: real_base = os.path.realpath(base_dir) real_path = os.path.realpath(os.path.join(base_dir, path)) @@ -228,11 +239,11 @@ def is_safe_path(path: str, base_dir: str) -> bool: def match_pattern(value: str, pattern: str) -> bool: """ Match value against wildcard pattern. - + Args: value: Value to match pattern: Pattern with wildcards (* and ?) - + Returns: True if value matches pattern """ @@ -243,7 +254,10 @@ def match_pattern(value: str, pattern: str) -> bool: # Task-start payload size/length validation # --------------------------------------------------------------------------- -def validate_task_start_payload(raw_body: bytes, inputs: Dict[str, Any]) -> Tuple[bool, int, str]: + +def validate_task_start_payload( + raw_body: bytes, inputs: Dict[str, Any] +) -> Tuple[bool, int, str]: """ Enforce size and field-length limits on POST /task/start payloads. @@ -306,7 +320,10 @@ def _check_field(key: str, value: Any) -> Tuple[bool, int, str]: f"(max {settings.task_start_max_array_length}).", ) for idx, item in enumerate(value): - if isinstance(item, str) and len(item) > settings.task_start_max_field_length: + if ( + isinstance(item, str) + and len(item) > settings.task_start_max_field_length + ): return ( False, 400, @@ -315,4 +332,4 @@ def _check_field(key: str, value: Any) -> Tuple[bool, int, str]: f"{settings.task_start_max_field_length} characters.", ) - return True, 0, "" \ No newline at end of file + return True, 0, "" diff --git a/backend/secuscan/workflows.py b/backend/secuscan/workflows.py index 52e7b638..32ed40b5 100644 --- a/backend/secuscan/workflows.py +++ b/backend/secuscan/workflows.py @@ -57,15 +57,21 @@ async def tick(self): now = datetime.now(timezone.utc) for row in rows: - if not self._should_run(now, row.get("last_run_at"), int(row["schedule_seconds"])): + if not self._should_run( + now, row.get("last_run_at"), int(row["schedule_seconds"]) + ): continue - await self._run_workflow(row["id"], json.loads(row.get("steps_json") or "[]")) + await self._run_workflow( + row["id"], json.loads(row.get("steps_json") or "[]") + ) await db.execute( "UPDATE workflows SET last_run_at = datetime('now') WHERE id = ?", (row["id"],), ) - def _should_run(self, now: datetime, last_run_at: str | None, schedule_seconds: int) -> bool: + def _should_run( + self, now: datetime, last_run_at: str | None, schedule_seconds: int + ) -> bool: if not last_run_at: return True last = datetime.fromisoformat(last_run_at.replace("Z", "+00:00")) @@ -79,7 +85,9 @@ async def _run_workflow(self, workflow_id: str, steps: List[Dict[str, Any]]): inputs = step.get("inputs") or {} if not plugin_id: continue - task_id = await executor.create_task(plugin_id, inputs, preset=step.get("preset"), consent_granted=True) + task_id = await executor.create_task( + plugin_id, inputs, preset=step.get("preset"), consent_granted=True + ) asyncio.create_task(executor.execute_task(task_id)) diff --git a/docs/API.md b/docs/API.md index 084f3367..0d45d43f 100644 --- a/docs/API.md +++ b/docs/API.md @@ -40,4 +40,4 @@ curl "http://localhost:8000/api/v1/tasks?page=2&per_page=10" # With filters curl "http://localhost:8000/api/v1/tasks?status=completed&plugin_id=nmap&page=1&per_page=20" -""" \ No newline at end of file +""" diff --git a/docs/PRODUCT_SPEC.md b/docs/PRODUCT_SPEC.md index 8b38d093..f8fa9c39 100644 --- a/docs/PRODUCT_SPEC.md +++ b/docs/PRODUCT_SPEC.md @@ -3,10 +3,10 @@ --- -**Document Version:** 1.0 -**Classification:** Internal Release -**Target Audience:** Engineering Team, Security Researchers, Pentesting Students -**Last Updated:** November 2, 2025 +**Document Version:** 1.0 +**Classification:** Internal Release +**Target Audience:** Engineering Team, Security Researchers, Pentesting Students +**Last Updated:** November 2, 2025 --- @@ -64,7 +64,7 @@ SecuScan is built on five foundational principles: ### 1.4 Product Purpose -**Mission Statement:** +**Mission Statement:** "Enable learning-driven, ethical penetration testing for academic and self-training use without exposing external systems or requiring a remote backend." SecuScan bridges the gap between theoretical security knowledge and practical application. Students can safely experiment with professional-grade tools in controlled environments, while experienced practitioners benefit from a unified, privacy-respecting toolkit that doesn't send scan data to third-party services. @@ -105,13 +105,13 @@ SecuScan bridges the gap between theoretical security knowledge and practical ap ### 1.6 Key Differentiators -**vs. Kali Linux:** +**vs. Kali Linux:** SecuScan provides a curated, guided experience rather than a comprehensive toolkit. It's designed for learning specific workflows, not replacing a full penetration testing OS. -**vs. Burp Suite:** +**vs. Burp Suite:** While Burp focuses on web application proxying and manual testing, SecuScan emphasizes automated scanning workflows with educational scaffolding. -**vs. Cloud Scanning Services (Qualys, Rapid7):** +**vs. Cloud Scanning Services (Qualys, Rapid7):** Complete data privacy—no scan results leave your machine. No subscription fees, no internet requirement, no compliance concerns. ### 1.7 Success Metrics @@ -147,9 +147,9 @@ The initial release includes five battle-tested tools, selected for their utilit #### 2.2.1 Nmap (Network Mapper) -**Tool ID:** `nmap` -**Binary:** `nmap` (+ `python-nmap` wrapper) -**Category:** Network Discovery & Port Scanning +**Tool ID:** `nmap` +**Binary:** `nmap` (+ `python-nmap` wrapper) +**Category:** Network Discovery & Port Scanning ##### Purpose Nmap performs host discovery, port enumeration, service version detection, and OS fingerprinting. It's the industry standard for network reconnaissance and forms the foundation of most penetration testing engagements. @@ -224,9 +224,9 @@ Nmap performs host discovery, port enumeration, service version detection, and O #### 2.2.2 HTTP Inspector -**Tool ID:** `http_inspector` -**Library:** `requests` / `httpx` -**Category:** Web Reconnaissance +**Tool ID:** `http_inspector` +**Library:** `requests` / `httpx` +**Category:** Web Reconnaissance ##### Purpose Performs safe, read-only HTTP requests to validate endpoint availability, examine response headers, trace redirections, and inspect TLS configurations. Ideal for initial web target profiling without active exploitation attempts. @@ -307,9 +307,9 @@ Performs safe, read-only HTTP requests to validate endpoint availability, examin #### 2.2.3 Directory Discovery -**Tool ID:** `dir_brute` -**Engine:** Custom Python (asyncio + httpx) -**Category:** Web Enumeration +**Tool ID:** `dir_brute` +**Engine:** Custom Python (asyncio + httpx) +**Category:** Web Enumeration ##### Purpose Discovers hidden directories, files, and endpoints by testing common naming patterns against a target web application. Uses wordlists to systematically probe for unlinked resources that may contain sensitive information or administrative interfaces. @@ -394,9 +394,9 @@ Discovers hidden directories, files, and endpoints by testing common naming patt #### 2.2.4 Nikto/Wapiti (Web Passive Scanner) -**Tool ID:** `web_passive_scan` -**Binary:** `nikto` / `wapiti` -**Category:** Web Vulnerability Assessment +**Tool ID:** `web_passive_scan` +**Binary:** `nikto` / `wapiti` +**Category:** Web Vulnerability Assessment ##### Purpose Automated scanner for common web server misconfigurations, outdated software versions, dangerous HTTP methods, missing security headers, and known vulnerabilities. Operates in two modes: passive (read-only) and active (includes low-risk probes). @@ -479,9 +479,9 @@ Automated scanner for common web server misconfigurations, outdated software ver #### 2.2.5 TLS / Certificate Inspector -**Tool ID:** `tls_inspect` -**Library:** `ssl` / `cryptography` -**Category:** Transport Security Analysis +**Tool ID:** `tls_inspect` +**Library:** `ssl` / `cryptography` +**Category:** Transport Security Analysis ##### Purpose Examines TLS/SSL configurations, certificate validity, cipher suites, and protocol versions. Identifies weak cryptographic implementations, expired certificates, and misconfigured trust chains without performing any active exploitation. @@ -651,8 +651,8 @@ High-risk tools requiring mature safety frameworks: SecuScan runs as a **single-page web application (SPA)** served from a local Python backend. The entire application stack operates on `127.0.0.1`, eliminating network exposure risks. -**Access URL (Frontend):** `http://127.0.0.1:3000` -**Backend API:** `http://127.0.0.1:8000/api/v1` +**Access URL (Frontend):** `http://127.0.0.1:3000` +**Backend API:** `http://127.0.0.1:8000/api/v1` ### 3.2 Visual Layout @@ -1042,7 +1042,7 @@ Plugins can provide Python parser scripts: def parse(raw_output: str, task_config: dict) -> dict: """ Parse Nmap XML output into standardized format. - + Returns: { "summary": ["Found 2 hosts", "12 open ports"], @@ -1051,14 +1051,14 @@ def parse(raw_output: str, task_config: dict) -> dict: } """ import xml.etree.ElementTree as ET - + root = ET.fromstring(raw_output) hosts = [] - + for host in root.findall('.//host'): # ... parsing logic ... hosts.append(host_data) - + return { "summary": [f"Found {len(hosts)} hosts"], "structured": {"hosts": hosts}, @@ -1140,10 +1140,10 @@ plugins/ ### 5.1 API Versioning -**Base URL:** `http://127.0.0.1:8080/api/v1` -**Protocol:** REST over HTTP -**Serialization:** JSON -**Authentication:** Token-based (optional, disabled by default) +**Base URL:** `http://127.0.0.1:8080/api/v1` +**Protocol:** REST over HTTP +**Serialization:** JSON +**Authentication:** Token-based (optional, disabled by default) ### 5.2 Endpoint Reference @@ -1625,12 +1625,12 @@ All plugins produce output conforming to a standardized schema, enabling consist "duration_seconds": "float", "status": "completed|failed|cancelled", "exit_code": "integer (null if N/A)", - + "summary": [ "Human-readable summary line 1", "Human-readable summary line 2" ], - + "severity_counts": { "critical": "integer", "high": "integer", @@ -1638,14 +1638,14 @@ All plugins produce output conforming to a standardized schema, enabling consist "low": "integer", "info": "integer" }, - + "structured": { "tool-specific-key": "tool-specific-value" }, - + "raw_output_path": "string (file path)", "raw_output_excerpt": "string (first 1000 chars, optional)", - + "errors": [ { "code": "string", @@ -1653,7 +1653,7 @@ All plugins produce output conforming to a standardized schema, enabling consist "timestamp": "ISO8601" } ], - + "metadata": { "inputs": {"target": "...", "preset": "..."}, "environment": { @@ -1999,10 +1999,10 @@ Findings are categorized using industry-standard severity levels: ### 7.1 Database Technology -**Engine:** SQLite 3.35+ -**Location:** `$HOME/.secuscan/secuscan.db` -**Encryption:** Optional (SQLCipher extension) -**Backup:** Automatic daily snapshots to `$HOME/.secuscan/backups/` +**Engine:** SQLite 3.35+ +**Location:** `$HOME/.secuscan/secuscan.db` +**Encryption:** Optional (SQLCipher extension) +**Backup:** Automatic daily snapshots to `$HOME/.secuscan/backups/` ### 7.2 Database Schema @@ -2018,26 +2018,26 @@ CREATE TABLE tasks ( target TEXT NOT NULL, inputs_json TEXT NOT NULL, -- JSON string of input parameters preset TEXT, - + status TEXT NOT NULL, -- queued|running|completed|failed|cancelled consent_granted BOOLEAN NOT NULL DEFAULT 0, safe_mode BOOLEAN NOT NULL DEFAULT 1, - + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, started_at DATETIME, completed_at DATETIME, duration_seconds REAL, - + exit_code INTEGER, structured_json TEXT, -- Parsed output in standard format raw_output_path TEXT, error_message TEXT, - + -- Resource tracking container_id TEXT, cpu_seconds REAL, memory_peak_mb REAL, - + -- Indexes FOREIGN KEY (plugin_id) REFERENCES plugins(id) ); @@ -2059,15 +2059,15 @@ CREATE TABLE plugins ( version TEXT NOT NULL, category TEXT NOT NULL, metadata_json TEXT NOT NULL, -- Full plugin metadata - + enabled BOOLEAN NOT NULL DEFAULT 1, checksum TEXT, -- SHA-256 of metadata file signature TEXT, -- GPG signature (optional) - + installed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, last_updated DATETIME, last_used DATETIME, - + -- Dependency tracking binary_path TEXT, docker_image TEXT, @@ -2091,7 +2091,7 @@ CREATE TABLE settings ( ); -- Example rows: -INSERT INTO settings VALUES +INSERT INTO settings VALUES ('bind_address', '127.0.0.1', 'string', 'Server bind address', CURRENT_TIMESTAMP), ('bind_port', '8080', 'integer', 'Server port', CURRENT_TIMESTAMP), ('require_consent', '1', 'boolean', 'Force consent checkbox', CURRENT_TIMESTAMP), @@ -2108,13 +2108,13 @@ CREATE TABLE audit_log ( timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, event_type TEXT NOT NULL, -- task_start|task_complete|consent_granted|setting_change|auth_attempt severity TEXT NOT NULL, -- info|warning|error - + user_id TEXT, -- If authentication enabled ip_address TEXT, - + message TEXT NOT NULL, context_json TEXT, -- Additional structured data - + task_id TEXT, -- Link to task if applicable plugin_id TEXT ); @@ -2134,11 +2134,11 @@ CREATE TABLE presets ( name TEXT NOT NULL, plugin_id TEXT NOT NULL, config_json TEXT NOT NULL, - + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, last_used DATETIME, use_count INTEGER DEFAULT 0, - + FOREIGN KEY (plugin_id) REFERENCES plugins(id), UNIQUE(plugin_id, name) ); @@ -2381,14 +2381,14 @@ def run_sandboxed(command, timeout=300): "timeout", str(timeout), *command ] - + result = subprocess.run( sandbox_command, capture_output=True, text=True, check=False ) - + return result ``` @@ -2479,30 +2479,30 @@ ALLOWED_PRIVATE = [ def validate_target(target: str, safe_mode: bool = True) -> tuple[bool, str]: """ Validate scan target address. - + Returns: (is_valid, error_message) """ try: ip = ipaddress.ip_address(target) - + # Check blocked networks if any(ip in net for net in BLOCKED_NETWORKS): return False, "Target is in blocked network range" - + # Safe mode: only allow private IPs if safe_mode and not any(ip in net for net in ALLOWED_PRIVATE): return False, "Public IPs not allowed in safe mode" - + return True, "" - + except ValueError: # Not an IP, check if hostname if not re.match(r'^[a-zA-Z0-9.-]+$', target): return False, "Invalid hostname format" - + if target.endswith(('.mil', '.gov')) and safe_mode: return False, "Government domains blocked in safe mode" - + return True, "" ``` @@ -2537,7 +2537,7 @@ def build_command(template: list, inputs: dict) -> list: command.append(str(value)) else: command.append(token) - + return command # Return as list, not shell string ``` @@ -2562,23 +2562,23 @@ from datetime import datetime, timedelta class RateLimiter: def __init__(self): self.task_history = defaultdict(list) # plugin_id -> [timestamps] - + def can_execute(self, plugin_id: str, max_per_hour: int) -> tuple[bool, str]: """Check if plugin can be executed based on rate limits.""" now = datetime.now() hour_ago = now - timedelta(hours=1) - + # Clean old entries self.task_history[plugin_id] = [ ts for ts in self.task_history[plugin_id] if ts > hour_ago ] - + recent_count = len(self.task_history[plugin_id]) - + if recent_count >= max_per_hour: return False, f"Rate limit exceeded: {recent_count}/{max_per_hour} per hour" - + self.task_history[plugin_id].append(now) return True, "" ``` @@ -2597,7 +2597,7 @@ class CredentialVault: self.vault_path = vault_path self.key = self._load_or_generate_key() self.cipher = Fernet(self.key) - + def store(self, name: str, value: str): """Encrypt and store credential.""" encrypted = self.cipher.encrypt(value.encode()) @@ -2606,7 +2606,7 @@ class CredentialVault: "INSERT OR REPLACE INTO credentials VALUES (?, ?)", (name, encrypted) ) - + def retrieve(self, name: str) -> str: """Decrypt and retrieve credential.""" encrypted = db.query("SELECT value FROM credentials WHERE name = ?", name) @@ -2873,7 +2873,7 @@ Scan authorized by: [User confirmation recorded] 3. KEY FINDINGS [Table with color-coded severity] - + 4. DETAILED RESULTS [Port-by-port breakdown with service details] @@ -3265,7 +3265,7 @@ def test_nmap_scan_workflow(api_client): }) assert response.status_code == 200 task_id = response.json()["task_id"] - + # Poll until complete for _ in range(30): status_response = requests.get(f"{api_client}/task/{task_id}/status") @@ -3273,13 +3273,13 @@ def test_nmap_scan_workflow(api_client): if status == "completed": break time.sleep(1) - + assert status == "completed" - + # Retrieve results result_response = requests.get(f"{api_client}/task/{task_id}/result") result = result_response.json() - + assert "structured" in result assert "hosts" in result["structured"] assert len(result["structured"]["hosts"]) > 0 @@ -3301,35 +3301,35 @@ from selenium.webdriver.support import expected_conditions as EC def test_first_scan_workflow(): driver = webdriver.Chrome() driver.get("http://127.0.0.1:8080") - + # Accept terms consent_checkbox = driver.find_element(By.ID, "consent-checkbox") consent_checkbox.click() - + # Select tool driver.find_element(By.LINK_TEXT, "Nmap").click() - + # Fill form target_input = driver.find_element(By.ID, "target") target_input.send_keys("scanme.nmap.org") - + # Select preset preset_select = driver.find_element(By.ID, "preset") preset_select.send_keys("Quick Host Check") - + # Start scan start_button = driver.find_element(By.ID, "start-scan") start_button.click() - + # Wait for completion WebDriverWait(driver, 60).until( EC.text_to_be_present_in_element((By.ID, "task-status"), "Completed") ) - + # Verify results displayed results_panel = driver.find_element(By.ID, "structured-results") assert "open ports" in results_panel.text.lower() - + driver.quit() ``` diff --git a/docs/plugin-validation.md b/docs/plugin-validation.md index 49b8de38..31e1b9aa 100644 --- a/docs/plugin-validation.md +++ b/docs/plugin-validation.md @@ -150,4 +150,4 @@ Existing plugins using a raw `pattern` continue to work without changes: ## Backwards compatibility -Plugins that already define `validation.pattern` (without `validation_type`) continue to work exactly as before. No migration is required. \ No newline at end of file +Plugins that already define `validation.pattern` (without `validation_type`) continue to work exactly as before. No migration is required. diff --git a/frontend/e2e/scan-workflow.spec.ts b/frontend/e2e/scan-workflow.spec.ts index a9f6c818..625c5705 100644 --- a/frontend/e2e/scan-workflow.spec.ts +++ b/frontend/e2e/scan-workflow.spec.ts @@ -205,4 +205,4 @@ test.describe("Scan workflow - consent required", () => { await expect(consentCheckbox).toBeChecked(); await expect(page.getByRole("button", { name: /initiate_scan/i })).not.toBeDisabled(); }); -}); \ No newline at end of file +}); diff --git a/frontend/index.html b/frontend/index.html index 8ed1c04c..27063558 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -12,4 +12,3 @@ - diff --git a/frontend/playwright-report/index.html b/frontend/playwright-report/index.html index a185633a..0e9d6b0c 100644 --- a/frontend/playwright-report/index.html +++ b/frontend/playwright-report/index.html @@ -82,4 +82,4 @@
- \ No newline at end of file + diff --git a/frontend/src/api.ts b/frontend/src/api.ts index 7c7fc0e0..bd08a104 100644 --- a/frontend/src/api.ts +++ b/frontend/src/api.ts @@ -240,4 +240,4 @@ export function deleteWorkflow(workflowId: string): Promise<{ deleted: boolean } return request<{ deleted: boolean }>(`/workflows/${workflowId}`, { method: 'DELETE', }) -} \ No newline at end of file +} diff --git a/frontend/src/components/AppShell.tsx b/frontend/src/components/AppShell.tsx index 1c73e91b..f3f15fac 100644 --- a/frontend/src/components/AppShell.tsx +++ b/frontend/src/components/AppShell.tsx @@ -104,7 +104,7 @@ export default function AppShell({ children }: AppShellProps) { )} -
diff --git a/frontend/src/components/ExecutiveStatsBar.tsx b/frontend/src/components/ExecutiveStatsBar.tsx index 35d6263d..ad1d2d54 100644 --- a/frontend/src/components/ExecutiveStatsBar.tsx +++ b/frontend/src/components/ExecutiveStatsBar.tsx @@ -30,8 +30,8 @@ export const ExecutiveStatsBar: React.FC = ({
Status Profile
- {riskLabel || 'Moderate'} diff --git a/frontend/src/components/I18nContext.tsx b/frontend/src/components/I18nContext.tsx index d879669f..ec134b51 100644 --- a/frontend/src/components/I18nContext.tsx +++ b/frontend/src/components/I18nContext.tsx @@ -40,7 +40,7 @@ export function I18nProvider({ children }: { children: ReactNode }) { const t = (path: string): string => { const keys = path.split('.'); let result: any = translations[locale]; - + for (const key of keys) { if (result && typeof result === 'object' && key in result) { result = result[key]; @@ -48,7 +48,7 @@ export function I18nProvider({ children }: { children: ReactNode }) { return path; } } - + return typeof result === 'string' ? result : path; }; diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 8853ca4f..e83e4bd4 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -13,15 +13,15 @@ interface NavItemProps { const NavItem = ({ to, icon, label, isExpanded, highlight = false }: NavItemProps) => { return ( - e.stopPropagation()} className={({ isActive }) => ` relative flex items-center transition-all duration-300 group ${isExpanded ? 'gap-3 px-5 py-2.5 mx-2 rounded-lg' : 'justify-center py-3 px-2 mx-2 rounded-lg'} - ${isActive - ? 'bg-accent-silver/10 text-primary shadow-[inset_0_1px_1px_rgba(255,255,255,0.05)]' + ${isActive + ? 'bg-accent-silver/10 text-primary shadow-[inset_0_1px_1px_rgba(255,255,255,0.05)]' : highlight ? 'bg-rag-blue/15 border border-rag-blue/30 text-silver-bright hover:bg-rag-blue/25' : 'text-secondary hover:text-primary hover:bg-accent-silver/5'} @@ -32,24 +32,24 @@ const NavItem = ({ to, icon, label, isExpanded, highlight = false }: NavItemProp <> {/* Active Indicator Glow */} {isActive && ( - )} - + {/* Active Side bar */} {isActive && ( - )} - + {icon} - + {isExpanded && ( - ( {isExpanded ? ( - setIsExpanded(!isExpanded)} className={` - hidden lg:flex flex-col h-screen fixed left-0 top-0 bg-secondary border-r border-accent-silver/10 z-50 + hidden lg:flex flex-col h-screen fixed left-0 top-0 bg-secondary border-r border-accent-silver/10 z-50 shadow-[4px_0_24px_rgba(0,0,0,0.4)] overflow-hidden cursor-pointer `} > {/* Header / Logo */}
- { @@ -137,10 +137,10 @@ export default function Sidebar() { > shield - + {isExpanded && ( - -
) -} \ No newline at end of file +} diff --git a/frontend/src/pages/Settings.tsx b/frontend/src/pages/Settings.tsx index 14061158..a4dbb513 100644 --- a/frontend/src/pages/Settings.tsx +++ b/frontend/src/pages/Settings.tsx @@ -5,8 +5,8 @@ import { useToast } from '../components/ToastContext' const itemVariants = { hidden: { opacity: 0, y: 20 }, - visible: { - opacity: 1, + visible: { + opacity: 1, y: 0, transition: { type: 'spring', stiffness: 200, damping: 25 } } @@ -34,7 +34,7 @@ const DEFAULT_CONFIG = { export default function Settings() { const { theme, setTheme } = useTheme() const { addToast } = useToast() - + const [config, setConfig] = useState(() => { const saved = localStorage.getItem('secuscan-config') if (saved) { @@ -90,7 +90,7 @@ export default function Settings() {

{description}

- onChange(type === 'number' ? parseInt(e.target.value) || 0 : e.target.value)} @@ -106,7 +106,7 @@ export default function Settings() {

{description}

-