-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexplainer.py
More file actions
174 lines (147 loc) · 6.9 KB
/
explainer.py
File metadata and controls
174 lines (147 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
LLM Explainer
Uses Claude (via Anthropic API) to translate raw finding dicts into
plain-English explanations and actionable remediation steps.
"""
import logging
import os
from typing import Any
# Module-level logger named after this module; handlers and levels are left
# to the host application.
logger = logging.getLogger(__name__)

# Badge rendered next to each severity label in the terminal report.
SEVERITY_EMOJI = {
    "CRITICAL": "🔴",
    "HIGH": "🟠",
    "MEDIUM": "🟡",
    "LOW": "🔵",
    "INFO": "⚪",
}

# System prompt sent with every request. It pins the model to a JSON-only
# reply whose keys (explanation, risk, remediation, fix_effort) are the ones
# the report formatter reads, so keep the two in sync when editing.
SYSTEM_PROMPT = """You are a cloud security expert assistant. Your role is to explain
cloud infrastructure misconfigurations in plain English to developers and engineers
who may not be security specialists.
For each finding you receive, you will:
1. Explain what the misconfiguration is in plain English (1-2 sentences).
2. Explain the real-world risk — what could an attacker actually do? Be specific but not alarmist.
3. Provide a concise, step-by-step remediation guide with code examples where applicable
(AWS CLI, Terraform, Azure CLI, or console instructions as appropriate).
4. Rate the effort to fix: Low / Medium / High.
Respond ONLY with valid JSON in this exact format:
{
"explanation": "...",
"risk": "...",
"remediation": {
"summary": "...",
"steps": ["step 1", "step 2", "..."],
"example_code": "optional CLI/Terraform snippet or null"
},
"fix_effort": "Low | Medium | High"
}"""
class LLMExplainer:
def __init__(self, api_key: str | None = None, model: str = "claude-sonnet-4-20250514"):
self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
self.model = model
self._client = None
def _get_client(self):
if self._client:
return self._client
try:
import anthropic
self._client = anthropic.Anthropic(api_key=self.api_key)
return self._client
except ImportError:
raise RuntimeError("anthropic package required. Run: pip install anthropic")
def explain_finding(self, finding: dict[str, Any]) -> dict[str, Any]:
"""
Add plain-English explanation and remediation to a single finding.
Returns the finding dict augmented with an 'llm_analysis' key.
"""
if not self.api_key:
logger.warning("ANTHROPIC_API_KEY not set — skipping LLM analysis.")
return {**finding, "llm_analysis": None}
prompt = (
f"Analyze this cloud security finding and provide your response:\n\n"
f"Resource Type: {finding.get('resource_type')}\n"
f"Resource ID: {finding.get('resource_id')}\n"
f"Check Name: {finding.get('check')}\n"
f"Severity: {finding.get('severity')}\n"
f"Detail: {finding.get('detail')}\n"
)
try:
client = self._get_client()
message = client.messages.create(
model=self.model,
max_tokens=1024,
system=SYSTEM_PROMPT,
messages=[{"role": "user", "content": prompt}],
)
import json
text = message.content[0].text.strip()
# Strip markdown code fences if present
if text.startswith("```"):
text = "\n".join(text.split("\n")[1:])
if text.endswith("```"):
text = "\n".join(text.split("\n")[:-1])
analysis = json.loads(text)
return {**finding, "llm_analysis": analysis}
except Exception as e:
logger.error("LLM explanation failed for %s/%s: %s", finding.get("resource_id"), finding.get("check"), e)
return {**finding, "llm_analysis": {"error": str(e)}}
def explain_all(
self,
findings: list[dict[str, Any]],
max_findings: int = 50,
) -> list[dict[str, Any]]:
"""
Enrich all findings with LLM analysis.
Caps at max_findings to avoid runaway API costs.
Deduplicated by (resource_type, check) so identical issues share one explanation.
"""
if not findings:
return findings
if len(findings) > max_findings:
logger.warning(
"Capping LLM analysis at %d findings (total: %d). "
"Increase max_findings or pre-filter by severity.",
max_findings, len(findings),
)
# Prioritize by severity
sev_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
findings = sorted(findings, key=lambda f: sev_order.get(f.get("severity", "INFO"), 5))[:max_findings]
# Cache explanations for identical check types to save API calls
cache: dict[tuple, dict] = {}
enriched = []
for finding in findings:
cache_key = (finding.get("resource_type"), finding.get("check"))
if cache_key in cache:
enriched.append({**finding, "llm_analysis": cache[cache_key]})
else:
result = self.explain_finding(finding)
analysis = result.get("llm_analysis")
if analysis and "error" not in analysis:
cache[cache_key] = analysis
enriched.append(result)
return enriched
def format_terminal_report(self, findings: list[dict[str, Any]]) -> str:
"""
Render a human-readable terminal report of enriched findings.
"""
if not findings:
return "✅ No findings — your configuration looks clean.\n"
lines = ["\n" + "═" * 70, " CLOUD SECURITY SCAN REPORT", "═" * 70 + "\n"]
sev_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
sorted_findings = sorted(findings, key=lambda f: sev_order.get(f.get("severity", "INFO"), 5))
for i, f in enumerate(sorted_findings, 1):
sev = f.get("severity", "INFO")
emoji = SEVERITY_EMOJI.get(sev, "⚪")
lines.append(f"Finding #{i}: {emoji} [{sev}] {f.get('check')}")
lines.append(f" Resource: {f.get('resource_type')} / {f.get('resource_id')}")
lines.append(f" Detail: {f.get('detail')}")
analysis = f.get("llm_analysis")
if analysis and "error" not in analysis:
lines.append(f"\n 📖 Explanation:")
lines.append(f" {analysis.get('explanation', 'N/A')}")
lines.append(f"\n ⚠️ Risk:")
lines.append(f" {analysis.get('risk', 'N/A')}")
rem = analysis.get("remediation", {})
lines.append(f"\n 🔧 Remediation ({analysis.get('fix_effort', '?')} effort):")
lines.append(f" {rem.get('summary', '')}")
for step in rem.get("steps", []):
lines.append(f" • {step}")
if rem.get("example_code"):
lines.append(f"\n Example:\n {rem['example_code']}")
elif analysis and "error" in analysis:
lines.append(f" ⚠️ LLM analysis failed: {analysis['error']}")
lines.append("\n" + "─" * 70 + "\n")
return "\n".join(lines)