diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 3b45fbbe..48d7f03f 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -74,7 +74,7 @@ async def _broadcast(self, task_id: str, event_type: str, data: Any): event = {"type": event_type, "data": data} for q in self._listeners[task_id]: await q.put(event) - + async def create_task( self, plugin_id: str, @@ -84,29 +84,29 @@ async def create_task( ) -> str: """ Create a new scan task. - + Args: plugin_id: Plugin identifier inputs: User input values preset: Optional preset name consent_granted: Whether user granted consent - + Returns: Task ID """ task_id = str(uuid.uuid4()) plugin_manager = get_plugin_manager() plugin = plugin_manager.get_plugin(plugin_id) - + if not plugin: raise ValueError(f"Plugin not found: {plugin_id}") - + # Apply preset if provided if preset and preset in plugin.presets: preset_values = plugin.presets[preset] # Merge preset with user inputs (user inputs take precedence) inputs = {**preset_values, **inputs} - + # Store task in database db = await get_db() await db.execute( @@ -128,7 +128,7 @@ async def create_task( inputs.get("safe_mode", True) ) ) - + # Log audit event await db.log_audit( "task_created", @@ -137,9 +137,9 @@ async def create_task( task_id=task_id, plugin_id=plugin_id ) - + return task_id - + async def mark_task_failed(self, task_id: str, reason: str) -> None: """ Mark a task as failed without running it. @@ -178,7 +178,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None: async def execute_task(self, task_id: str): """ Execute a task asynchronously. - + Args: task_id: Task identifier """ @@ -211,18 +211,18 @@ async def execute_task(self, task_id: str): if plugin_id in MODULAR_SCANNERS: scanner_class = MODULAR_SCANNERS[plugin_id] scanner = scanner_class(task_id, db) - + logger.info(f"Executing modular scanner {plugin_id} for task {task_id}") await self._broadcast(task_id, "status", TaskStatus.RUNNING.value) - + start_time = time.time() # Run the scanner result = await scanner.run(target, inputs) duration = time.time() - start_time - + # Update task with results final_status = TaskStatus.COMPLETED.value if result.get("status") != "failed" else TaskStatus.FAILED.value - + await db.execute( """ UPDATE tasks SET @@ -262,7 +262,7 @@ async def execute_task(self, task_id: str): raise ValueError(f"Plugin not found: {plugin_id}") # Pending records for assets removed - + command = plugin_manager.build_command(plugin_id, inputs) if not command: @@ -425,7 +425,7 @@ async def execute_task(self, task_id: str): # release the concurrency slot regardless of how the task ended. self.running_tasks.pop(task_id, None) await concurrent_limiter.release(task_id) - + async def _execute_command( self, command: list, @@ -456,7 +456,7 @@ async def read_stream(): stdout = process.stdout if stdout is None: return - + while not stdout.at_eof(): line = await stdout.readline() if line: @@ -583,13 +583,13 @@ async def cancel_task(self, task_id: str) -> bool: ) return True - + async def get_task_status(self, task_id: str) -> Optional[Dict]: """Get task status and progress""" db = await get_db() task_row = await db.fetchone( """ - SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at, + SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at, duration_seconds, exit_code, error_message, preset, inputs_json FROM tasks WHERE id = ? """, @@ -632,7 +632,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: """Persist derived findings and report records into SQLite.""" parsed = self._parse_results(plugin, output) findings_data = parsed.get("findings", []) - + # Update task with structured results await db.execute( "UPDATE tasks SET structured_json = ? WHERE id = ?", @@ -692,7 +692,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]): """Persist modular scanner results into findings, and reports.""" findings_data = result.get("findings", []) - + # Insert findings for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") @@ -748,36 +748,42 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: """Route to appropriate parser based on plugin metadata.""" parser_type = plugin.output.get("parser") parser_input = self._resolve_parser_input(plugin, output) - + # 1. Check for custom parser.py in plugin directory (Recommended) plugin_manager = get_plugin_manager() plugin_dir = plugin_manager.plugins_dir / plugin.id parser_path = plugin_dir / "parser.py" - + if parser_path.exists(): - try: - import importlib.util - spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) - if spec is not None: - loader = spec.loader - if loader is not None: - module = importlib.util.module_from_spec(spec) - loader.exec_module(module) - if hasattr(module, "parse"): - logger.info(f"Using custom parser for {plugin.id}") - parsed = module.parse(parser_input) - return self._normalize_parsed_result(plugin, parser_input, parsed) - else: - logger.warning(f"Custom parser {parser_path} missing 'parse' function") - except Exception as e: - logger.error(f"Error executing custom parser for {plugin.id}: {e}") + if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir): + logger.error( + "Skipping custom parser for plugin %s: integrity check failed at exec time", + plugin.id, + ) + else: + try: + import importlib.util + spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) + if spec is not None: + loader = spec.loader + if loader is not None: + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + if hasattr(module, "parse"): + logger.info(f"Using custom parser for {plugin.id}") + parsed = module.parse(parser_input) + return self._normalize_parsed_result(plugin, parser_input, parsed) + else: + logger.warning(f"Custom parser {parser_path} missing 'parse' function") + except Exception as e: + logger.error(f"Error executing custom parser for {plugin.id}: {e}") # 2. Fallback to legacy built-in parsers if parser_type == "builtin_nmap": return self._normalize_parsed_result(plugin, parser_input, self._parse_nmap_output(parser_input)) elif parser_type == "builtin_http": return self._normalize_parsed_result(plugin, parser_input, self._parse_http_output(parser_input)) - + return self._normalize_parsed_result(plugin, parser_input, {"findings": [], "raw": parser_input}) def _resolve_parser_input(self, plugin, output: str) -> str: @@ -936,7 +942,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: findings = [] ports = [] services = [] - + # Regex for open ports: 80/tcp open http port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)") for match in port_pattern.finditer(output): @@ -952,7 +958,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: "remediation": "Close unnecessary ports and use a firewall to restrict access.", "metadata": {"port": port_str, "protocol": proto, "service": service} }) - + return { "open_ports": sorted(list(set(ports))), "services": sorted(list(set(services))), diff --git a/backend/secuscan/plugins.py b/backend/secuscan/plugins.py index d224bd4a..45285a3d 100644 --- a/backend/secuscan/plugins.py +++ b/backend/secuscan/plugins.py @@ -173,6 +173,55 @@ def compute_plugin_digest(metadata_file: Path, parser_file: Path) -> str: return hashlib.sha256(f"{metadata_digest}:{parser_digest}".encode("utf-8")).hexdigest() + def verify_parser_at_exec_time( + self, plugin: PluginMetadata, plugin_dir: Path + ) -> bool: + """Re-verify plugin digest immediately before executing parser.py. + + This closes the TOCTOU window between startup integrity check and + actual code execution: the file could be replaced on disk after the + initial load-time validation. + + Returns True when execution should proceed, False when it must be + blocked. + """ + metadata_file = plugin_dir / "metadata.json" + parser_file = plugin_dir / "parser.py" + + if not plugin.checksum: + if settings.enforce_plugin_signatures: + logger.error( + "Refusing to execute parser for plugin %s: no checksum present " + "and signature enforcement is enabled", + plugin.id, + ) + return False + logger.warning( + "Executing unverified parser for plugin %s: checksum not set", + plugin.id, + ) + return True + + try: + current_digest = self.compute_plugin_digest(metadata_file, parser_file) + except Exception as exc: + logger.error( + "Failed to compute digest for plugin %s at exec time: %s", + plugin.id, + exc, + ) + return False + + if not hmac.compare_digest(current_digest, plugin.checksum): + logger.error( + "SECURITY: Parser integrity check failed for plugin %s — " + "parser.py may have been tampered with after startup", + plugin.id, + ) + return False + + return True + def get_plugin(self, plugin_id: str) -> Optional[PluginMetadata]: """Get plugin by ID""" return self.plugins.get(plugin_id) diff --git a/testing/backend/unit/test_plugin_parser_rce.py b/testing/backend/unit/test_plugin_parser_rce.py new file mode 100644 index 00000000..f4376e84 --- /dev/null +++ b/testing/backend/unit/test_plugin_parser_rce.py @@ -0,0 +1,253 @@ +""" +Security tests for plugin parser RCE prevention (issue #202). + +Verifies that: +- verify_parser_at_exec_time() re-checks the digest before exec_module +- A parser.py modified after load-time validation is rejected +- A parser.py with no checksum is allowed (with warning) when enforcement is off +- A parser.py with no checksum is blocked when enforce_plugin_signatures is on +- A parser.py where the checksum matches is allowed +- Executor skips exec_module when verify_parser_at_exec_time returns False +""" + +import hashlib +import json +import textwrap +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from backend.secuscan.plugins import PluginManager +from backend.secuscan.config import settings + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write_plugin(tmp_path: Path, parser_src: str = "", include_checksum: bool = True): + """Create a minimal plugin directory with metadata.json and optional parser.py.""" + plugin_dir = tmp_path / "test-rce-plugin" + plugin_dir.mkdir() + + metadata = { + "id": "test-rce-plugin", + "name": "Test RCE Plugin", + "version": "1.0.0", + "description": "test", + "category": "test", + "engine": {"type": "cli", "binary": "echo"}, + "command_template": ["{target}"], + "safety": {"level": "safe"}, + "output": {"format": "text", "parser": "custom"}, + "fields": [{"id": "target", "label": "Target", "type": "string"}], + "presets": {}, + } + + metadata_file = plugin_dir / "metadata.json" + + if parser_src: + parser_file = plugin_dir / "parser.py" + parser_file.write_text(parser_src, encoding="utf-8") + else: + parser_file = plugin_dir / "parser.py" # may not exist yet + + if include_checksum: + # Write metadata without checksum first, compute digest, then add checksum + metadata_file.write_text(json.dumps(metadata, sort_keys=True), encoding="utf-8") + digest = PluginManager.compute_plugin_digest(metadata_file, parser_file) + metadata["checksum"] = digest + + metadata_file.write_text(json.dumps(metadata, sort_keys=True), encoding="utf-8") + return plugin_dir, metadata_file + + +def _make_manager(tmp_path: Path) -> PluginManager: + return PluginManager(plugins_dir=str(tmp_path)) + + +def _minimal_plugin_meta(checksum: str = None): + """Return a PluginMetadata-like object for unit tests.""" + from backend.secuscan.models import PluginMetadata + data = { + "id": "test-rce-plugin", + "name": "Test", + "version": "1.0.0", + "description": "test", + "category": "test", + "engine": {"type": "cli", "binary": "echo"}, + "command_template": [], + "safety": {"level": "safe"}, + "output": {"format": "text", "parser": "custom"}, + "fields": [], + "presets": {}, + } + if checksum: + data["checksum"] = checksum + return PluginMetadata(**data) + + +# --------------------------------------------------------------------------- +# verify_parser_at_exec_time — unit tests +# --------------------------------------------------------------------------- + +class TestVerifyParserAtExecTime: + def test_matching_checksum_returns_true(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + parser_src = "def parse(output): return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + + mgr = _make_manager(tmp_path) + # Load the checksum from the written metadata + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is True + + def test_tampered_parser_is_rejected(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + parser_src = "def parse(output): return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + + # Record checksum with original parser + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + # Tamper with parser.py after checksum was recorded + (plugin_dir / "parser.py").write_text( + "import os; os.system('id')\ndef parse(output): return {'findings': []}\n", + encoding="utf-8", + ) + + mgr = _make_manager(tmp_path) + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is False + + def test_no_checksum_allowed_when_enforcement_off(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + plugin_dir, _ = _write_plugin(tmp_path, parser_src="", include_checksum=False) + plugin = _minimal_plugin_meta(checksum=None) + + mgr = _make_manager(tmp_path) + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is True + + def test_no_checksum_blocked_when_enforcement_on(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", True) + plugin_dir, _ = _write_plugin(tmp_path, parser_src="", include_checksum=False) + plugin = _minimal_plugin_meta(checksum=None) + + mgr = _make_manager(tmp_path) + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is False + + def test_digest_compute_failure_returns_false(self, tmp_path, monkeypatch): + """If the digest computation raises (e.g. permissions), reject execution.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + plugin_dir, _ = _write_plugin(tmp_path, parser_src="", include_checksum=True) + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata.get("checksum", "abc")) + + mgr = _make_manager(tmp_path) + with patch.object( + PluginManager, "compute_plugin_digest", side_effect=OSError("permission denied") + ): + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is False + + +# --------------------------------------------------------------------------- +# Executor integration — exec_module is NOT called when check fails +# --------------------------------------------------------------------------- + +class TestExecutorParserGate: + def test_exec_module_skipped_when_integrity_fails(self, tmp_path, monkeypatch): + """When verify_parser_at_exec_time returns False the parser must not run.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + + parser_src = "def parse(output): return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + + # Tamper with parser.py so the digest no longer matches + (plugin_dir / "parser.py").write_text( + "import sys; sys.exit(99)\ndef parse(output): return {}\n", + encoding="utf-8", + ) + + from backend.secuscan.models import PluginMetadata + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + mgr = _make_manager(tmp_path) + mgr.plugins_dir = tmp_path + mgr.plugins[plugin.id] = plugin + + exec_called = [] + + original_exec = __builtins__.__class__.__dict__ # not used + import importlib.util as ilu + + original_exec_module = None + + class _CapturingLoader: + def exec_module(self, module): + exec_called.append(True) + + with patch("importlib.util.spec_from_file_location") as mock_spec: + mock_loader = MagicMock() + mock_loader.exec_module = MagicMock(side_effect=lambda m: exec_called.append(True)) + mock_spec_obj = MagicMock() + mock_spec_obj.loader = mock_loader + mock_spec.return_value = mock_spec_obj + + with patch("importlib.util.module_from_spec", return_value=MagicMock()): + # Import executor lazily to avoid circular-import issues at module level + from backend.secuscan import executor as executor_module + + plugin_obj = plugin + # Directly call _parse_results, patching plugin_manager + exec_instance = executor_module.TaskExecutor.__new__(executor_module.TaskExecutor) + + with patch( + "backend.secuscan.executor.get_plugin_manager", return_value=mgr + ): + result = exec_instance._parse_results(plugin_obj, "raw output") + + assert len(exec_called) == 0, "exec_module must not be called when integrity check fails" + + def test_exec_module_called_when_integrity_passes(self, tmp_path, monkeypatch): + """When verify_parser_at_exec_time returns True, exec_module must run.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + + parser_src = "def parse(output):\n return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + mgr = _make_manager(tmp_path) + mgr.plugins_dir = tmp_path + mgr.plugins[plugin.id] = plugin + + exec_called = [] + + def _fake_exec(module): + exec_called.append(True) + module.parse = lambda output: {"findings": []} + + with patch("importlib.util.spec_from_file_location") as mock_spec: + mock_loader = MagicMock() + mock_loader.exec_module = MagicMock(side_effect=_fake_exec) + mock_spec_obj = MagicMock() + mock_spec_obj.loader = mock_loader + mock_spec.return_value = mock_spec_obj + + fake_module = MagicMock() + fake_module.parse = lambda output: {"findings": []} + + with patch("importlib.util.module_from_spec", return_value=fake_module): + from backend.secuscan import executor as executor_module + exec_instance = executor_module.TaskExecutor.__new__(executor_module.TaskExecutor) + + with patch( + "backend.secuscan.executor.get_plugin_manager", return_value=mgr + ): + result = exec_instance._parse_results(plugin, "raw output") + + assert len(exec_called) == 1, "exec_module must be called once when integrity check passes"