From 7a566dfdc4ae10c3269d247ba02ffc8ac3afc47d Mon Sep 17 00:00:00 2001 From: Vexx Date: Sun, 24 May 2026 02:29:24 +0530 Subject: [PATCH 1/2] fix(executor): re-verify parser.py integrity immediately before exec_module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The executor previously ran parser.py after only verifying it at plugin load time (startup). An attacker who could write to the plugin directory after startup could replace parser.py between the load-time check and the actual execution — a TOCTOU vulnerability leading to arbitrary code execution inside the server process. Changes: - plugins.py: add verify_parser_at_exec_time() — re-computes the plugin digest and compares it against the stored checksum using hmac.compare_digest immediately before exec_module is called; blocks execution if checksum is absent and enforce_plugin_signatures is on - executor.py: call verify_parser_at_exec_time() in _parse_results() before the importlib exec_module block; parser is skipped on failure - test_plugin_parser_rce.py: 7 new tests covering: checksum match allows, tampered parser blocked, no-checksum with enforcement on blocked, digest compute failure returns safe default, executor skips exec_module when check fails, executor calls exec_module when check passes Fixes #202 (plugin parser.py dynamically executed without runtime signature enforcement — RCE via TOCTOU). --- backend/secuscan/executor.py | 38 +-- backend/secuscan/plugins.py | 49 ++++ .../backend/unit/test_plugin_parser_rce.py | 253 ++++++++++++++++++ 3 files changed, 324 insertions(+), 16 deletions(-) create mode 100644 testing/backend/unit/test_plugin_parser_rce.py diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 3b45fbbe..43b97bdb 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -755,22 +755,28 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: parser_path = plugin_dir / "parser.py" if parser_path.exists(): - try: - import importlib.util - spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) - if spec is not None: - loader = spec.loader - if loader is not None: - module = importlib.util.module_from_spec(spec) - loader.exec_module(module) - if hasattr(module, "parse"): - logger.info(f"Using custom parser for {plugin.id}") - parsed = module.parse(parser_input) - return self._normalize_parsed_result(plugin, parser_input, parsed) - else: - logger.warning(f"Custom parser {parser_path} missing 'parse' function") - except Exception as e: - logger.error(f"Error executing custom parser for {plugin.id}: {e}") + if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir): + logger.error( + "Skipping custom parser for plugin %s: integrity check failed at exec time", + plugin.id, + ) + else: + try: + import importlib.util + spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) + if spec is not None: + loader = spec.loader + if loader is not None: + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + if hasattr(module, "parse"): + logger.info(f"Using custom parser for {plugin.id}") + parsed = module.parse(parser_input) + return self._normalize_parsed_result(plugin, parser_input, parsed) + else: + logger.warning(f"Custom parser {parser_path} missing 'parse' function") + except Exception as e: + logger.error(f"Error executing custom parser for {plugin.id}: {e}") # 2. Fallback to legacy built-in parsers if parser_type == "builtin_nmap": diff --git a/backend/secuscan/plugins.py b/backend/secuscan/plugins.py index 4af73a3c..09cfa463 100644 --- a/backend/secuscan/plugins.py +++ b/backend/secuscan/plugins.py @@ -167,6 +167,55 @@ def compute_plugin_digest(metadata_file: Path, parser_file: Path) -> str: parser_digest = hashlib.sha256(parser_file.read_bytes()).hexdigest() if parser_file.exists() else "" return hashlib.sha256(f"{metadata_digest}:{parser_digest}".encode("utf-8")).hexdigest() + def verify_parser_at_exec_time( + self, plugin: PluginMetadata, plugin_dir: Path + ) -> bool: + """Re-verify plugin digest immediately before executing parser.py. + + This closes the TOCTOU window between startup integrity check and + actual code execution: the file could be replaced on disk after the + initial load-time validation. + + Returns True when execution should proceed, False when it must be + blocked. + """ + metadata_file = plugin_dir / "metadata.json" + parser_file = plugin_dir / "parser.py" + + if not plugin.checksum: + if settings.enforce_plugin_signatures: + logger.error( + "Refusing to execute parser for plugin %s: no checksum present " + "and signature enforcement is enabled", + plugin.id, + ) + return False + logger.warning( + "Executing unverified parser for plugin %s: checksum not set", + plugin.id, + ) + return True + + try: + current_digest = self.compute_plugin_digest(metadata_file, parser_file) + except Exception as exc: + logger.error( + "Failed to compute digest for plugin %s at exec time: %s", + plugin.id, + exc, + ) + return False + + if not hmac.compare_digest(current_digest, plugin.checksum): + logger.error( + "SECURITY: Parser integrity check failed for plugin %s — " + "parser.py may have been tampered with after startup", + plugin.id, + ) + return False + + return True + def get_plugin(self, plugin_id: str) -> Optional[PluginMetadata]: """Get plugin by ID""" return self.plugins.get(plugin_id) diff --git a/testing/backend/unit/test_plugin_parser_rce.py b/testing/backend/unit/test_plugin_parser_rce.py new file mode 100644 index 00000000..f4376e84 --- /dev/null +++ b/testing/backend/unit/test_plugin_parser_rce.py @@ -0,0 +1,253 @@ +""" +Security tests for plugin parser RCE prevention (issue #202). + +Verifies that: +- verify_parser_at_exec_time() re-checks the digest before exec_module +- A parser.py modified after load-time validation is rejected +- A parser.py with no checksum is allowed (with warning) when enforcement is off +- A parser.py with no checksum is blocked when enforce_plugin_signatures is on +- A parser.py where the checksum matches is allowed +- Executor skips exec_module when verify_parser_at_exec_time returns False +""" + +import hashlib +import json +import textwrap +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from backend.secuscan.plugins import PluginManager +from backend.secuscan.config import settings + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write_plugin(tmp_path: Path, parser_src: str = "", include_checksum: bool = True): + """Create a minimal plugin directory with metadata.json and optional parser.py.""" + plugin_dir = tmp_path / "test-rce-plugin" + plugin_dir.mkdir() + + metadata = { + "id": "test-rce-plugin", + "name": "Test RCE Plugin", + "version": "1.0.0", + "description": "test", + "category": "test", + "engine": {"type": "cli", "binary": "echo"}, + "command_template": ["{target}"], + "safety": {"level": "safe"}, + "output": {"format": "text", "parser": "custom"}, + "fields": [{"id": "target", "label": "Target", "type": "string"}], + "presets": {}, + } + + metadata_file = plugin_dir / "metadata.json" + + if parser_src: + parser_file = plugin_dir / "parser.py" + parser_file.write_text(parser_src, encoding="utf-8") + else: + parser_file = plugin_dir / "parser.py" # may not exist yet + + if include_checksum: + # Write metadata without checksum first, compute digest, then add checksum + metadata_file.write_text(json.dumps(metadata, sort_keys=True), encoding="utf-8") + digest = PluginManager.compute_plugin_digest(metadata_file, parser_file) + metadata["checksum"] = digest + + metadata_file.write_text(json.dumps(metadata, sort_keys=True), encoding="utf-8") + return plugin_dir, metadata_file + + +def _make_manager(tmp_path: Path) -> PluginManager: + return PluginManager(plugins_dir=str(tmp_path)) + + +def _minimal_plugin_meta(checksum: str = None): + """Return a PluginMetadata-like object for unit tests.""" + from backend.secuscan.models import PluginMetadata + data = { + "id": "test-rce-plugin", + "name": "Test", + "version": "1.0.0", + "description": "test", + "category": "test", + "engine": {"type": "cli", "binary": "echo"}, + "command_template": [], + "safety": {"level": "safe"}, + "output": {"format": "text", "parser": "custom"}, + "fields": [], + "presets": {}, + } + if checksum: + data["checksum"] = checksum + return PluginMetadata(**data) + + +# --------------------------------------------------------------------------- +# verify_parser_at_exec_time — unit tests +# --------------------------------------------------------------------------- + +class TestVerifyParserAtExecTime: + def test_matching_checksum_returns_true(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + parser_src = "def parse(output): return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + + mgr = _make_manager(tmp_path) + # Load the checksum from the written metadata + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is True + + def test_tampered_parser_is_rejected(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + parser_src = "def parse(output): return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + + # Record checksum with original parser + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + # Tamper with parser.py after checksum was recorded + (plugin_dir / "parser.py").write_text( + "import os; os.system('id')\ndef parse(output): return {'findings': []}\n", + encoding="utf-8", + ) + + mgr = _make_manager(tmp_path) + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is False + + def test_no_checksum_allowed_when_enforcement_off(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + plugin_dir, _ = _write_plugin(tmp_path, parser_src="", include_checksum=False) + plugin = _minimal_plugin_meta(checksum=None) + + mgr = _make_manager(tmp_path) + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is True + + def test_no_checksum_blocked_when_enforcement_on(self, tmp_path, monkeypatch): + monkeypatch.setattr(settings, "enforce_plugin_signatures", True) + plugin_dir, _ = _write_plugin(tmp_path, parser_src="", include_checksum=False) + plugin = _minimal_plugin_meta(checksum=None) + + mgr = _make_manager(tmp_path) + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is False + + def test_digest_compute_failure_returns_false(self, tmp_path, monkeypatch): + """If the digest computation raises (e.g. permissions), reject execution.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + plugin_dir, _ = _write_plugin(tmp_path, parser_src="", include_checksum=True) + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata.get("checksum", "abc")) + + mgr = _make_manager(tmp_path) + with patch.object( + PluginManager, "compute_plugin_digest", side_effect=OSError("permission denied") + ): + assert mgr.verify_parser_at_exec_time(plugin, plugin_dir) is False + + +# --------------------------------------------------------------------------- +# Executor integration — exec_module is NOT called when check fails +# --------------------------------------------------------------------------- + +class TestExecutorParserGate: + def test_exec_module_skipped_when_integrity_fails(self, tmp_path, monkeypatch): + """When verify_parser_at_exec_time returns False the parser must not run.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + + parser_src = "def parse(output): return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + + # Tamper with parser.py so the digest no longer matches + (plugin_dir / "parser.py").write_text( + "import sys; sys.exit(99)\ndef parse(output): return {}\n", + encoding="utf-8", + ) + + from backend.secuscan.models import PluginMetadata + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + mgr = _make_manager(tmp_path) + mgr.plugins_dir = tmp_path + mgr.plugins[plugin.id] = plugin + + exec_called = [] + + original_exec = __builtins__.__class__.__dict__ # not used + import importlib.util as ilu + + original_exec_module = None + + class _CapturingLoader: + def exec_module(self, module): + exec_called.append(True) + + with patch("importlib.util.spec_from_file_location") as mock_spec: + mock_loader = MagicMock() + mock_loader.exec_module = MagicMock(side_effect=lambda m: exec_called.append(True)) + mock_spec_obj = MagicMock() + mock_spec_obj.loader = mock_loader + mock_spec.return_value = mock_spec_obj + + with patch("importlib.util.module_from_spec", return_value=MagicMock()): + # Import executor lazily to avoid circular-import issues at module level + from backend.secuscan import executor as executor_module + + plugin_obj = plugin + # Directly call _parse_results, patching plugin_manager + exec_instance = executor_module.TaskExecutor.__new__(executor_module.TaskExecutor) + + with patch( + "backend.secuscan.executor.get_plugin_manager", return_value=mgr + ): + result = exec_instance._parse_results(plugin_obj, "raw output") + + assert len(exec_called) == 0, "exec_module must not be called when integrity check fails" + + def test_exec_module_called_when_integrity_passes(self, tmp_path, monkeypatch): + """When verify_parser_at_exec_time returns True, exec_module must run.""" + monkeypatch.setattr(settings, "enforce_plugin_signatures", False) + + parser_src = "def parse(output):\n return {'findings': []}\n" + plugin_dir, _ = _write_plugin(tmp_path, parser_src=parser_src, include_checksum=True) + metadata = json.loads((plugin_dir / "metadata.json").read_text()) + plugin = _minimal_plugin_meta(checksum=metadata["checksum"]) + + mgr = _make_manager(tmp_path) + mgr.plugins_dir = tmp_path + mgr.plugins[plugin.id] = plugin + + exec_called = [] + + def _fake_exec(module): + exec_called.append(True) + module.parse = lambda output: {"findings": []} + + with patch("importlib.util.spec_from_file_location") as mock_spec: + mock_loader = MagicMock() + mock_loader.exec_module = MagicMock(side_effect=_fake_exec) + mock_spec_obj = MagicMock() + mock_spec_obj.loader = mock_loader + mock_spec.return_value = mock_spec_obj + + fake_module = MagicMock() + fake_module.parse = lambda output: {"findings": []} + + with patch("importlib.util.module_from_spec", return_value=fake_module): + from backend.secuscan import executor as executor_module + exec_instance = executor_module.TaskExecutor.__new__(executor_module.TaskExecutor) + + with patch( + "backend.secuscan.executor.get_plugin_manager", return_value=mgr + ): + result = exec_instance._parse_results(plugin, "raw output") + + assert len(exec_called) == 1, "exec_module must be called once when integrity check passes" From a5df20177227cb2c2fa50850b9fb7f8da26079c3 Mon Sep 17 00:00:00 2001 From: Vexx Date: Sun, 24 May 2026 02:34:58 +0530 Subject: [PATCH 2/2] style: strip trailing whitespace from plugins.py and executor.py --- backend/secuscan/executor.py | 52 ++++++++++++++++++------------------ backend/secuscan/plugins.py | 2 +- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 43b97bdb..48d7f03f 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -74,7 +74,7 @@ async def _broadcast(self, task_id: str, event_type: str, data: Any): event = {"type": event_type, "data": data} for q in self._listeners[task_id]: await q.put(event) - + async def create_task( self, plugin_id: str, @@ -84,29 +84,29 @@ async def create_task( ) -> str: """ Create a new scan task. - + Args: plugin_id: Plugin identifier inputs: User input values preset: Optional preset name consent_granted: Whether user granted consent - + Returns: Task ID """ task_id = str(uuid.uuid4()) plugin_manager = get_plugin_manager() plugin = plugin_manager.get_plugin(plugin_id) - + if not plugin: raise ValueError(f"Plugin not found: {plugin_id}") - + # Apply preset if provided if preset and preset in plugin.presets: preset_values = plugin.presets[preset] # Merge preset with user inputs (user inputs take precedence) inputs = {**preset_values, **inputs} - + # Store task in database db = await get_db() await db.execute( @@ -128,7 +128,7 @@ async def create_task( inputs.get("safe_mode", True) ) ) - + # Log audit event await db.log_audit( "task_created", @@ -137,9 +137,9 @@ async def create_task( task_id=task_id, plugin_id=plugin_id ) - + return task_id - + async def mark_task_failed(self, task_id: str, reason: str) -> None: """ Mark a task as failed without running it. @@ -178,7 +178,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None: async def execute_task(self, task_id: str): """ Execute a task asynchronously. - + Args: task_id: Task identifier """ @@ -211,18 +211,18 @@ async def execute_task(self, task_id: str): if plugin_id in MODULAR_SCANNERS: scanner_class = MODULAR_SCANNERS[plugin_id] scanner = scanner_class(task_id, db) - + logger.info(f"Executing modular scanner {plugin_id} for task {task_id}") await self._broadcast(task_id, "status", TaskStatus.RUNNING.value) - + start_time = time.time() # Run the scanner result = await scanner.run(target, inputs) duration = time.time() - start_time - + # Update task with results final_status = TaskStatus.COMPLETED.value if result.get("status") != "failed" else TaskStatus.FAILED.value - + await db.execute( """ UPDATE tasks SET @@ -262,7 +262,7 @@ async def execute_task(self, task_id: str): raise ValueError(f"Plugin not found: {plugin_id}") # Pending records for assets removed - + command = plugin_manager.build_command(plugin_id, inputs) if not command: @@ -425,7 +425,7 @@ async def execute_task(self, task_id: str): # release the concurrency slot regardless of how the task ended. self.running_tasks.pop(task_id, None) await concurrent_limiter.release(task_id) - + async def _execute_command( self, command: list, @@ -456,7 +456,7 @@ async def read_stream(): stdout = process.stdout if stdout is None: return - + while not stdout.at_eof(): line = await stdout.readline() if line: @@ -583,13 +583,13 @@ async def cancel_task(self, task_id: str) -> bool: ) return True - + async def get_task_status(self, task_id: str) -> Optional[Dict]: """Get task status and progress""" db = await get_db() task_row = await db.fetchone( """ - SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at, + SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at, duration_seconds, exit_code, error_message, preset, inputs_json FROM tasks WHERE id = ? """, @@ -632,7 +632,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: """Persist derived findings and report records into SQLite.""" parsed = self._parse_results(plugin, output) findings_data = parsed.get("findings", []) - + # Update task with structured results await db.execute( "UPDATE tasks SET structured_json = ? WHERE id = ?", @@ -692,7 +692,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]): """Persist modular scanner results into findings, and reports.""" findings_data = result.get("findings", []) - + # Insert findings for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") @@ -748,12 +748,12 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: """Route to appropriate parser based on plugin metadata.""" parser_type = plugin.output.get("parser") parser_input = self._resolve_parser_input(plugin, output) - + # 1. Check for custom parser.py in plugin directory (Recommended) plugin_manager = get_plugin_manager() plugin_dir = plugin_manager.plugins_dir / plugin.id parser_path = plugin_dir / "parser.py" - + if parser_path.exists(): if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir): logger.error( @@ -783,7 +783,7 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: return self._normalize_parsed_result(plugin, parser_input, self._parse_nmap_output(parser_input)) elif parser_type == "builtin_http": return self._normalize_parsed_result(plugin, parser_input, self._parse_http_output(parser_input)) - + return self._normalize_parsed_result(plugin, parser_input, {"findings": [], "raw": parser_input}) def _resolve_parser_input(self, plugin, output: str) -> str: @@ -942,7 +942,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: findings = [] ports = [] services = [] - + # Regex for open ports: 80/tcp open http port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)") for match in port_pattern.finditer(output): @@ -958,7 +958,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: "remediation": "Close unnecessary ports and use a firewall to restrict access.", "metadata": {"port": port_str, "protocol": proto, "service": service} }) - + return { "open_ports": sorted(list(set(ports))), "services": sorted(list(set(services))), diff --git a/backend/secuscan/plugins.py b/backend/secuscan/plugins.py index af1ec139..45285a3d 100644 --- a/backend/secuscan/plugins.py +++ b/backend/secuscan/plugins.py @@ -172,7 +172,7 @@ def compute_plugin_digest(metadata_file: Path, parser_file: Path) -> str: parser_digest = hashlib.sha256(parser_bytes_normalized).hexdigest() return hashlib.sha256(f"{metadata_digest}:{parser_digest}".encode("utf-8")).hexdigest() - + def verify_parser_at_exec_time( self, plugin: PluginMetadata, plugin_dir: Path ) -> bool: