-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_handler.py
More file actions
123 lines (106 loc) · 5.01 KB
/
lambda_handler.py
File metadata and controls
123 lines (106 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
AWS Lambda Handler
Deploy this as a Lambda function to run scheduled or on-demand scans.
The function:
1. Scans using the execution role's IAM permissions
2. Optionally enriches findings with LLM explanations
3. Stores the signed report to S3
4. Publishes a summary notification to SNS
Required environment variables:
REPORT_BUCKET — S3 bucket to store reports
SIGNING_SECRET — HMAC secret for tamper-evident signing
ANTHROPIC_API_KEY — (optional) for LLM explanations
SNS_TOPIC_ARN — (optional) for alerting
AWS_REGION — set automatically by Lambda
Optional env vars:
SCAN_REGION — AWS region to scan (defaults to Lambda's region)
MAX_EXPLAIN — max findings to send to LLM (default: 10)
SEVERITY_FILTER — comma-separated severities, e.g. "CRITICAL,HIGH"
"""
import json
import logging
import os
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
import boto3
from scanner.aws_scanner import AWSScanner
from scanner.report import build_report, sign_report_hmac, save_report
from scanner.explainer import LLMExplainer
region = os.environ.get("SCAN_REGION", os.environ.get("AWS_REGION", "us-east-1"))
report_bucket = os.environ.get("REPORT_BUCKET")
signing_secret = os.environ.get("SIGNING_SECRET", "change-me-in-production")
sns_topic = os.environ.get("SNS_TOPIC_ARN")
max_explain = int(os.environ.get("MAX_EXPLAIN", "10"))
severity_filter = os.environ.get("SEVERITY_FILTER", "").split(",") if os.environ.get("SEVERITY_FILTER") else None
use_llm = bool(os.environ.get("ANTHROPIC_API_KEY"))
# ── Scan ──────────────────────────────────────────────────
logger.info("Starting scan in region: %s", region)
scanner = AWSScanner(region=region)
findings = scanner.run_all()
logger.info("Raw findings: %d", len(findings))
if severity_filter:
findings = [f for f in findings if f.get("severity") in severity_filter]
logger.info("After filter: %d", len(findings))
# ── LLM Enrichment ────────────────────────────────────────
if use_llm and findings:
try:
explainer = LLMExplainer()
findings = explainer.explain_all(findings, max_findings=max_explain)
logger.info("LLM enrichment complete")
except Exception as e:
logger.warning("LLM enrichment failed: %s", e)
# ── Build & sign report ───────────────────────────────────
payload = build_report(
findings=findings,
metadata={
"trigger": event.get("source", "manual"),
"lambda_function": context.function_name if context else "local",
"scan_region": region,
},
)
signed_report = sign_report_hmac(payload, signing_secret)
# ── Save to S3 ────────────────────────────────────────────
report_key = None
if report_bucket:
ts = datetime.now(timezone.utc).strftime("%Y/%m/%d/%H%M%S")
report_key = f"cloudsec-reports/{ts}-report.json"
s3 = boto3.client("s3")
s3.put_object(
Bucket=report_bucket,
Key=report_key,
Body=json.dumps(signed_report, indent=2, default=str),
ContentType="application/json",
ServerSideEncryption="AES256",
)
logger.info("Report saved to s3://%s/%s", report_bucket, report_key)
else:
logger.warning("REPORT_BUCKET not set — report not persisted")
# ── SNS Alert ─────────────────────────────────────────────
summary = payload.get("summary", {})
critical = summary.get("severity_counts", {}).get("CRITICAL", 0)
high = summary.get("severity_counts", {}).get("HIGH", 0)
if sns_topic and (critical > 0 or high > 0):
sns = boto3.client("sns")
message = (
f"⚠️ Cloud Security Scan Alert\n\n"
f"Critical: {critical} | High: {high}\n"
f"Total findings: {summary.get('total_findings', 0)}\n"
)
if report_key:
message += f"Report: s3://{report_bucket}/{report_key}\n"
sns.publish(
TopicArn=sns_topic,
Subject=f"[cloudsec] {critical} CRITICAL findings detected",
Message=message,
)
logger.info("SNS alert sent")
return {
"statusCode": 200,
"body": json.dumps({
"total_findings": summary.get("total_findings", 0),
"severity_counts": summary.get("severity_counts", {}),
"report_location": f"s3://{report_bucket}/{report_key}" if report_key else None,
}),
}