diff --git a/backend/secuscan/database.py b/backend/secuscan/database.py index 8ff8775e..1d1eec59 100644 --- a/backend/secuscan/database.py +++ b/backend/secuscan/database.py @@ -215,6 +215,21 @@ async def _create_schema(self): except Exception as e: print(f"Failed to add 'proof' to findings: {e}") + risk_cols = { + "exploitability": "REAL", + "confidence": "REAL", + "asset_exposure": "TEXT", + "risk_score": "REAL", + "risk_factors_json": "TEXT NOT NULL DEFAULT '[]'", + } + for col_name, col_type in risk_cols.items(): + if col_name not in existing_finding_cols: + try: + await self.execute(f"ALTER TABLE findings ADD COLUMN {col_name} {col_type}") + print(f"Added missing column {col_name} to findings table.") + except Exception as e: + print(f"Failed to add column {col_name}: {e}") + async def execute(self, query: str, params: tuple = ()): """Execute a write query.""" await self.connection.execute(query, params) diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 3b45fbbe..53820896 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -21,6 +21,7 @@ from .models import TaskStatus from .ratelimit import concurrent_limiter from .ratelimit import concurrent_limiter +from .risk_scoring import compute_risk_score, compute_risk_factors # Modular Scanners from .scanners.port_scanner import PortScanner @@ -643,13 +644,36 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") finding_id = f"finding:{task_id}:{u_id[:8]}" + + exploitability = finding.get("exploitability") + confidence = finding.get("confidence") + asset_exposure = finding.get("asset_exposure") + risk_score = compute_risk_score( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + ) + risk_factors = compute_risk_factors( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + risk_score=risk_score, + ) + await db.execute( """ INSERT INTO findings ( id, task_id, plugin_id, title, category, severity, target, description, remediation, proof, cvss, cve, - metadata_json, discovered_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now'))) + metadata_json, discovered_at, + exploitability, confidence, asset_exposure, + risk_score, risk_factors_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now')), + ?, ?, ?, ?, ?) """, ( finding_id, @@ -665,6 +689,11 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: finding.get("cvss"), finding.get("cve"), json.dumps(finding.get("metadata", {})), + exploitability, + confidence, + asset_exposure, + risk_score, + json.dumps(risk_factors), ), ) @@ -697,13 +726,36 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") finding_id = f"finding:{task_id}:{u_id[:8]}" + + exploitability = finding.get("exploitability") + confidence = finding.get("confidence") + asset_exposure = finding.get("asset_exposure") + risk_score = compute_risk_score( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + ) + risk_factors = compute_risk_factors( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + risk_score=risk_score, + ) + await db.execute( """ INSERT INTO findings ( id, task_id, plugin_id, title, category, severity, target, description, remediation, proof, cvss, cve, - metadata_json, discovered_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now'))) + metadata_json, discovered_at, + exploitability, confidence, asset_exposure, + risk_score, risk_factors_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now')), + ?, ?, ?, ?, ?) """, ( finding_id, @@ -719,6 +771,11 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann finding.get("cvss"), finding.get("cve"), json.dumps(finding.get("metadata", {})), + exploitability, + confidence, + asset_exposure, + risk_score, + json.dumps(risk_factors), ) ) diff --git a/backend/secuscan/models.py b/backend/secuscan/models.py index 264363e5..b5cf74fa 100644 --- a/backend/secuscan/models.py +++ b/backend/secuscan/models.py @@ -115,6 +115,11 @@ class Finding(BaseModel): proof: Optional[str] = None discovered_at: Optional[datetime] = None metadata: Dict[str, Any] = Field(default_factory=dict) + exploitability: Optional[float] = None + confidence: Optional[float] = None + asset_exposure: Optional[str] = None + risk_score: Optional[float] = None + risk_factors: List[Dict[str, Any]] = Field(default_factory=list) class TaskResult(BaseModel): diff --git a/backend/secuscan/risk_scoring.py b/backend/secuscan/risk_scoring.py new file mode 100644 index 00000000..4166a1a9 --- /dev/null +++ b/backend/secuscan/risk_scoring.py @@ -0,0 +1,222 @@ +""" +Risk scoring model with explainable finding prioritization. + +Computes a composite risk score (0–10) from five factors: + - severity (30%) + - exploitability (25%) + - asset exposure (20%) + - recency (15%) + - confidence (10%) + +Each factor also produces a human-readable explanation entry. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +# --------------------------------------------------------------------------- +# Numeric maps +# --------------------------------------------------------------------------- + +SEVERITY_MAP: Dict[str, float] = { + "critical": 10.0, + "high": 7.5, + "medium": 5.0, + "low": 2.5, + "info": 0.5, +} + +ASSET_EXPOSURE_MAP: Dict[str, float] = { + "critical": 10.0, + "high": 7.5, + "medium": 5.0, + "low": 2.5, +} + +# Weights used in the composite score (must sum to 1.0) +WEIGHTS = { + "severity": 0.30, + "exploitability": 0.25, + "asset_exposure": 0.20, + "recency": 0.15, + "confidence": 0.10, +} + + +def _severity_score(severity: str) -> float: + """Map severity label to a numeric 0–10 value.""" + return SEVERITY_MAP.get(severity.lower(), 0.5) + + +def _recency_score(discovered_at: Optional[datetime]) -> float: + """Score recency (10 = today, down to 0 for very old).""" + if discovered_at is None: + return 5.0 + now = datetime.now(timezone.utc) + if discovered_at.tzinfo is None: + from datetime import timedelta + discovered = discovered_at.replace(tzinfo=timezone.utc) + else: + discovered = discovered_at + days = (now - discovered).days + if days < 7: + return 10.0 + if days < 30: + return 7.5 + if days < 90: + return 5.0 + if days < 365: + return 2.5 + return 1.0 + + +def _confidence_score(confidence: Optional[float]) -> float: + """Map confidence 0–1 to 0–10. Default 0.5 → 5.0.""" + if confidence is None: + return 5.0 + return max(0.0, min(10.0, confidence * 10.0)) + + +def _clamp(value: float, lo: float = 0.0, hi: float = 10.0) -> float: + return max(lo, min(hi, value)) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def compute_risk_score( + severity: str, + exploitability: Optional[float] = None, + asset_exposure: Optional[str] = None, + discovered_at: Optional[datetime] = None, + confidence: Optional[float] = None, +) -> float: + """ + Compute a weighted composite risk score in [0, 10]. + + Parameters + ---------- + severity : str + One of "critical", "high", "medium", "low", "info". + exploitability : float or None + 0–10. Defaults to 5.0 when None. + asset_exposure : str or None + One of "critical", "high", "medium", "low". Defaults to "medium". + discovered_at : datetime or None + When the finding was discovered. Defaults to 90-day-old equivalent. + confidence : float or None + 0–1. Defaults to 0.5 when None. + """ + sv = _severity_score(severity) + ev = _clamp(exploitability if exploitability is not None else 5.0) + av = ASSET_EXPOSURE_MAP.get(asset_exposure.lower() if asset_exposure else None, 5.0) + rv = _recency_score(discovered_at) + cv = _confidence_score(confidence) + + score = ( + sv * WEIGHTS["severity"] + + ev * WEIGHTS["exploitability"] + + av * WEIGHTS["asset_exposure"] + + rv * WEIGHTS["recency"] + + cv * WEIGHTS["confidence"] + ) + return round(_clamp(score), 1) + + +def compute_risk_factors( + severity: str, + exploitability: Optional[float] = None, + asset_exposure: Optional[str] = None, + discovered_at: Optional[datetime] = None, + confidence: Optional[float] = None, + risk_score: Optional[float] = None, +) -> List[Dict[str, Any]]: + """ + Return a list of explainable factor dicts, each with: + - factor: short key name + - label: human-readable label + - value: raw value + - score: numeric sub-score (0–10) + - weight: contribution weight + - contribution: weighted contribution to total + - detail: short explanation sentence + """ + if risk_score is None: + risk_score = compute_risk_score(severity, exploitability, asset_exposure, discovered_at, confidence) + + sv = _severity_score(severity) + ev = _clamp(exploitability if exploitability is not None else 5.0) + av = ASSET_EXPOSURE_MAP.get(asset_exposure.lower() if asset_exposure else None, 5.0) + rv = _recency_score(discovered_at) + cv = _confidence_score(confidence) + + factors = [ + { + "factor": "severity", + "label": "Severity", + "value": severity, + "score": round(sv, 1), + "weight": WEIGHTS["severity"], + "contribution": round(sv * WEIGHTS["severity"], 2), + "detail": f"Severity is {severity} ({sv:.1f}/10)", + }, + { + "factor": "exploitability", + "label": "Exploitability", + "value": exploitability if exploitability is not None else 5.0, + "score": round(ev, 1), + "weight": WEIGHTS["exploitability"], + "contribution": round(ev * WEIGHTS["exploitability"], 2), + "detail": f"Exploitability score is {ev:.1f}/10", + }, + { + "factor": "asset_exposure", + "label": "Asset Exposure", + "value": asset_exposure or "medium", + "score": round(av, 1), + "weight": WEIGHTS["asset_exposure"], + "contribution": round(av * WEIGHTS["asset_exposure"], 2), + "detail": f"Asset exposure is {asset_exposure or 'medium'} ({av:.1f}/10)", + }, + { + "factor": "recency", + "label": "Recency", + "value": f"{discovered_at.isoformat() if discovered_at else 'unknown'}", + "score": round(rv, 1), + "weight": WEIGHTS["recency"], + "contribution": round(rv * WEIGHTS["recency"], 2), + "detail": _recency_detail(discovered_at, rv), + }, + { + "factor": "confidence", + "label": "Confidence", + "value": confidence if confidence is not None else 0.5, + "score": round(cv, 1), + "weight": WEIGHTS["confidence"], + "contribution": round(cv * WEIGHTS["confidence"], 2), + "detail": f"Confidence is {(confidence * 100 if confidence else 50):.0f}%", + }, + ] + return factors + + +def _recency_detail(discovered_at: Optional[datetime], rv: float) -> str: + if discovered_at is None: + return "No discovery date — assumed moderate recency" + from datetime import timezone + now = datetime.now(timezone.utc) + if discovered_at.tzinfo is None: + from datetime import timedelta + d = discovered_at.replace(tzinfo=timezone.utc) + else: + d = discovered_at + days = (now - d).days + if days < 0: + return "Discovered in the future — treated as very recent" + if days == 0: + return "Discovered today — maximum recency score" + if days == 1: + return f"Discovered {days} day ago — recency score {rv:.1f}/10" + return f"Discovered {days} days ago — recency score {rv:.1f}/10" diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index f1d53063..a531142e 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -618,6 +618,12 @@ async def build(): recent_findings: List[Dict] = findings[:5] + risk_scores = [ + f.get("risk_score") for f in findings + if isinstance(f.get("risk_score"), (int, float)) + ] + avg_risk_score = round(sum(risk_scores) / len(risk_scores), 1) if risk_scores else None + return { "total_findings": len(findings), "critical_findings": critical_findings, @@ -625,6 +631,7 @@ async def build(): "medium_findings": medium_findings, "low_findings": low_findings, "info_findings": info_findings, + "avg_risk_score": avg_risk_score, "last_scan_time": findings[0].get("discovered_at") if findings else None, "recent_findings": recent_findings, "scan_activity": { @@ -656,7 +663,11 @@ async def get_findings(): async def build(): db = await get_db() rows = await db.fetchall("SELECT * FROM findings ORDER BY discovered_at DESC") - return {"findings": parse_json_fields(rows, ["metadata_json"])} + findings = parse_json_fields(rows, ["metadata_json", "risk_factors_json"]) + for f in findings: + if "risk_factors_json" in f: + f["risk_factors"] = f.pop("risk_factors_json") + return {"findings": findings} return await get_or_set_cached("findings:list", build) @@ -1056,6 +1067,13 @@ async def get_finding_details(finding_id: str): except json.JSONDecodeError: metadata = {} + risk_factors = [] + if finding_row.get("risk_factors_json"): + try: + risk_factors = json.loads(finding_row["risk_factors_json"]) + except (json.JSONDecodeError, TypeError): + risk_factors = [] + return { "id": finding_row["id"], "task_id": finding_row["task_id"], @@ -1071,7 +1089,12 @@ async def get_finding_details(finding_id: str): "cvss": finding_row["cvss"], "cve": finding_row["cve"], "discovered_at": finding_row["discovered_at"], - "metadata": metadata + "metadata": metadata, + "exploitability": finding_row.get("exploitability"), + "confidence": finding_row.get("confidence"), + "asset_exposure": finding_row.get("asset_exposure"), + "risk_score": finding_row.get("risk_score"), + "risk_factors": risk_factors, } diff --git a/frontend/src/pages/Findings.tsx b/frontend/src/pages/Findings.tsx index cfc78824..fb9f849b 100644 --- a/frontend/src/pages/Findings.tsx +++ b/frontend/src/pages/Findings.tsx @@ -2,6 +2,16 @@ import React, { useEffect, useMemo, useState } from 'react' import { motion } from 'framer-motion' import { getFindings } from '../api' import { formatLocaleDate, parseDateSafe, getCurrentTimeZone } from '../utils/date' +type RiskFactor = { + factor: string + label: string + value: string | number + score: number + weight: number + contribution: number + detail: string +} + type Finding = { id: string severity: string @@ -14,6 +24,11 @@ type Finding = { cvss?: number cve?: string plugin_id?: string + risk_score?: number + risk_factors?: RiskFactor[] + exploitability?: number + confidence?: number + asset_exposure?: string } type FindingStatus = 'new' | 'reviewed' | 'suppressed' @@ -667,12 +682,47 @@ export default function Findings() {
Severity Score
+CVSS
{typeof selectedFinding.cvss === 'number' ? selectedFinding.cvss.toFixed(1) : 'N/A'}
Risk Score
+= 7 ? 'text-rag-red' : + selectedFinding.risk_score >= 4 ? 'text-rag-amber' : 'text-rag-blue' + }`}> + {selectedFinding.risk_score.toFixed(1)} +
+= 7 ? 'text-rag-red' : finding.risk_score >= 4 ? 'text-rag-amber' : 'text-rag-blue'}`}> + {finding.risk_score.toFixed(1)} +
+