-
Notifications
You must be signed in to change notification settings - Fork 92
fix(plugins): add argv-level injection validation to build_command #280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,9 @@ | ||
| import asyncio | ||
| import json | ||
| import re | ||
| from typing import Dict, Any, List | ||
| from typing import Dict, Any, List, Optional, Tuple | ||
| from .base import BaseScanner | ||
| from ..plugins import get_plugin_manager | ||
| from ..config import settings | ||
| from datetime import datetime | ||
|
|
||
|
|
||
| class PortScanner(BaseScanner): | ||
| """ | ||
|
|
@@ -21,50 +19,82 @@ def name(self) -> str: | |
| def category(self) -> str: | ||
| return "Network Security" | ||
|
|
||
| async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: | ||
| # ------------------------------------------------------------------ | ||
| # Input normalisation helpers | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| @staticmethod | ||
| def _resolve_scan_type(raw: Any) -> str: | ||
| """Map caller-supplied scan_type to the nmap plugin's SELECT value. | ||
|
|
||
| The plugin field 'scan_type' accepts only "S" | "T" | "U". | ||
| Callers may pass the raw letter, "-sX", or "sX" forms. | ||
| """ | ||
| Runs Nmap scan and parses output into structured findings. | ||
| _VALID = {"S", "T", "U"} | ||
| if not raw: | ||
| return "T" | ||
| value = str(raw).strip().upper() | ||
| # Already a bare valid letter | ||
| if value in _VALID: | ||
| return value | ||
| # Strip a leading "-S" or "S" prefix (e.g. "-sT" → "T", "sS" → "S") | ||
| stripped = re.sub(r"^-?S", "", value) | ||
| letter = stripped[0] if stripped else "" | ||
| return letter if letter in _VALID else "T" | ||
|
|
||
| @staticmethod | ||
| def _resolve_ports(raw: Any) -> str: | ||
| """Map shorthand port specs to a clean numeric range string accepted by the plugin. | ||
|
|
||
| Returns: | ||
| Empty string → use plugin default (top-100 via command template) | ||
| Numeric range → passed through as-is | ||
| """ | ||
| if not raw or raw in ("", "top100"): | ||
| return "" | ||
| if raw == "top1000": | ||
| return "1-1000" | ||
| if raw == "all": | ||
| return "1-65535" | ||
| # If it looks like a numeric spec (digits, commas, hyphens), pass through | ||
| if re.match(r"^[\d,\-]+$", str(raw)): | ||
| return str(raw) | ||
| return "" | ||
|
|
||
| async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]: | ||
| """Runs Nmap scan and parses output into structured findings.""" | ||
| self.update_progress(0.1) | ||
|
|
||
| # Prepare inputs for the Nmap plugin | ||
| # Map PortScanner inputs to Nmap plugin fields | ||
|
|
||
| plugin_inputs = { | ||
| "target": target, | ||
| "scan_type": inputs.get("scan_type", "-sV"), | ||
| "ports": inputs.get("ports", "top100"), | ||
| "speed": inputs.get("speed", "T4"), | ||
| "safe_mode": inputs.get("safe_mode", True) | ||
| "scan_type": self._resolve_scan_type(inputs.get("scan_type", "T")), | ||
| "ports": self._resolve_ports(inputs.get("ports", "")), | ||
| "service_detection": bool(inputs.get("service_detection", True)), | ||
| "os_detection": bool(inputs.get("os_detection", False)), | ||
| "safe_mode": bool(inputs.get("safe_mode", True)), | ||
|
Comment on lines
+72
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Using Useful? React with 👍 / 👎. |
||
| } | ||
|
|
||
| # Handle port shortcuts | ||
| if plugin_inputs["ports"] == "top100": | ||
| plugin_inputs["ports"] = "--top-ports 100" | ||
| elif plugin_inputs["ports"] == "top1000": | ||
| plugin_inputs["ports"] = "--top-ports 1000" | ||
| elif plugin_inputs["ports"] == "all": | ||
| plugin_inputs["ports"] = "-p-" | ||
|
|
||
| plugin_manager = get_plugin_manager() | ||
| command = plugin_manager.build_command("nmap", plugin_inputs) | ||
|
|
||
| if not command: | ||
| raise ValueError("Failed to build nmap command") | ||
|
|
||
| # Execute | ||
| self.update_progress(0.2) | ||
| output, exit_code = await self._execute_command(command) | ||
| self.update_progress(0.8) | ||
|
|
||
| # Parse | ||
|
|
||
| findings = self._parse_nmap_output(output, target) | ||
|
|
||
| self.update_progress(1.0) | ||
| return { | ||
| "findings": findings, | ||
| "summary": [f"Scanned {target} for open ports.", f"Discovered {len(findings)} open ports."], | ||
| "summary": [ | ||
| f"Scanned {target} for open ports.", | ||
| f"Discovered {len(findings)} open ports.", | ||
| ], | ||
| "open_ports": [f["metadata"]["port"] for f in findings], | ||
| "status": "completed" if exit_code == 0 else "failed" | ||
| "status": "completed" if exit_code == 0 else "failed", | ||
| } | ||
|
|
||
| async def _execute_command(self, command: List[str]) -> tuple: | ||
|
|
@@ -73,11 +103,11 @@ async def _execute_command(self, command: List[str]) -> tuple: | |
| process = await asyncio.create_subprocess_exec( | ||
| *command, | ||
| stdout=asyncio.subprocess.PIPE, | ||
| stderr=asyncio.subprocess.STDOUT | ||
| stderr=asyncio.subprocess.STDOUT, | ||
| ) | ||
| try: | ||
| stdout, _ = await process.communicate() | ||
| return stdout.decode('utf-8', errors='replace'), process.returncode | ||
| return stdout.decode("utf-8", errors="replace"), process.returncode | ||
| except asyncio.CancelledError: | ||
| try: | ||
| process.kill() | ||
|
|
@@ -88,30 +118,31 @@ async def _execute_command(self, command: List[str]) -> tuple: | |
|
|
||
| def _parse_nmap_output(self, output: str, target: str) -> List[Dict[str, Any]]: | ||
| findings = [] | ||
| # Regex for open ports: 80/tcp open http | ||
| port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)\s*(.*)") | ||
|
|
||
| for match in port_pattern.finditer(output): | ||
| port_str, proto, service, version = match.groups() | ||
|
|
||
| title = f"Open Port: {port_str}/{proto} ({service})" | ||
| description = f"Port {port_str} is open and running {service} service." | ||
| if version.strip(): | ||
| description += f" Version detected: {version.strip()}" | ||
|
|
||
| findings.append({ | ||
| "title": title, | ||
| "category": "Network Service", | ||
| "severity": self.normalize_severity("low"), | ||
| "target": target, | ||
| "description": description, | ||
| "remediation": "Close unnecessary ports and use a firewall to restrict access.", | ||
| "metadata": { | ||
| "port": port_str, | ||
| "protocol": proto, | ||
| "service": service, | ||
| "version": version.strip() | ||
|
|
||
| findings.append( | ||
| { | ||
| "title": title, | ||
| "category": "Network Service", | ||
| "severity": self.normalize_severity("low"), | ||
| "target": target, | ||
| "description": description, | ||
| "remediation": "Close unnecessary ports and use a firewall to restrict access.", | ||
| "metadata": { | ||
| "port": port_str, | ||
| "protocol": proto, | ||
| "service": service, | ||
| "version": version.strip(), | ||
| }, | ||
| } | ||
| }) | ||
| ) | ||
|
|
||
| return findings | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mapping
"top1000"to"1-1000"changes scan behavior from “top 1000 most common ports” to “first 1000 numeric ports,” which can miss frequently exposed services above 1000 and produce materially different findings for existing callers that use this shortcut. This should preserve the original top-ports behavior (e.g., via preset/flag handling) rather than narrowing the scan range.Useful? React with 👍 / 👎.