diff --git a/backend/secuscan/database.py b/backend/secuscan/database.py index 8ff8775e..1d1eec59 100644 --- a/backend/secuscan/database.py +++ b/backend/secuscan/database.py @@ -215,6 +215,21 @@ async def _create_schema(self): except Exception as e: print(f"Failed to add 'proof' to findings: {e}") + risk_cols = { + "exploitability": "REAL", + "confidence": "REAL", + "asset_exposure": "TEXT", + "risk_score": "REAL", + "risk_factors_json": "TEXT NOT NULL DEFAULT '[]'", + } + for col_name, col_type in risk_cols.items(): + if col_name not in existing_finding_cols: + try: + await self.execute(f"ALTER TABLE findings ADD COLUMN {col_name} {col_type}") + print(f"Added missing column {col_name} to findings table.") + except Exception as e: + print(f"Failed to add column {col_name}: {e}") + async def execute(self, query: str, params: tuple = ()): """Execute a write query.""" await self.connection.execute(query, params) diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 3b45fbbe..53820896 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -21,6 +21,7 @@ from .models import TaskStatus from .ratelimit import concurrent_limiter from .ratelimit import concurrent_limiter +from .risk_scoring import compute_risk_score, compute_risk_factors # Modular Scanners from .scanners.port_scanner import PortScanner @@ -643,13 +644,36 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") finding_id = f"finding:{task_id}:{u_id[:8]}" + + exploitability = finding.get("exploitability") + confidence = finding.get("confidence") + asset_exposure = finding.get("asset_exposure") + risk_score = compute_risk_score( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + ) + risk_factors = compute_risk_factors( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + risk_score=risk_score, + ) + await db.execute( """ INSERT INTO findings ( id, task_id, plugin_id, title, category, severity, target, description, remediation, proof, cvss, cve, - metadata_json, discovered_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now'))) + metadata_json, discovered_at, + exploitability, confidence, asset_exposure, + risk_score, risk_factors_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now')), + ?, ?, ?, ?, ?) """, ( finding_id, @@ -665,6 +689,11 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: finding.get("cvss"), finding.get("cve"), json.dumps(finding.get("metadata", {})), + exploitability, + confidence, + asset_exposure, + risk_score, + json.dumps(risk_factors), ), ) @@ -697,13 +726,36 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") finding_id = f"finding:{task_id}:{u_id[:8]}" + + exploitability = finding.get("exploitability") + confidence = finding.get("confidence") + asset_exposure = finding.get("asset_exposure") + risk_score = compute_risk_score( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + ) + risk_factors = compute_risk_factors( + severity=finding["severity"], + exploitability=exploitability, + asset_exposure=asset_exposure, + discovered_at=None, + confidence=confidence, + risk_score=risk_score, + ) + await db.execute( """ INSERT INTO findings ( id, task_id, plugin_id, title, category, severity, target, description, remediation, proof, cvss, cve, - metadata_json, discovered_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now'))) + metadata_json, discovered_at, + exploitability, confidence, asset_exposure, + risk_score, risk_factors_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (datetime('now')), + ?, ?, ?, ?, ?) """, ( finding_id, @@ -719,6 +771,11 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann finding.get("cvss"), finding.get("cve"), json.dumps(finding.get("metadata", {})), + exploitability, + confidence, + asset_exposure, + risk_score, + json.dumps(risk_factors), ) ) diff --git a/backend/secuscan/models.py b/backend/secuscan/models.py index 264363e5..b5cf74fa 100644 --- a/backend/secuscan/models.py +++ b/backend/secuscan/models.py @@ -115,6 +115,11 @@ class Finding(BaseModel): proof: Optional[str] = None discovered_at: Optional[datetime] = None metadata: Dict[str, Any] = Field(default_factory=dict) + exploitability: Optional[float] = None + confidence: Optional[float] = None + asset_exposure: Optional[str] = None + risk_score: Optional[float] = None + risk_factors: List[Dict[str, Any]] = Field(default_factory=list) class TaskResult(BaseModel): diff --git a/backend/secuscan/risk_scoring.py b/backend/secuscan/risk_scoring.py new file mode 100644 index 00000000..4166a1a9 --- /dev/null +++ b/backend/secuscan/risk_scoring.py @@ -0,0 +1,222 @@ +""" +Risk scoring model with explainable finding prioritization. + +Computes a composite risk score (0–10) from five factors: + - severity (30%) + - exploitability (25%) + - asset exposure (20%) + - recency (15%) + - confidence (10%) + +Each factor also produces a human-readable explanation entry. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +# --------------------------------------------------------------------------- +# Numeric maps +# --------------------------------------------------------------------------- + +SEVERITY_MAP: Dict[str, float] = { + "critical": 10.0, + "high": 7.5, + "medium": 5.0, + "low": 2.5, + "info": 0.5, +} + +ASSET_EXPOSURE_MAP: Dict[str, float] = { + "critical": 10.0, + "high": 7.5, + "medium": 5.0, + "low": 2.5, +} + +# Weights used in the composite score (must sum to 1.0) +WEIGHTS = { + "severity": 0.30, + "exploitability": 0.25, + "asset_exposure": 0.20, + "recency": 0.15, + "confidence": 0.10, +} + + +def _severity_score(severity: str) -> float: + """Map severity label to a numeric 0–10 value.""" + return SEVERITY_MAP.get(severity.lower(), 0.5) + + +def _recency_score(discovered_at: Optional[datetime]) -> float: + """Score recency (10 = today, down to 0 for very old).""" + if discovered_at is None: + return 5.0 + now = datetime.now(timezone.utc) + if discovered_at.tzinfo is None: + from datetime import timedelta + discovered = discovered_at.replace(tzinfo=timezone.utc) + else: + discovered = discovered_at + days = (now - discovered).days + if days < 7: + return 10.0 + if days < 30: + return 7.5 + if days < 90: + return 5.0 + if days < 365: + return 2.5 + return 1.0 + + +def _confidence_score(confidence: Optional[float]) -> float: + """Map confidence 0–1 to 0–10. Default 0.5 → 5.0.""" + if confidence is None: + return 5.0 + return max(0.0, min(10.0, confidence * 10.0)) + + +def _clamp(value: float, lo: float = 0.0, hi: float = 10.0) -> float: + return max(lo, min(hi, value)) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def compute_risk_score( + severity: str, + exploitability: Optional[float] = None, + asset_exposure: Optional[str] = None, + discovered_at: Optional[datetime] = None, + confidence: Optional[float] = None, +) -> float: + """ + Compute a weighted composite risk score in [0, 10]. + + Parameters + ---------- + severity : str + One of "critical", "high", "medium", "low", "info". + exploitability : float or None + 0–10. Defaults to 5.0 when None. + asset_exposure : str or None + One of "critical", "high", "medium", "low". Defaults to "medium". + discovered_at : datetime or None + When the finding was discovered. Defaults to 90-day-old equivalent. + confidence : float or None + 0–1. Defaults to 0.5 when None. + """ + sv = _severity_score(severity) + ev = _clamp(exploitability if exploitability is not None else 5.0) + av = ASSET_EXPOSURE_MAP.get(asset_exposure.lower() if asset_exposure else None, 5.0) + rv = _recency_score(discovered_at) + cv = _confidence_score(confidence) + + score = ( + sv * WEIGHTS["severity"] + + ev * WEIGHTS["exploitability"] + + av * WEIGHTS["asset_exposure"] + + rv * WEIGHTS["recency"] + + cv * WEIGHTS["confidence"] + ) + return round(_clamp(score), 1) + + +def compute_risk_factors( + severity: str, + exploitability: Optional[float] = None, + asset_exposure: Optional[str] = None, + discovered_at: Optional[datetime] = None, + confidence: Optional[float] = None, + risk_score: Optional[float] = None, +) -> List[Dict[str, Any]]: + """ + Return a list of explainable factor dicts, each with: + - factor: short key name + - label: human-readable label + - value: raw value + - score: numeric sub-score (0–10) + - weight: contribution weight + - contribution: weighted contribution to total + - detail: short explanation sentence + """ + if risk_score is None: + risk_score = compute_risk_score(severity, exploitability, asset_exposure, discovered_at, confidence) + + sv = _severity_score(severity) + ev = _clamp(exploitability if exploitability is not None else 5.0) + av = ASSET_EXPOSURE_MAP.get(asset_exposure.lower() if asset_exposure else None, 5.0) + rv = _recency_score(discovered_at) + cv = _confidence_score(confidence) + + factors = [ + { + "factor": "severity", + "label": "Severity", + "value": severity, + "score": round(sv, 1), + "weight": WEIGHTS["severity"], + "contribution": round(sv * WEIGHTS["severity"], 2), + "detail": f"Severity is {severity} ({sv:.1f}/10)", + }, + { + "factor": "exploitability", + "label": "Exploitability", + "value": exploitability if exploitability is not None else 5.0, + "score": round(ev, 1), + "weight": WEIGHTS["exploitability"], + "contribution": round(ev * WEIGHTS["exploitability"], 2), + "detail": f"Exploitability score is {ev:.1f}/10", + }, + { + "factor": "asset_exposure", + "label": "Asset Exposure", + "value": asset_exposure or "medium", + "score": round(av, 1), + "weight": WEIGHTS["asset_exposure"], + "contribution": round(av * WEIGHTS["asset_exposure"], 2), + "detail": f"Asset exposure is {asset_exposure or 'medium'} ({av:.1f}/10)", + }, + { + "factor": "recency", + "label": "Recency", + "value": f"{discovered_at.isoformat() if discovered_at else 'unknown'}", + "score": round(rv, 1), + "weight": WEIGHTS["recency"], + "contribution": round(rv * WEIGHTS["recency"], 2), + "detail": _recency_detail(discovered_at, rv), + }, + { + "factor": "confidence", + "label": "Confidence", + "value": confidence if confidence is not None else 0.5, + "score": round(cv, 1), + "weight": WEIGHTS["confidence"], + "contribution": round(cv * WEIGHTS["confidence"], 2), + "detail": f"Confidence is {(confidence * 100 if confidence else 50):.0f}%", + }, + ] + return factors + + +def _recency_detail(discovered_at: Optional[datetime], rv: float) -> str: + if discovered_at is None: + return "No discovery date — assumed moderate recency" + from datetime import timezone + now = datetime.now(timezone.utc) + if discovered_at.tzinfo is None: + from datetime import timedelta + d = discovered_at.replace(tzinfo=timezone.utc) + else: + d = discovered_at + days = (now - d).days + if days < 0: + return "Discovered in the future — treated as very recent" + if days == 0: + return "Discovered today — maximum recency score" + if days == 1: + return f"Discovered {days} day ago — recency score {rv:.1f}/10" + return f"Discovered {days} days ago — recency score {rv:.1f}/10" diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index f1d53063..a531142e 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -618,6 +618,12 @@ async def build(): recent_findings: List[Dict] = findings[:5] + risk_scores = [ + f.get("risk_score") for f in findings + if isinstance(f.get("risk_score"), (int, float)) + ] + avg_risk_score = round(sum(risk_scores) / len(risk_scores), 1) if risk_scores else None + return { "total_findings": len(findings), "critical_findings": critical_findings, @@ -625,6 +631,7 @@ async def build(): "medium_findings": medium_findings, "low_findings": low_findings, "info_findings": info_findings, + "avg_risk_score": avg_risk_score, "last_scan_time": findings[0].get("discovered_at") if findings else None, "recent_findings": recent_findings, "scan_activity": { @@ -656,7 +663,11 @@ async def get_findings(): async def build(): db = await get_db() rows = await db.fetchall("SELECT * FROM findings ORDER BY discovered_at DESC") - return {"findings": parse_json_fields(rows, ["metadata_json"])} + findings = parse_json_fields(rows, ["metadata_json", "risk_factors_json"]) + for f in findings: + if "risk_factors_json" in f: + f["risk_factors"] = f.pop("risk_factors_json") + return {"findings": findings} return await get_or_set_cached("findings:list", build) @@ -1056,6 +1067,13 @@ async def get_finding_details(finding_id: str): except json.JSONDecodeError: metadata = {} + risk_factors = [] + if finding_row.get("risk_factors_json"): + try: + risk_factors = json.loads(finding_row["risk_factors_json"]) + except (json.JSONDecodeError, TypeError): + risk_factors = [] + return { "id": finding_row["id"], "task_id": finding_row["task_id"], @@ -1071,7 +1089,12 @@ async def get_finding_details(finding_id: str): "cvss": finding_row["cvss"], "cve": finding_row["cve"], "discovered_at": finding_row["discovered_at"], - "metadata": metadata + "metadata": metadata, + "exploitability": finding_row.get("exploitability"), + "confidence": finding_row.get("confidence"), + "asset_exposure": finding_row.get("asset_exposure"), + "risk_score": finding_row.get("risk_score"), + "risk_factors": risk_factors, } diff --git a/frontend/src/pages/Findings.tsx b/frontend/src/pages/Findings.tsx index cfc78824..fb9f849b 100644 --- a/frontend/src/pages/Findings.tsx +++ b/frontend/src/pages/Findings.tsx @@ -2,6 +2,16 @@ import React, { useEffect, useMemo, useState } from 'react' import { motion } from 'framer-motion' import { getFindings } from '../api' import { formatLocaleDate, parseDateSafe, getCurrentTimeZone } from '../utils/date' +type RiskFactor = { + factor: string + label: string + value: string | number + score: number + weight: number + contribution: number + detail: string +} + type Finding = { id: string severity: string @@ -14,6 +24,11 @@ type Finding = { cvss?: number cve?: string plugin_id?: string + risk_score?: number + risk_factors?: RiskFactor[] + exploitability?: number + confidence?: number + asset_exposure?: string } type FindingStatus = 'new' | 'reviewed' | 'suppressed' @@ -667,12 +682,47 @@ export default function Findings() {

-

Severity Score

+

CVSS

{typeof selectedFinding.cvss === 'number' ? selectedFinding.cvss.toFixed(1) : 'N/A'}

+ + {typeof selectedFinding.risk_score === 'number' && ( +
+
+

Risk Score

+

= 7 ? 'text-rag-red' : + selectedFinding.risk_score >= 4 ? 'text-rag-amber' : 'text-rag-blue' + }`}> + {selectedFinding.risk_score.toFixed(1)} +

+
+ {selectedFinding.risk_factors && selectedFinding.risk_factors.length > 0 && ( +
+ {selectedFinding.risk_factors.map((rf) => ( +
+
+ {rf.label} + ({(rf.weight * 100).toFixed(0)}%) +
+
+ {rf.score.toFixed(1)} + = 2 ? 'text-rag-red' : + rf.contribution >= 1 ? 'text-rag-amber' : 'text-silver/40' + }`}> + +{rf.contribution.toFixed(1)} + +
+
+ ))} +
+ )} +
+ )}
diff --git a/frontend/src/pages/TaskDetails.tsx b/frontend/src/pages/TaskDetails.tsx index 862c9c08..0b4e7631 100644 --- a/frontend/src/pages/TaskDetails.tsx +++ b/frontend/src/pages/TaskDetails.tsx @@ -46,6 +46,16 @@ interface Task { pending_count?: number } +interface RiskFactor { + factor: string + label: string + value: string | number + score: number + weight: number + contribution: number + detail: string +} + interface Finding { id?: string title: string @@ -59,6 +69,11 @@ interface Finding { proof?: string discovered_at?: string metadata?: Record + risk_score?: number + risk_factors?: RiskFactor[] + exploitability?: number + confidence?: number + asset_exposure?: string } interface TaskResult { @@ -229,6 +244,14 @@ export default function TaskDetails() {

)} + {finding.risk_score !== undefined && finding.risk_score !== null && ( +
+

Risk Score

+

= 7 ? 'text-rag-red' : finding.risk_score >= 4 ? 'text-rag-amber' : 'text-rag-blue'}`}> + {finding.risk_score.toFixed(1)} +

+
+ )} {finding.cve && (

CVE Identifiers

@@ -243,6 +266,28 @@ export default function TaskDetails() {
+ {finding.risk_factors && finding.risk_factors.length > 0 && ( +
+

Risk Factor Breakdown

+
+ {finding.risk_factors.map((rf) => ( +
+
+ {rf.label} + ({(rf.weight * 100).toFixed(0)}%) +
+
+ {rf.score.toFixed(1)} + = 2 ? 'text-rag-red' : rf.contribution >= 1 ? 'text-rag-amber' : 'text-silver/40'}`}> + +{rf.contribution.toFixed(1)} + +
+
+ ))} +
+
+ )} + {Object.keys(finding.metadata || {}).length > 0 && (

Technical Attributes

diff --git a/frontend/testing/unit/pages/Findings.test.tsx b/frontend/testing/unit/pages/Findings.test.tsx index b5c6e39a..561ae086 100644 --- a/frontend/testing/unit/pages/Findings.test.tsx +++ b/frontend/testing/unit/pages/Findings.test.tsx @@ -476,3 +476,105 @@ describe('Findings — active filter summary', () => { expect(within(strip).getByText(/to: 2026-05-15/i)).toBeInTheDocument() }) }) + +// ── Risk score display ──────────────────────────────────────────────────────── + +describe('Findings — risk score display', () => { + const riskFactors = [ + { factor: 'severity', label: 'Severity', value: 'critical', score: 10.0, weight: 0.30, contribution: 3.0, detail: 'Severity is critical (10.0/10)' }, + { factor: 'exploitability', label: 'Exploitability', value: 8.0, score: 8.0, weight: 0.25, contribution: 2.0, detail: 'Exploitability score is 8.0/10' }, + { factor: 'asset_exposure', label: 'Asset Exposure', value: 'critical', score: 10.0, weight: 0.20, contribution: 2.0, detail: 'Asset exposure is critical (10.0/10)' }, + { factor: 'recency', label: 'Recency', value: '2026-05-14T10:00:00Z', score: 10.0, weight: 0.15, contribution: 1.5, detail: 'Discovered today — maximum recency score' }, + { factor: 'confidence', label: 'Confidence', value: 0.95, score: 9.5, weight: 0.10, contribution: 0.95, detail: 'Confidence is 95%' }, + ] + + const criticalFindingWithRisk = { + ...criticalFinding, + risk_score: 8.7, + risk_factors: riskFactors, + } + + beforeEach(() => { + vi.mocked(getFindings).mockResolvedValue({ findings: [criticalFindingWithRisk, highFinding, mediumFinding] }) + }) + + it('shows risk score in sidebar when available', async () => { + renderFindings() + await waitForLoad() + + await waitFor(() => { + expect(screen.getByText('Risk Score')).toBeInTheDocument() + }) + expect(screen.getByText('8.7')).toBeInTheDocument() + }) + + it('shows risk factor breakdown with labels and contributions', async () => { + renderFindings() + await waitForLoad() + + await waitFor(() => { + expect(screen.getByText('Severity')).toBeInTheDocument() + }) + expect(screen.getByText('Exploitability')).toBeInTheDocument() + expect(screen.getByText('Asset Exposure')).toBeInTheDocument() + expect(screen.getByText('Recency')).toBeInTheDocument() + expect(screen.getByText('Confidence')).toBeInTheDocument() + }) + + it('shows weight percentages for each risk factor', async () => { + renderFindings() + await waitForLoad() + + await waitFor(() => { + expect(screen.getByText('(30%)')).toBeInTheDocument() + }) + expect(screen.getByText('(25%)')).toBeInTheDocument() + expect(screen.getByText('(20%)')).toBeInTheDocument() + expect(screen.getByText('(15%)')).toBeInTheDocument() + expect(screen.getByText('(10%)')).toBeInTheDocument() + }) + + it('shows risk score in red for high values (>= 7)', async () => { + renderFindings() + await waitForLoad() + + await waitFor(() => { + const scoreEl = screen.getByText('8.7') + expect(scoreEl.className).toContain('text-rag-red') + }) + }) + + it('shows risk score in amber for medium values (4-6.9)', async () => { + const mediumWithRisk = { ...mediumFinding, risk_score: 5.2, risk_factors: riskFactors.map(f => ({ ...f, score: 5 })) } + vi.mocked(getFindings).mockResolvedValue({ findings: [mediumWithRisk] }) + renderFindings() + + await waitFor(() => { + expect(screen.getByText('5.2')).toBeInTheDocument() + }) + const scoreEl = screen.getByText('5.2') + expect(scoreEl.className).toContain('text-rag-amber') + }) + + it('shows risk score in blue for low values (< 4)', async () => { + const lowWithRisk = { ...mediumFinding, severity: 'low', risk_score: 2.1, risk_factors: riskFactors.map(f => ({ ...f, score: 2 })) } + vi.mocked(getFindings).mockResolvedValue({ findings: [lowWithRisk] }) + renderFindings() + + await waitFor(() => { + expect(screen.getByText('2.1')).toBeInTheDocument() + }) + const scoreEl = screen.getByText('2.1') + expect(scoreEl.className).toContain('text-rag-blue') + }) + + it('does not show risk score section when finding has no risk_score', async () => { + vi.mocked(getFindings).mockResolvedValue({ findings: [highFinding, mediumFinding] }) + renderFindings() + + await waitFor(() => { + expect(screen.getByText(/Stored XSS in Comments/i)).toBeInTheDocument() + }) + expect(screen.queryByText('Risk Score')).not.toBeInTheDocument() + }) +}) diff --git a/testing/backend/unit/test_risk_scoring.py b/testing/backend/unit/test_risk_scoring.py new file mode 100644 index 00000000..7dcf28af --- /dev/null +++ b/testing/backend/unit/test_risk_scoring.py @@ -0,0 +1,166 @@ +"""Tests for the risk scoring module.""" + +from datetime import datetime, timezone, timedelta +from backend.secuscan.risk_scoring import ( + compute_risk_score, + compute_risk_factors, +) + + +class TestComputeRiskScore: + """Determinism, bounds, and edge cases for the composite score.""" + + def test_deterministic(self): + """Same inputs always produce the same score.""" + s1 = compute_risk_score("critical", exploitability=9.0, asset_exposure="critical", confidence=0.9) + s2 = compute_risk_score("critical", exploitability=9.0, asset_exposure="critical", confidence=0.9) + assert s1 == s2 + + def test_score_range(self): + """Score is always in [0, 10].""" + for sev in ("critical", "high", "medium", "low", "info"): + score = compute_risk_score(sev) + assert 0.0 <= score <= 10.0, f"Score {score} out of range for {sev}" + + def test_critical_maximises_score(self): + """All maximum inputs yield the highest score.""" + score = compute_risk_score( + "critical", + exploitability=10.0, + asset_exposure="critical", + confidence=1.0, + discovered_at=datetime.now(timezone.utc), + ) + assert score > 8.0 + + def test_info_minimises_score(self): + """All minimum inputs yield the lowest score.""" + score = compute_risk_score( + "info", + exploitability=0.0, + asset_exposure="low", + confidence=0.0, + discovered_at=datetime.now(timezone.utc) - timedelta(days=1000), + ) + assert score < 4.0 + + def test_exploitability_default(self): + """None exploitability defaults to 5.0.""" + s1 = compute_risk_score("medium", exploitability=None) + s2 = compute_risk_score("medium", exploitability=5.0) + assert s1 == s2 + + def test_confidence_default(self): + """None confidence defaults to 0.5.""" + s1 = compute_risk_score("medium", confidence=None) + s2 = compute_risk_score("medium", confidence=0.5) + assert s1 == s2 + + def test_asset_exposure_default(self): + """None asset_exposure defaults to 'medium'.""" + s1 = compute_risk_score("medium", asset_exposure=None) + s2 = compute_risk_score("medium", asset_exposure="medium") + assert s1 == s2 + + def test_recency_recent(self): + """Finding from today gets max recency contribution.""" + today = datetime.now(timezone.utc) + score = compute_risk_score("high", discovered_at=today) + recent_score = compute_risk_score("high", discovered_at=today - timedelta(days=365)) + # Today should be higher than a year ago + assert score > recent_score + + def test_recency_old(self): + """Finding from >1 year ago gets minimum recency.""" + old = datetime.now(timezone.utc) - timedelta(days=400) + score = compute_risk_score("high", discovered_at=old) + old_score = compute_risk_score("high", discovered_at=old - timedelta(days=1000)) + assert score == old_score # both floored at 1.0 + + def test_recency_none(self): + """None discovered_at defaults to moderate recency (5.0).""" + s1 = compute_risk_score("medium") + s2 = compute_risk_score("medium", discovered_at=datetime.now(timezone.utc) - timedelta(days=89)) + assert s1 == s2 + + def test_exploitability_clamping(self): + """Exploitability outside [0,10] is clamped.""" + s1 = compute_risk_score("high", exploitability=-5.0) + s2 = compute_risk_score("high", exploitability=15.0) + s3 = compute_risk_score("high", exploitability=0.0) + s4 = compute_risk_score("high", exploitability=10.0) + assert s1 == s3 + assert s2 == s4 + + +class TestComputeRiskFactors: + """Risk factor explanations.""" + + def test_returns_five_factors(self): + """Five factors returned: severity, exploitability, asset_exposure, recency, confidence.""" + factors = compute_risk_factors("critical") + assert len(factors) == 5 + keys = {f["factor"] for f in factors} + assert keys == {"severity", "exploitability", "asset_exposure", "recency", "confidence"} + + def test_contributions_sum_to_score(self): + """Sum of weighted contributions equals the risk score.""" + score = compute_risk_score("high", exploitability=7.0, asset_exposure="high", confidence=0.8) + factors = compute_risk_factors("high", exploitability=7.0, asset_exposure="high", confidence=0.8, risk_score=score) + total = sum(f["contribution"] for f in factors) + # Rounding may cause 0.01 delta + assert abs(total - score) < 0.1 + + def test_negative_exploitability(self): + """Negative exploitability is handled gracefully.""" + factors = compute_risk_factors("medium", exploitability=-1.0) + exf = [f for f in factors if f["factor"] == "exploitability"][0] + assert exf["score"] >= 0 + + def test_risk_factors_provide_detail(self): + """Each factor has a non-empty detail string.""" + factors = compute_risk_factors("critical", exploitability=9.0, asset_exposure="critical", confidence=0.95) + for f in factors: + assert f["detail"], f"Factor {f['factor']} missing detail" + + +class TestFindingModelWithRiskFields: + """The Finding pydantic model accepts the new risk fields.""" + + def test_finding_with_risk_fields(self): + from backend.secuscan.models import Finding + + finding = Finding( + title="SQL Injection", + category="injection", + severity="critical", + target="app.example.com", + description="Input not sanitized", + exploitability=8.5, + confidence=0.95, + asset_exposure="critical", + risk_score=8.7, + risk_factors=[{"factor": "severity", "score": 10.0, "weight": 0.30, "contribution": 3.0}], + ) + assert finding.exploitability == 8.5 + assert finding.confidence == 0.95 + assert finding.asset_exposure == "critical" + assert finding.risk_score == 8.7 + assert len(finding.risk_factors) == 1 + + def test_finding_without_risk_fields(self): + """Risk fields default to None/empty — backward compatible.""" + from backend.secuscan.models import Finding + + finding = Finding( + title="XSS", + category="xss", + severity="medium", + target="web.example.com", + description="XSS found", + ) + assert finding.exploitability is None + assert finding.confidence is None + assert finding.asset_exposure is None + assert finding.risk_score is None + assert finding.risk_factors == []