Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion backend/secuscan/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import hashlib
import hmac

from .models import PluginMetadata
from .models import PluginMetadata, PluginFieldType
from .config import settings

# Port specifications: digits, commas, and hyphens only (e.g. "22,80,443" or "1-1000")
_PORT_SPEC_PATTERN = re.compile(r"^[\d,\-]+$")

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -317,6 +320,83 @@ def _normalize_inputs(self, plugin: PluginMetadata, inputs: Dict[str, Any]) -> D
normalized["wordlist"] = self._resolve_wordlist_path(wordlist_value.strip())
return normalized

def _reject_injected_args(self, field_id: str, value: str) -> None:
"""Raise ValueError if value looks like a flag injection attempt.

Port fields are exempt from the leading-dash check but must match the
numeric port-specification grammar. All other string fields must not
begin with a '-' character.
"""
if field_id in ("ports", "port"):
if value and not _PORT_SPEC_PATTERN.match(value):
raise ValueError(
f"Invalid port specification {value!r}: "
"only digits, commas, and hyphens are permitted"
)
return
if value.lstrip().startswith("-"):
raise ValueError(
f"Field '{field_id}' value must not begin with '-': {value!r}"
)

def _validate_inputs_against_schema(
self, plugin: PluginMetadata, inputs: Dict[str, Any]
) -> None:
"""Validate caller-supplied inputs against the plugin's declared field schema.

Raises ValueError with a descriptive message for the first violation found.
"""
field_map = {f.id: f for f in plugin.fields}

for field_id, raw_value in inputs.items():
field = field_map.get(field_id)
if field is None:
continue

# Skip None / empty values — defaults will be applied later by _with_field_defaults
if raw_value is None or raw_value == "":
continue

if field.type == PluginFieldType.INTEGER:
try:
int(raw_value)
except (TypeError, ValueError):
raise ValueError(
f"Field '{field_id}' expects an integer; got {raw_value!r}"
)
continue

if field.type == PluginFieldType.BOOLEAN:
if isinstance(raw_value, bool):
continue
if isinstance(raw_value, str) and raw_value.lower() in ("true", "false", "1", "0"):
continue
raise ValueError(
f"Field '{field_id}' expects a boolean; got {raw_value!r}"
)

if field.type == PluginFieldType.SELECT:
allowed = [opt.get("value") for opt in (field.options or [])]
if raw_value not in allowed:
raise ValueError(
f"Field '{field_id}' value {raw_value!r} is not in allowed "
f"values {allowed}"
)
continue

if field.type in (PluginFieldType.STRING, PluginFieldType.TEXT):
value_str = str(raw_value)

# Pattern validation from field metadata
validation = field.validation or {}
pattern = validation.get("pattern")
if pattern and not re.match(pattern, value_str):
msg = validation.get("message", f"Value does not match pattern {pattern!r}")
raise ValueError(f"Field '{field_id}': {msg}")

# Reject argv-level flag injection
self._reject_injected_args(field_id, value_str)

def build_command(self, plugin_id: str, inputs: Dict) -> Optional[List[str]]:
"""
Build command from plugin template and user inputs.
Expand All @@ -332,6 +412,8 @@ def build_command(self, plugin_id: str, inputs: Dict) -> Optional[List[str]]:
if not plugin:
return None

# Validate before normalisation so SELECT checks run against raw user values
self._validate_inputs_against_schema(plugin, inputs)
inputs = self._normalize_inputs(plugin, inputs)
command = []

Expand Down
127 changes: 79 additions & 48 deletions backend/secuscan/scanners/port_scanner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
import json
import re
from typing import Dict, Any, List
from typing import Dict, Any, List, Optional, Tuple
from .base import BaseScanner
from ..plugins import get_plugin_manager
from ..config import settings
from datetime import datetime


class PortScanner(BaseScanner):
"""
Expand All @@ -21,50 +19,82 @@ def name(self) -> str:
def category(self) -> str:
return "Network Security"

async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
# ------------------------------------------------------------------
# Input normalisation helpers
# ------------------------------------------------------------------

@staticmethod
def _resolve_scan_type(raw: Any) -> str:
"""Map caller-supplied scan_type to the nmap plugin's SELECT value.

The plugin field 'scan_type' accepts only "S" | "T" | "U".
Callers may pass the raw letter, "-sX", or "sX" forms.
"""
Runs Nmap scan and parses output into structured findings.
_VALID = {"S", "T", "U"}
if not raw:
return "T"
value = str(raw).strip().upper()
# Already a bare valid letter
if value in _VALID:
return value
# Strip a leading "-S" or "S" prefix (e.g. "-sT" → "T", "sS" → "S")
stripped = re.sub(r"^-?S", "", value)
letter = stripped[0] if stripped else ""
return letter if letter in _VALID else "T"

@staticmethod
def _resolve_ports(raw: Any) -> str:
"""Map shorthand port specs to a clean numeric range string accepted by the plugin.

Returns:
Empty string → use plugin default (top-100 via command template)
Numeric range → passed through as-is
"""
if not raw or raw in ("", "top100"):
return ""
if raw == "top1000":
return "1-1000"
Comment on lines +55 to +56
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve top1000 semantics when normalizing ports

Mapping "top1000" to "1-1000" changes scan behavior from “top 1000 most common ports” to “first 1000 numeric ports,” which can miss frequently exposed services above 1000 and produce materially different findings for existing callers that use this shortcut. This should preserve the original top-ports behavior (e.g., via preset/flag handling) rather than narrowing the scan range.

Useful? React with 👍 / 👎.

if raw == "all":
return "1-65535"
# If it looks like a numeric spec (digits, commas, hyphens), pass through
if re.match(r"^[\d,\-]+$", str(raw)):
return str(raw)
return ""

async def run(self, target: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Runs Nmap scan and parses output into structured findings."""
self.update_progress(0.1)

# Prepare inputs for the Nmap plugin
# Map PortScanner inputs to Nmap plugin fields

plugin_inputs = {
"target": target,
"scan_type": inputs.get("scan_type", "-sV"),
"ports": inputs.get("ports", "top100"),
"speed": inputs.get("speed", "T4"),
"safe_mode": inputs.get("safe_mode", True)
"scan_type": self._resolve_scan_type(inputs.get("scan_type", "T")),
"ports": self._resolve_ports(inputs.get("ports", "")),
"service_detection": bool(inputs.get("service_detection", True)),
"os_detection": bool(inputs.get("os_detection", False)),
"safe_mode": bool(inputs.get("safe_mode", True)),
Comment on lines +72 to +74
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Parse boolean strings explicitly in scanner inputs

Using bool(...) here inverts common string inputs because any non-empty string is truthy, so values like "false" will still enable service_detection, os_detection, and safe_mode. Since task inputs are passed through as arbitrary JSON, clients that send booleans as strings will get incorrect scan flags and behavior. These fields need explicit boolean parsing (e.g., recognize "true"/"false", "1"/"0") instead of Python truthiness coercion.

Useful? React with 👍 / 👎.

}

# Handle port shortcuts
if plugin_inputs["ports"] == "top100":
plugin_inputs["ports"] = "--top-ports 100"
elif plugin_inputs["ports"] == "top1000":
plugin_inputs["ports"] = "--top-ports 1000"
elif plugin_inputs["ports"] == "all":
plugin_inputs["ports"] = "-p-"

plugin_manager = get_plugin_manager()
command = plugin_manager.build_command("nmap", plugin_inputs)

if not command:
raise ValueError("Failed to build nmap command")

# Execute
self.update_progress(0.2)
output, exit_code = await self._execute_command(command)
self.update_progress(0.8)

# Parse

findings = self._parse_nmap_output(output, target)

self.update_progress(1.0)
return {
"findings": findings,
"summary": [f"Scanned {target} for open ports.", f"Discovered {len(findings)} open ports."],
"summary": [
f"Scanned {target} for open ports.",
f"Discovered {len(findings)} open ports.",
],
"open_ports": [f["metadata"]["port"] for f in findings],
"status": "completed" if exit_code == 0 else "failed"
"status": "completed" if exit_code == 0 else "failed",
}

async def _execute_command(self, command: List[str]) -> tuple:
Expand All @@ -73,11 +103,11 @@ async def _execute_command(self, command: List[str]) -> tuple:
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT
stderr=asyncio.subprocess.STDOUT,
)
try:
stdout, _ = await process.communicate()
return stdout.decode('utf-8', errors='replace'), process.returncode
return stdout.decode("utf-8", errors="replace"), process.returncode
except asyncio.CancelledError:
try:
process.kill()
Expand All @@ -88,30 +118,31 @@ async def _execute_command(self, command: List[str]) -> tuple:

def _parse_nmap_output(self, output: str, target: str) -> List[Dict[str, Any]]:
findings = []
# Regex for open ports: 80/tcp open http
port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)\s*(.*)")

for match in port_pattern.finditer(output):
port_str, proto, service, version = match.groups()

title = f"Open Port: {port_str}/{proto} ({service})"
description = f"Port {port_str} is open and running {service} service."
if version.strip():
description += f" Version detected: {version.strip()}"

findings.append({
"title": title,
"category": "Network Service",
"severity": self.normalize_severity("low"),
"target": target,
"description": description,
"remediation": "Close unnecessary ports and use a firewall to restrict access.",
"metadata": {
"port": port_str,
"protocol": proto,
"service": service,
"version": version.strip()

findings.append(
{
"title": title,
"category": "Network Service",
"severity": self.normalize_severity("low"),
"target": target,
"description": description,
"remediation": "Close unnecessary ports and use a firewall to restrict access.",
"metadata": {
"port": port_str,
"protocol": proto,
"service": service,
"version": version.strip(),
},
}
})
)

return findings
Loading
Loading